Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/cmd.py: 31%

553 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# cmd.py 

2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

3# 

4# This module is part of GitPython and is released under 

5# the BSD License: http://www.opensource.org/licenses/bsd-license.php 

6from __future__ import annotations 

7from contextlib import contextmanager 

8import io 

9import logging 

10import os 

11import signal 

12from subprocess import call, Popen, PIPE, DEVNULL 

13import subprocess 

14import threading 

15from textwrap import dedent 

16 

17from git.compat import ( 

18 defenc, 

19 force_bytes, 

20 safe_decode, 

21 is_posix, 

22 is_win, 

23) 

24from git.exc import CommandError 

25from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present 

26 

27from .exc import GitCommandError, GitCommandNotFound 

28from .util import ( 

29 LazyMixin, 

30 stream_copy, 

31) 

32 

33# typing --------------------------------------------------------------------------- 

34 

35from typing import ( 

36 Any, 

37 AnyStr, 

38 BinaryIO, 

39 Callable, 

40 Dict, 

41 IO, 

42 Iterator, 

43 List, 

44 Mapping, 

45 Sequence, 

46 TYPE_CHECKING, 

47 TextIO, 

48 Tuple, 

49 Union, 

50 cast, 

51 overload, 

52) 

53 

54from git.types import PathLike, Literal, TBD 

55 

56if TYPE_CHECKING: 56 ↛ 57line 56 didn't jump to line 57, because the condition on line 56 was never true

57 from git.repo.base import Repo 

58 from git.diff import DiffIndex 

59 

60 

61# --------------------------------------------------------------------------------- 

62 

# Names of the keyword arguments that Git.execute() accepts for itself.
# The execute() docstring asks that this collection be updated whenever a
# keyword argument is added to that method's signature.
execute_kwargs = {
    "istream",
    "with_extended_output",
    "with_exceptions",
    "as_process",
    "stdout_as_string",
    "output_stream",
    "with_stdout",
    "kill_after_timeout",
    "universal_newlines",
    "shell",
    "env",
    "max_chunk_size",
    "strip_newline_in_stdout",
}

# Module-level logger; a NullHandler is attached to it here.
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())

# Only the Git class is exported by a star-import of this module.
__all__ = ("Git",)

83 

84 

85# ============================================================================== 

86## @name Utilities 

87# ------------------------------------------------------------------------------ 

88# Documentation 

89## @{ 

90 

91 

def handle_process_output(
    process: Union["Git.AutoInterrupt", Popen],
    stdout_handler: Union[
        None,
        Callable[[AnyStr], None],
        Callable[[List[AnyStr]], None],
        Callable[[bytes, "Repo", "DiffIndex"], None],
    ],
    stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
    finalizer: Union[None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]] = None,
    decode_streams: bool = True,
    kill_after_timeout: Union[None, float] = None,
) -> None:
    """Pump the stdout/stderr of a process line by line into the given handlers.

    One daemon thread is started per available stream. The function joins both
    threads, then calls the finalizer (if any) and returns its result.

    :return: result of finalizer, or None when no finalizer is given
    :param process: subprocess.Popen instance, or a wrapper exposing it as ``proc``
    :param stdout_handler: f(stdout_line_string), or None
    :param stderr_handler: f(stderr_line_string), or None
    :param finalizer: f(proc) - wait for proc to finish
    :param decode_streams:
        Assume stdout/stderr streams are binary and decode them before pushing
        their contents to handlers.
        Set it to False if `universal_newlines == True` (then streams are in text-mode)
        or if decoding must happen later (i.e. for Diffs).
    :param kill_after_timeout:
        float or None, Default = None
        To specify a timeout in seconds for the git command, after which the process
        should be killed. The value is used as the join timeout of each pump thread.
    :raise RuntimeError:
        if a pump thread is still alive after the timeout and ``process`` is not a
        ``Git.AutoInterrupt`` (only that wrapper is terminated here).
    """
    # Use 2 "pump" threads and wait for both to finish.
    def pump_stream(
        cmdline: List[str],
        name: str,
        stream: Union[BinaryIO, TextIO],
        is_decode: bool,
        handler: Union[None, Callable[[Union[bytes, str]], None]],
    ) -> None:
        try:
            for line in stream:
                if handler:
                    if is_decode:
                        assert isinstance(line, bytes)
                        line_str = line.decode(defenc)
                        handler(line_str)
                    else:
                        handler(line)

        except Exception as ex:
            log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
            if "I/O operation on closed file" not in str(ex):
                # Only reraise if the error was not due to the stream closing
                raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex
        finally:
            stream.close()

    if hasattr(process, "proc"):
        # Wrapper object: the real Popen lives in `.proc` and may already be None.
        process = cast("Git.AutoInterrupt", process)
        cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "")
        p_stdout = process.proc.stdout if process.proc else None
        p_stderr = process.proc.stderr if process.proc else None
    else:
        process = cast(Popen, process)
        cmdline = getattr(process, "args", "")
        p_stdout = process.stdout
        p_stderr = process.stderr

    # The command line is only used for error reporting; normalise it to a sequence.
    if not isinstance(cmdline, (tuple, list)):
        cmdline = cmdline.split()

    pumps: List[Tuple[str, IO, Callable[..., None] | None]] = []
    if p_stdout:
        pumps.append(("stdout", p_stdout, stdout_handler))
    if p_stderr:
        pumps.append(("stderr", p_stderr, stderr_handler))

    threads: List[threading.Thread] = []

    for name, stream, handler in pumps:
        t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler))
        t.daemon = True
        t.start()
        threads.append(t)

    ## FIXME: Why Join?? Will block if `stdin` needs feeding...
    #
    for t in threads:
        t.join(timeout=kill_after_timeout)
        if t.is_alive():
            if isinstance(process, Git.AutoInterrupt):
                process._terminate()
            else:  # Don't want to deal with the other case
                raise RuntimeError(
                    "Thread join() timed out in cmd.handle_process_output()."
                    f" kill_after_timeout={kill_after_timeout} seconds"
                )
            if stderr_handler:
                error_str: Union[str, bytes] = (
                    "error: process killed because it timed out." f" kill_after_timeout={kill_after_timeout} seconds"
                )
                # Real pipe objects derive from the io base classes, never from
                # typing.BinaryIO, so the binary check has to be made against io.
                if not decode_streams and isinstance(p_stderr, (io.RawIOBase, io.BufferedIOBase)):
                    # Assume stderr_handler needs binary input
                    error_str = cast(str, error_str)
                    error_str = error_str.encode()
                # We ignore typing on the next line because mypy does not like
                # the way we inferred that stderr takes str or bytes
                stderr_handler(error_str)  # type: ignore

    if finalizer:
        return finalizer(process)
    else:
        return None

206 

207 

def dashify(string: str) -> str:
    """Return ``string`` with every underscore turned into a dash."""
    pieces = string.split("_")
    return "-".join(pieces)

210 

211 

def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]:
    """Collect the values of the object's ``__slots__`` into a dictionary.

    :param self: object whose ``__slots__`` names are read with getattr
    :param exclude: slot names to leave out of the result
    :return: mapping of slot name to current value for every slot not excluded
    """
    state: Dict[str, Any] = {}
    for slot_name in self.__slots__:
        if slot_name in exclude:
            continue
        state[slot_name] = getattr(self, slot_name)
    return state

214 

215 

def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
    """Restore attributes from a mapping and reset the excluded ones to None.

    :param self: object receiving the attributes
    :param d: attribute name to value mapping; every entry is set on ``self``
    :param excluded: attribute names that are set to None afterwards
    """
    for attr_name in d:
        setattr(self, attr_name, d[attr_name])
    for attr_name in excluded:
        setattr(self, attr_name, None)

221 

222 

223## -- End Utilities -- @} 

224 

225 

# value of Windows process creation flag taken from MSDN
CREATE_NO_WINDOW = 0x08000000

## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards,
# see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
# Both flags are OR-ed together on Windows; on any other platform the value is 0.
PROC_CREATIONFLAGS = (
    CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0  # type: ignore[attr-defined]
)  # mypy error if not windows

234 

235 

236class Git(LazyMixin): 

237 

238 """ 

239 The Git class manages communication with the Git binary. 

240 

241 It provides a convenient interface to calling the Git binary, such as in:: 

242 

243 g = Git( git_dir ) 

244 g.init() # calls 'git init' program 

245 rval = g.ls_files() # calls 'git ls-files' program 

246 

247 ``Debugging`` 

248 Set the GIT_PYTHON_TRACE environment variable print each invocation 

249 of the command to stdout. 

250 Set its value to 'full' to see details about the returned values. 

251 """ 

252 

253 __slots__ = ( 

254 "_working_dir", 

255 "cat_file_all", 

256 "cat_file_header", 

257 "_version_info", 

258 "_git_options", 

259 "_persistent_git_options", 

260 "_environment", 

261 ) 

262 

263 _excluded_ = ("cat_file_all", "cat_file_header", "_version_info") 

264 

def __getstate__(self) -> Dict[str, Any]:
    """Return the slot values of this instance, leaving out the names in ``_excluded_``."""
    skipped = self._excluded_
    return slots_to_dict(self, exclude=skipped)

267 

def __setstate__(self, d: Dict[str, Any]) -> None:
    """Restore attributes from ``d`` and set every name in ``_excluded_`` to None."""
    skipped = self._excluded_
    dict_to_slots_and__excluded_are_none(self, d, excluded=skipped)

270 

271 # CONFIGURATION 

272 

273 git_exec_name = "git" # default that should work on linux and windows 

274 

275 # Enables debugging of GitPython's git commands 

276 GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False) 

277 

278 # If True, a shell will be used when executing git commands. 

279 # This should only be desirable on Windows, see https://github.com/gitpython-developers/GitPython/pull/126 

280 # and check `git/test_repo.py:TestRepo.test_untracked_files()` TC for an example where it is required. 

281 # Override this value using `Git.USE_SHELL = True` 

282 USE_SHELL = False 

283 

284 # Provide the full path to the git executable. Otherwise it assumes git is in the path 

285 _git_exec_env_var = "GIT_PYTHON_GIT_EXECUTABLE" 

286 _refresh_env_var = "GIT_PYTHON_REFRESH" 

287 GIT_PYTHON_GIT_EXECUTABLE = None 

288 # note that the git executable is actually found during the refresh step in 

289 # the top level __init__ 

290 

@classmethod
def refresh(cls, path: Union[None, PathLike] = None) -> bool:
    """This gets called by the refresh function (see the top level
    __init__).

    Selects a git executable, checks it by running ``cls().version()`` and
    stores it in ``cls.GIT_PYTHON_GIT_EXECUTABLE``.

    :param path:
        Path to the git executable; it is user-expanded and made absolute.
        If None, the value of the environment variable named by
        ``_git_exec_env_var`` is used, falling back to ``git_exec_name``.
    :return: True if the version check succeeded, False otherwise
    :raise ImportError:
        on the first refresh, if the check failed and the refresh mode is an
        error mode (the default) or an unrecognised value
    :raise GitCommandNotFound:
        on any later refresh, if the check failed
    """
    # discern which path to refresh with
    if path is not None:
        new_git = os.path.expanduser(path)
        new_git = os.path.abspath(new_git)
    else:
        new_git = os.environ.get(cls._git_exec_env_var, cls.git_exec_name)

    # keep track of the old and new git executable path
    old_git = cls.GIT_PYTHON_GIT_EXECUTABLE
    cls.GIT_PYTHON_GIT_EXECUTABLE = new_git

    # test if the new git executable path is valid

    # - a GitCommandNotFound error is spawned by ourselves
    # - a PermissionError is spawned if the git executable provided
    #   cannot be executed for whatever reason

    has_git = False
    try:
        cls().version()
        has_git = True
    except (GitCommandNotFound, PermissionError):
        pass

    # warn or raise exception if test failed
    if not has_git:
        err = (
            dedent(
                """\
            Bad git executable.
            The git executable must be specified in one of the following ways:
                - be included in your $PATH
                - be set via $%s
                - explicitly set via git.refresh()
            """
            )
            % cls._git_exec_env_var
        )

        # revert to whatever the old_git was
        cls.GIT_PYTHON_GIT_EXECUTABLE = old_git

        if old_git is None:
            # on the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is
            # None) we only are quiet, warn, or error depending on the
            # GIT_PYTHON_REFRESH value

            # determine what the user wants to happen during the initial
            # refresh we expect GIT_PYTHON_REFRESH to either be unset or
            # be one of the following values:
            #   0|q|quiet|s|silence
            #   1|w|warn|warning
            #   2|r|raise|e|error

            # an unset variable behaves like "raise"
            mode = os.environ.get(cls._refresh_env_var, "raise").lower()

            quiet = ["quiet", "q", "silence", "s", "none", "n", "0"]
            warn = ["warn", "w", "warning", "1"]
            error = ["error", "e", "raise", "r", "2"]

            if mode in quiet:
                pass
            elif mode in warn or mode in error:
                # warn and error modes share the same extended message
                err = (
                    dedent(
                        """\
                    %s
                    All git commands will error until this is rectified.

                    This initial warning can be silenced or aggravated in the future by setting the
                    $%s environment variable. Use one of the following values:
                        - %s: for no warning or exception
                        - %s: for a printed warning
                        - %s: for a raised exception

                    Example:
                        export %s=%s
                    """
                    )
                    % (
                        err,
                        cls._refresh_env_var,
                        "|".join(quiet),
                        "|".join(warn),
                        "|".join(error),
                        cls._refresh_env_var,
                        quiet[0],
                    )
                )

                if mode in warn:
                    print("WARNING: %s" % err)
                else:
                    raise ImportError(err)
            else:
                # the variable is set to a value outside all three lists
                err = (
                    dedent(
                        """\
                    %s environment variable has been set but it has been set with an invalid value.

                    Use only the following values:
                        - %s: for no warning or exception
                        - %s: for a printed warning
                        - %s: for a raised exception
                    """
                    )
                    % (
                        cls._refresh_env_var,
                        "|".join(quiet),
                        "|".join(warn),
                        "|".join(error),
                    )
                )
                raise ImportError(err)

            # we get here if this was the init refresh and the refresh mode
            # was not error, go ahead and set the GIT_PYTHON_GIT_EXECUTABLE
            # such that we discern the difference between a first import
            # and a second import
            cls.GIT_PYTHON_GIT_EXECUTABLE = cls.git_exec_name
        else:
            # after the first refresh (when GIT_PYTHON_GIT_EXECUTABLE
            # is no longer None) we raise an exception
            raise GitCommandNotFound("git", err)

    return has_git

422 

@classmethod
def is_cygwin(cls) -> bool:
    """Return what ``is_cygwin_git`` reports for the configured git executable."""
    git_executable = cls.GIT_PYTHON_GIT_EXECUTABLE
    return is_cygwin_git(git_executable)

426 

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str:
    ...

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> str:
    ...

@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike:
    """Normalise a url/path before it is written to a config file.

    :param url: the url or path to normalise
    :param is_cygwin:
        whether to convert with ``cygpath``; when None the answer is taken
        from ``cls.is_cygwin()``
    :return: the converted url
    """
    use_cygwin = cls.is_cygwin() if is_cygwin is None else is_cygwin
    if use_cygwin:
        return cygpath(url)

    # Remove any backslashes from urls to be written in config files.
    #
    # Windows might create config-files containing paths with backslashes,
    # but git stops liking them as it will escape the backslashes.
    # Hence we undo the escaping just to be sure.
    expanded = os.path.expandvars(url)
    if expanded.startswith("~"):
        expanded = os.path.expanduser(expanded)
    unescaped = expanded.replace("\\\\", "\\")
    return unescaped.replace("\\", "/")

456 

class AutoInterrupt(object):
    """Kill/Interrupt the stored process instance once this instance goes out of scope. It is
    used to prevent processes piling up in case iterators stop reading.
    Besides all attributes are wired through to the contained process object.

    The wait method was overridden to perform automatic status code checking
    and possibly raise."""

    __slots__ = ("proc", "args", "status")

    # If this is non-zero it will override any status code during
    # _terminate, used to prevent race conditions in testing
    _status_code_if_terminate: int = 0

    def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
        """Store the process to guard and the command it was started with.

        :param proc: the process, or None if there is nothing to guard
        :param args: the command; used by wait() when reporting a failure
        """
        self.proc = proc
        self.args = args
        self.status: Union[int, None] = None

    def _terminate(self) -> None:
        """Terminate the underlying process.

        Closes the process pipes, records the exit status in ``self.status``
        and drops the reference to the process. Does nothing if the process
        reference was already dropped."""
        if self.proc is None:
            return

        proc = self.proc
        self.proc = None
        if proc.stdin:
            proc.stdin.close()
        if proc.stdout:
            proc.stdout.close()
        if proc.stderr:
            proc.stderr.close()
        # did the process finish already so we have a return code ?
        try:
            if proc.poll() is not None:
                self.status = self._status_code_if_terminate or proc.poll()
                return None
        except OSError as ex:
            log.info("Ignored error after process had died: %r", ex)

        # can be that nothing really exists anymore ...
        if os is None or getattr(os, "kill", None) is None:
            return None

        # try to kill it
        try:
            proc.terminate()
            status = proc.wait()  # ensure process goes away

            self.status = self._status_code_if_terminate or status
        except OSError as ex:
            log.info("Ignored error after process had died: %r", ex)
        except AttributeError:
            # try windows
            # for some reason, providing None for stdout/stderr still prints something. This is why
            # we simply use the shell and redirect to nul. Its slower than CreateProcess, question
            # is whether we really want to see all these messages. Its annoying no matter what.
            if is_win:
                call(
                    ("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)),
                    shell=True,
                )
        # END exception handling

    def __del__(self) -> None:
        self._terminate()

    def __getattr__(self, attr: str) -> Any:
        """Forward every attribute not found on the wrapper to the wrapped process."""
        return getattr(self.proc, attr)

    # TODO: Bad choice to mimic `proc.wait()` but with different args.
    def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
        """Wait for the process and return its status code.

        :param stderr: Previously read value of stderr, in case stderr is already closed.
        :warn: may deadlock if output or error pipes are used and not handled separately.
        :raise GitCommandError: if the return status is not 0"""
        # None means "nothing was read before"; it must not reach force_bytes.
        if stderr is None:
            stderr_b = b""
        else:
            stderr_b = force_bytes(data=stderr, encoding="utf-8")
        status: Union[int, None]
        if self.proc is not None:
            status = self.proc.wait()
            p_stderr = self.proc.stderr
        else:  # Assume the underlying proc was killed earlier or never existed
            status = self.status
            p_stderr = None

        def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
            # Prepend what the caller already read; fall back to it alone when
            # the stream is missing or can no longer be read.
            if stream:
                try:
                    return stderr_b + force_bytes(stream.read())
                except (OSError, ValueError):
                    return stderr_b or b""
            else:
                return stderr_b or b""

        # END status handling

        if status != 0:
            errstr = read_all_from_possibly_closed_stream(p_stderr)
            log.debug("AutoInterrupt wait stderr: %r" % (errstr,))
            raise GitCommandError(remove_password_if_present(self.args), status, errstr)
        return status

561 

562 # END auto interrupt 

563 

class CatFileContentStream(object):

    """Object representing a sized read-only stream returning the contents of
    an object.
    It behaves like a stream, but counts the data read and simulates an empty
    stream once our sized content region is empty.
    If not all data is read to the end of the objects's lifetime, we read the
    rest to assure the underlying stream continues to work"""

    __slots__: Tuple[str, ...] = ("_stream", "_nbr", "_size")

    def __init__(self, size: int, stream: IO[bytes]) -> None:
        """Wrap ``stream`` so that at most ``size`` bytes can be read from it.

        :param size: number of content bytes that belong to this object
        :param stream: underlying stream, positioned at the first content byte
        """
        self._stream = stream
        self._size = size
        self._nbr = 0  # num bytes read

        # special case: if the object is empty, has null bytes, get the
        # final newline right away.
        if size == 0:
            stream.read(1)
        # END handle empty streams

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes, or everything that is left if ``size`` is negative."""
        bytes_left = self._size - self._nbr
        if bytes_left == 0:
            return b""
        if size > -1:
            # assure we don't try to read past our limit
            size = min(bytes_left, size)
        else:
            # they try to read all, make sure its not more than what remains
            size = bytes_left
        # END check early depletion
        data = self._stream.read(size)
        self._nbr += len(data)

        # check for depletion, read our final byte to make the stream usable by others
        if self._size - self._nbr == 0:
            self._stream.read(1)  # final newline
        # END finish reading
        return data

    def readline(self, size: int = -1) -> bytes:
        """Read one line, limited to ``size`` bytes and to the remaining content."""
        if self._nbr == self._size:
            return b""

        # clamp size to lowest allowed value
        bytes_left = self._size - self._nbr
        if size > -1:
            size = min(bytes_left, size)
        else:
            size = bytes_left
        # END handle size

        data = self._stream.readline(size)
        self._nbr += len(data)

        # handle final byte
        if self._size - self._nbr == 0:
            self._stream.read(1)
        # END finish reading

        return data

    def readlines(self, size: int = -1) -> List[bytes]:
        """Read lines until the content is exhausted or more than ``size`` bytes were collected."""
        if self._nbr == self._size:
            return []

        # leave all additional logic to our readline method, we just check the size
        out = []
        nbr = 0
        while True:
            line = self.readline()
            if not line:
                break
            out.append(line)
            if size > -1:
                nbr += len(line)
                if nbr > size:
                    break
            # END handle size constraint
        # END readline loop
        return out

    # skipcq: PYL-E0301
    def __iter__(self) -> "Git.CatFileContentStream":
        return self

    def __next__(self) -> bytes:
        """Return the next line; raise StopIteration once the content is exhausted."""
        # Must not call the builtin next(self) here: that dispatches straight
        # back to __next__ and recurses without end.
        line = self.readline()
        if not line:
            raise StopIteration

        return line

    def next(self) -> bytes:
        """Explicitly callable alias of ``__next__``."""
        return self.__next__()

    def __del__(self) -> None:
        bytes_left = self._size - self._nbr
        if bytes_left:
            # read and discard - seeking is impossible within a stream
            # includes terminating newline
            self._stream.read(bytes_left + 1)
        # END handle incomplete read

669 

def __init__(self, working_dir: Union[None, PathLike] = None):
    """Create a command wrapper bound to a directory.

    :param working_dir:
        Git directory we should work in. If None, we always work in the current
        directory as returned by os.getcwd().
        It is meant to be the working tree directory if available, or the
        .git directory in case of bare repositories."""
    super(Git, self).__init__()
    expanded_dir = expand_path(working_dir)
    self._working_dir = expanded_dir

    # Options for the next call only, and options kept for every call.
    self._git_options: Union[List[str], Tuple[str, ...]] = ()
    self._persistent_git_options: List[str] = []

    # Extra environment variables to pass to git commands
    self._environment: Dict[str, str] = {}

    # cached command slots
    self.cat_file_header: Union[None, TBD] = None
    self.cat_file_all: Union[None, TBD] = None

689 

def __getattr__(self, name: str) -> Any:
    """Turn unknown public attribute names into git sub-command callables.

    Names starting with an underscore are handed to ``LazyMixin.__getattr__``.

    :return: Callable object that will execute call _call_process with your arguments."""
    is_private = name[0] == "_"
    if not is_private:
        return lambda *args, **kwargs: self._call_process(name, *args, **kwargs)
    return LazyMixin.__getattr__(self, name)

697 

def set_persistent_git_options(self, **kwargs: Any) -> None:
    """Specify command line options to the git executable
    for subsequent subcommand calls

    :param kwargs:
        is a dict of keyword arguments.
        these arguments are passed as in _call_process
        but will be passed to the git command rather than
        the subcommand.
    """
    persistent_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
    self._persistent_git_options = persistent_options

710 

def _set_cache_(self, attr: str) -> None:
    """Compute and store the lazily loaded attribute ``attr``.

    ``_version_info`` is parsed from the third space-separated field of the
    ``version`` command output; any other attribute is left to the base class.
    """
    if attr != "_version_info":
        super(Git, self)._set_cache_(attr)
        return

    # We only use the first 4 numbers, as everything else could be strings in fact (on windows)
    version_output = self._call_process("version")  # should be as default *args and **kwargs used
    version_field = version_output.split(" ")[2]
    leading_parts = version_field.split(".")[:4]

    # Non-numeric components are dropped, not converted.
    numbers = tuple(int(part) for part in leading_parts if part.isdigit())
    self._version_info = cast(Tuple[int, int, int, int], numbers)

724 

@property
def working_dir(self) -> Union[None, PathLike]:
    """:return: the directory this instance was configured to work in (``_working_dir``)"""
    configured_dir = self._working_dir
    return configured_dir

729 

@property
def version_info(self) -> Tuple[int, int, int, int]:
    """
    :return: tuple of integers holding the major, minor and additional version
        numbers as parsed from git version.
        The value is read from ``_version_info``, which is generated on demand and cached"""
    parsed_version = self._version_info
    return parsed_version

737 

738 @overload 

739 def execute(self, command: Union[str, Sequence[Any]], *, as_process: Literal[True]) -> "AutoInterrupt": 

740 ... 

741 

742 @overload 

743 def execute( 

744 self, 

745 command: Union[str, Sequence[Any]], 

746 *, 

747 as_process: Literal[False] = False, 

748 stdout_as_string: Literal[True], 

749 ) -> Union[str, Tuple[int, str, str]]: 

750 ... 

751 

752 @overload 

753 def execute( 

754 self, 

755 command: Union[str, Sequence[Any]], 

756 *, 

757 as_process: Literal[False] = False, 

758 stdout_as_string: Literal[False] = False, 

759 ) -> Union[bytes, Tuple[int, bytes, str]]: 

760 ... 

761 

762 @overload 

763 def execute( 

764 self, 

765 command: Union[str, Sequence[Any]], 

766 *, 

767 with_extended_output: Literal[False], 

768 as_process: Literal[False], 

769 stdout_as_string: Literal[True], 

770 ) -> str: 

771 ... 

772 

773 @overload 

774 def execute( 

775 self, 

776 command: Union[str, Sequence[Any]], 

777 *, 

778 with_extended_output: Literal[False], 

779 as_process: Literal[False], 

780 stdout_as_string: Literal[False], 

781 ) -> bytes: 

782 ... 

783 

784 def execute( 

785 self, 

786 command: Union[str, Sequence[Any]], 

787 istream: Union[None, BinaryIO] = None, 

788 with_extended_output: bool = False, 

789 with_exceptions: bool = True, 

790 as_process: bool = False, 

791 output_stream: Union[None, BinaryIO] = None, 

792 stdout_as_string: bool = True, 

793 kill_after_timeout: Union[None, float] = None, 

794 with_stdout: bool = True, 

795 universal_newlines: bool = False, 

796 shell: Union[None, bool] = None, 

797 env: Union[None, Mapping[str, str]] = None, 

798 max_chunk_size: int = io.DEFAULT_BUFFER_SIZE, 

799 strip_newline_in_stdout: bool = True, 

800 **subprocess_kwargs: Any, 

801 ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]: 

802 """Handles executing the command on the shell and consumes and returns 

803 the returned information (stdout) 

804 

805 :param command: 

806 The command argument list to execute. 

807 It should be a string, or a sequence of program arguments. The 

808 program to execute is the first item in the args sequence or string. 

809 

810 :param istream: 

811 Standard input filehandle passed to subprocess.Popen. 

812 

813 :param with_extended_output: 

814 Whether to return a (status, stdout, stderr) tuple. 

815 

816 :param with_exceptions: 

817 Whether to raise an exception when git returns a non-zero status. 

818 

819 :param as_process: 

820 Whether to return the created process instance directly from which 

821 streams can be read on demand. This will render with_extended_output and 

822 with_exceptions ineffective - the caller will have 

823 to deal with the details himself. 

824 It is important to note that the process will be placed into an AutoInterrupt 

825 wrapper that will interrupt the process once it goes out of scope. If you 

826 use the command in iterators, you should pass the whole process instance 

827 instead of a single stream. 

828 

829 :param output_stream: 

830 If set to a file-like object, data produced by the git command will be 

831 output to the given stream directly. 

832 This feature only has any effect if as_process is False. Processes will 

833 always be created with a pipe due to issues with subprocess. 

834 This merely is a workaround as data will be copied from the 

835 output pipe to the given output stream directly. 

836 Judging from the implementation, you shouldn't use this flag ! 

837 

838 :param stdout_as_string: 

839 if False, the commands standard output will be bytes. Otherwise, it will be 

840 decoded into a string using the default encoding (usually utf-8). 

841 The latter can fail, if the output contains binary data. 

842 

843 :param env: 

844 A dictionary of environment variables to be passed to `subprocess.Popen`. 

845 

846 :param max_chunk_size: 

847 Maximum number of bytes in one chunk of data passed to the output_stream in 

848 one invocation of write() method. If the given number is not positive then 

849 the default value is used. 

850 

851 :param subprocess_kwargs: 

852 Keyword arguments to be passed to subprocess.Popen. Please note that 

853 some of the valid kwargs are already set by this method, the ones you 

854 specify may not be the same ones. 

855 

856 :param with_stdout: If True, default True, we open stdout on the created process 

857 :param universal_newlines: 

858 if True, pipes will be opened as text, and lines are split at 

859 all known line endings. 

860 :param shell: 

861 Whether to invoke commands through a shell (see `Popen(..., shell=True)`). 

862 It overrides :attr:`USE_SHELL` if it is not `None`. 

863 :param kill_after_timeout: 

864 To specify a timeout in seconds for the git command, after which the process 

865 should be killed. This will have no effect if as_process is set to True. It is 

866 set to None by default and will let the process run until the timeout is 

867 explicitly specified. This feature is not supported on Windows. It's also worth 

868 noting that kill_after_timeout uses SIGKILL, which can have negative side 

869 effects on a repository. For example, stale locks in case of git gc could 

870 render the repository incapable of accepting changes until the lock is manually 

871 removed. 

872 :param strip_newline_in_stdout: 

873 Whether to strip the trailing `\n` of the command stdout. 

874 :return: 

875 * str(output) if extended_output = False (Default) 

876 * tuple(int(status), str(stdout), str(stderr)) if extended_output = True 

877 

878 if output_stream is True, the stdout value will be your output stream: 

879 * output_stream if extended_output = False 

880 * tuple(int(status), output_stream, str(stderr)) if extended_output = True 

881 

882 Note git is executed with LC_MESSAGES="C" to ensure consistent 

883 output regardless of system language. 

884 

885 :raise GitCommandError: 

886 

887 :note: 

888 If you add additional keyword arguments to the signature of this method, 

889 you must update the execute_kwargs tuple housed in this module.""" 

890 # Remove password for the command if present 

891 redacted_command = remove_password_if_present(command) 

892 if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process): 892 ↛ 893line 892 didn't jump to line 893, because the condition on line 892 was never true

893 log.info(" ".join(redacted_command)) 

894 

895 # Allow the user to have the command executed in their working dir. 

896 try: 

897 cwd = self._working_dir or os.getcwd() # type: Union[None, str] 

898 if not os.access(str(cwd), os.X_OK): 898 ↛ 899line 898 didn't jump to line 899, because the condition on line 898 was never true

899 cwd = None 

900 except FileNotFoundError: 

901 cwd = None 

902 

903 # Start the process 

904 inline_env = env 

905 env = os.environ.copy() 

906 # Attempt to force all output to plain ascii english, which is what some parsing code 

907 # may expect. 

908 # According to stackoverflow (http://goo.gl/l74GC8), we are setting LANGUAGE as well 

909 # just to be sure. 

910 env["LANGUAGE"] = "C" 

911 env["LC_ALL"] = "C" 

912 env.update(self._environment) 

913 if inline_env is not None: 913 ↛ 914line 913 didn't jump to line 914, because the condition on line 913 was never true

914 env.update(inline_env) 

915 

916 if is_win: 916 ↛ 917line 916 didn't jump to line 917, because the condition on line 916 was never true

917 cmd_not_found_exception = OSError 

918 if kill_after_timeout is not None: 

919 raise GitCommandError( 

920 redacted_command, 

921 '"kill_after_timeout" feature is not supported on Windows.', 

922 ) 

923 else: 

924 cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable 

925 # end handle 

926 

927 stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb") 

928 istream_ok = "None" 

929 if istream: 929 ↛ 930line 929 didn't jump to line 930, because the condition on line 929 was never true

930 istream_ok = "<valid stream>" 

931 log.debug( 

932 "Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)", 

933 redacted_command, 

934 cwd, 

935 universal_newlines, 

936 shell, 

937 istream_ok, 

938 ) 

939 try: 

940 proc = Popen( 

941 command, 

942 env=env, 

943 cwd=cwd, 

944 bufsize=-1, 

945 stdin=istream or DEVNULL, 

946 stderr=PIPE, 

947 stdout=stdout_sink, 

948 shell=shell is not None and shell or self.USE_SHELL, 

949 close_fds=is_posix, # unsupported on windows 

950 universal_newlines=universal_newlines, 

951 creationflags=PROC_CREATIONFLAGS, 

952 **subprocess_kwargs, 

953 ) 

954 

955 except cmd_not_found_exception as err: 

956 raise GitCommandNotFound(redacted_command, err) from err 

957 else: 

958 # replace with a typeguard for Popen[bytes]? 

959 proc.stdout = cast(BinaryIO, proc.stdout) 

960 proc.stderr = cast(BinaryIO, proc.stderr) 

961 

962 if as_process: 962 ↛ 963line 962 didn't jump to line 963, because the condition on line 962 was never true

963 return self.AutoInterrupt(proc, command) 

964 

965 def _kill_process(pid: int) -> None: 

966 """Callback method to kill a process.""" 

967 p = Popen( 

968 ["ps", "--ppid", str(pid)], 

969 stdout=PIPE, 

970 creationflags=PROC_CREATIONFLAGS, 

971 ) 

972 child_pids = [] 

973 if p.stdout is not None: 

974 for line in p.stdout: 

975 if len(line.split()) > 0: 

976 local_pid = (line.split())[0] 

977 if local_pid.isdigit(): 

978 child_pids.append(int(local_pid)) 

979 try: 

980 # Windows does not have SIGKILL, so use SIGTERM instead 

981 sig = getattr(signal, "SIGKILL", signal.SIGTERM) 

982 os.kill(pid, sig) 

983 for child_pid in child_pids: 

984 try: 

985 os.kill(child_pid, sig) 

986 except OSError: 

987 pass 

988 kill_check.set() # tell the main routine that the process was killed 

989 except OSError: 

990 # It is possible that the process gets completed in the duration after timeout 

991 # happens and before we try to kill the process. 

992 pass 

993 return 

994 

995 # end 

996 

997 if kill_after_timeout is not None: 997 ↛ 998line 997 didn't jump to line 998, because the condition on line 997 was never true

998 kill_check = threading.Event() 

999 watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,)) 

1000 

1001 # Wait for the process to return 

1002 status = 0 

1003 stdout_value: Union[str, bytes] = b"" 

1004 stderr_value: Union[str, bytes] = b"" 

1005 newline = "\n" if universal_newlines else b"\n" 

1006 try: 

1007 if output_stream is None: 1007 ↛ 1028line 1007 didn't jump to line 1028, because the condition on line 1007 was never false

1008 if kill_after_timeout is not None: 1008 ↛ 1009line 1008 didn't jump to line 1009, because the condition on line 1008 was never true

1009 watchdog.start() 

1010 stdout_value, stderr_value = proc.communicate() 

1011 if kill_after_timeout is not None: 1011 ↛ 1012line 1011 didn't jump to line 1012, because the condition on line 1011 was never true

1012 watchdog.cancel() 

1013 if kill_check.is_set(): 

1014 stderr_value = 'Timeout: the command "%s" did not complete in %d ' "secs." % ( 

1015 " ".join(redacted_command), 

1016 kill_after_timeout, 

1017 ) 

1018 if not universal_newlines: 

1019 stderr_value = stderr_value.encode(defenc) 

1020 # strip trailing "\n" 

1021 if stdout_value.endswith(newline) and strip_newline_in_stdout: # type: ignore 1021 ↛ 1023line 1021 didn't jump to line 1023, because the condition on line 1021 was never false

1022 stdout_value = stdout_value[:-1] 

1023 if stderr_value.endswith(newline): # type: ignore 1023 ↛ 1024line 1023 didn't jump to line 1024, because the condition on line 1023 was never true

1024 stderr_value = stderr_value[:-1] 

1025 

1026 status = proc.returncode 

1027 else: 

1028 max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE 

1029 stream_copy(proc.stdout, output_stream, max_chunk_size) 

1030 stdout_value = proc.stdout.read() 

1031 stderr_value = proc.stderr.read() 

1032 # strip trailing "\n" 

1033 if stderr_value.endswith(newline): # type: ignore 

1034 stderr_value = stderr_value[:-1] 

1035 status = proc.wait() 

1036 # END stdout handling 

1037 finally: 

1038 proc.stdout.close() 

1039 proc.stderr.close() 

1040 

1041 if self.GIT_PYTHON_TRACE == "full": 1041 ↛ 1042line 1041 didn't jump to line 1042, because the condition on line 1041 was never true

1042 cmdstr = " ".join(redacted_command) 

1043 

1044 def as_text(stdout_value: Union[bytes, str]) -> str: 

1045 return not output_stream and safe_decode(stdout_value) or "<OUTPUT_STREAM>" 

1046 

1047 # end 

1048 

1049 if stderr_value: 

1050 log.info( 

1051 "%s -> %d; stdout: '%s'; stderr: '%s'", 

1052 cmdstr, 

1053 status, 

1054 as_text(stdout_value), 

1055 safe_decode(stderr_value), 

1056 ) 

1057 elif stdout_value: 

1058 log.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value)) 

1059 else: 

1060 log.info("%s -> %d", cmdstr, status) 

1061 # END handle debug printing 

1062 

1063 if with_exceptions and status != 0: 1063 ↛ 1064line 1063 didn't jump to line 1064, because the condition on line 1063 was never true

1064 raise GitCommandError(redacted_command, status, stderr_value, stdout_value) 

1065 

1066 if isinstance(stdout_value, bytes) and stdout_as_string: # could also be output_stream 1066 ↛ 1070line 1066 didn't jump to line 1070, because the condition on line 1066 was never false

1067 stdout_value = safe_decode(stdout_value) 

1068 

1069 # Allow access to the command's status code 

1070 if with_extended_output: 1070 ↛ 1071line 1070 didn't jump to line 1071, because the condition on line 1070 was never true

1071 return (status, stdout_value, safe_decode(stderr_value)) 

1072 else: 

1073 return stdout_value 

1074 

def environment(self) -> Dict[str, str]:
    """Return the environment overrides stored on this instance.

    :return: the ``_environment`` dictionary itself (not a copy)
    """
    overrides = self._environment
    return overrides

1077 

def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]:
    """
    Set environment variables for future git invocations. The returned mapping
    holds the previous value of every variable that was changed, in a format
    that can be passed back into this function to revert the changes:

    ``Examples``::

        old_env = self.update_environment(PWD='/tmp')
        self.update_environment(**old_env)

    :param kwargs: environment variables to use for git processes; a value of
        None removes the variable from the stored environment
    :return: dict that maps environment variables to their old values
    """
    previous = {}
    stored = self._environment
    for name in kwargs:
        new_value = kwargs[name]
        if new_value is None:
            # None means "unset": only record and drop keys that are present,
            # so variables that were never set leave no trace in the result.
            if name in stored:
                previous[name] = stored.pop(name)
            continue
        # Remember the prior value (None when the key was absent, which makes
        # a later revert delete the key again), then store the new one.
        previous[name] = stored.get(name)
        stored[name] = new_value
    return previous

1103 

@contextmanager
def custom_environment(self, **kwargs: Any) -> Iterator[None]:
    """
    Context manager that applies ``update_environment`` on entry and puts the
    previous values back on exit, even if the managed block raises.

    ``Examples``::

        with self.custom_environment(GIT_SSH='/bin/ssh_wrapper'):
            repo.remotes.origin.fetch()

    :param kwargs: see update_environment
    """
    saved_values = self.update_environment(**kwargs)
    try:
        yield
    finally:
        # Feeding the returned mapping back in reverts every change made above.
        self.update_environment(**saved_values)

1122 

def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
    """Convert a single keyword argument into git command line option(s).

    :param name:
        option name; a one-character name yields a short option (``-n``),
        anything longer yields a long option whose name is passed through
        ``dashify``
    :param value:
        ``True`` emits the bare flag, ``False`` and ``None`` emit nothing,
        any other value (including ``0``) is formatted as the option's argument
    :param split_single_char_options:
        for short options with a value, emit ``['-n', '5']`` when true and
        ``['-n5']`` otherwise; ignored for long options
    :return: list of command line tokens, possibly empty
    """
    is_short = len(name) == 1

    # Identity checks on purpose: a membership test such as
    # ``value not in (False, None)`` compares with ``==`` and therefore treats
    # 0 as False, silently dropping e.g. ``n=0`` while ``max_count=0`` was kept.
    if value is None or value is False:
        return []

    if is_short:
        if value is True:
            return ["-%s" % name]
        if split_single_char_options:
            return ["-%s" % name, "%s" % value]
        return ["-%s%s" % (name, value)]

    if value is True:
        return ["--%s" % dashify(name)]
    return ["--%s=%s" % (dashify(name), value)]

1138 

def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
    """Transforms Python style kwargs into git command line options.

    A list or tuple value repeats the option once per element; every other
    value is converted as a single option by ``transform_kwarg``.

    :param split_single_char_options: forwarded unchanged to ``transform_kwarg``
    :param kwargs: option names mapped to their values
    :return: flat list of command line tokens in keyword order
    """
    options: List[str] = []
    for key, raw in kwargs.items():
        # Normalise to a sequence so both cases share one inner loop.
        values = raw if isinstance(raw, (list, tuple)) else (raw,)
        for item in values:
            options.extend(self.transform_kwarg(key, item, split_single_char_options))
    return options

1149 

@classmethod
def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
    """Flatten arbitrarily nested lists/tuples of arguments into a list of str.

    :param arg_list: a list or tuple (possibly nested), or a single value
    :return: every leaf value converted with ``str()``, in traversal order
    """
    # A non-sequence is a leaf: stringify it and stop recursing.
    if not isinstance(arg_list, (list, tuple)):
        return [str(arg_list)]

    flattened: List[str] = []
    for item in arg_list:
        flattened.extend(cls.__unpack_args(item))
    return flattened

1161 

def __call__(self, **kwargs: Any) -> "Git":
    """Specify command line options to the git executable
    for a subcommand call

    :param kwargs:
        is a dict of keyword arguments.
        These arguments are converted like the options of ``_call_process``,
        but are stored to be placed before the subcommand, i.e. they are
        passed to the git command rather than to the subcommand.

    :return: self, so that a subcommand call can be chained

    ``Examples``::
        git(work_tree='/tmp').difftool()"""
    git_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
    self._git_options = git_options
    return self

1176 

@overload
def _call_process(self, method: str, *args: None, **kwargs: None) -> str:
    ...  # if no args given, execute called with all defaults


@overload
def _call_process(
    self,
    method: str,
    istream: int,
    as_process: Literal[True],
    *args: Any,
    **kwargs: Any,
) -> "Git.AutoInterrupt":
    ...


@overload
def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
    ...


def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
    """Run the given git command with the specified arguments and return
    the result as a String

    :param method:
        is the command. Contained "_" characters will be converted to dashes,
        such as in 'ls_files' to call 'ls-files'.

    :param args:
        is the list of arguments. If None is included, it will be pruned.
        This allows your commands to call git more conveniently as None
        is realized as non-existent

    :param kwargs:
        It contains key-values for the following:
        - the :meth:`execute()` kwds, as listed in :var:`execute_kwargs`;
        - "command options" to be converted by :meth:`transform_kwargs()`;
        - the `'insert_kwargs_after'` key, whose value must match one of ``*args``;
          any cmd-options will be inserted after the matched arg.

    Examples::

        git.rev_list('master', max_count=10, header=True)

    turns into::

        git rev-list --max-count=10 --header master

    :return: Same as ``execute``
        if no args given used execute default (esp. as_process = False, stdout_as_string = True)
        and return str

    :raise ValueError: if ``insert_kwargs_after`` names a value not found in ``args``"""
    # Separate the keywords meant for execute() from those that become
    # command line options; otherwise the former would end up in args.
    exec_kwargs = {}
    opts_kwargs = {}
    for key, value in kwargs.items():
        bucket = exec_kwargs if key in execute_kwargs else opts_kwargs
        bucket[key] = value

    anchor = opts_kwargs.pop("insert_kwargs_after", None)

    # Prepare the argument list
    option_args = self.transform_kwargs(**opts_kwargs)
    positional = self.__unpack_args([a for a in args if a is not None])

    if anchor is not None:
        try:
            position = positional.index(anchor)
        except ValueError as err:
            raise ValueError(
                "Couldn't find argument '%s' in args %s to insert cmd options after"
                % (anchor, str(positional))
            ) from err
        split_at = position + 1
        arguments = positional[:split_at] + option_args + positional[split_at:]
    else:
        arguments = option_args + positional

    # Executable first, then the persistent options, then the one-shot options
    # set through __call__.
    command = [self.GIT_PYTHON_GIT_EXECUTABLE]
    command += self._persistent_git_options
    command += self._git_options
    # One-shot options apply to a single call only; reset to avoid side effects.
    self._git_options = ()

    command.append(dashify(method))
    command += arguments

    return self.execute(command, **exec_kwargs)

1271 

def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]:
    """
    Split an object header line into its three fields.

    :param header_line:
        <hex_sha> type_string size_as_int

    :return: (hex_sha, type_string, size_as_int)

    :raise ValueError: if the line does not consist of exactly three
        whitespace separated fields (as happens when the input sha could not
        be resolved), if the first field is not 40 characters long, or if the
        size field is not an integer"""
    fields = header_line.split()

    if len(fields) == 3:
        hexsha, type_name, size = fields
        if len(hexsha) != 40:
            raise ValueError("Failed to parse header: %r" % header_line)
        return (hexsha, type_name, int(size))

    # Anything else is treated as an error report; include the offending name
    # in the message when there is at least one field to show.
    stripped = header_line.strip()
    if fields:
        raise ValueError("SHA %s could not be resolved, git returned: %r" % (fields[0], stripped))
    raise ValueError("SHA could not be resolved, git returned: %r" % stripped)

1293 

def _prepare_ref(self, ref: AnyStr) -> bytes:
    """Return ``ref`` as a newline-terminated byte string.

    The trailing newline is what lets the receiving command separate refs
    written to its stdin.

    :param ref: bytes (decoded as ASCII), str, or any object whose ``str()``
        form names the ref
    :return: the ref text encoded with ``defenc``, ending in a newline
    """
    if isinstance(ref, bytes):
        # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text
        text: str = ref.decode("ascii")
    elif isinstance(ref, str):
        text = ref
    else:
        text = str(ref)  # could be ref-object

    terminated = text if text.endswith("\n") else text + "\n"
    return terminated.encode(defenc)

1307 

def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt":
    """Return the command cached under ``attr_name``, starting it on first use.

    :param attr_name: name of the attribute on ``self`` that caches the command
    :param cmd_name: git command name handed to ``_call_process``
    :param args: positional arguments for ``_call_process``
    :param kwargs: keyword arguments for ``_call_process``; they override the
        defaults ``istream=PIPE`` and ``as_process=True``
    :return: the cached command, or the newly started one
    """
    cached = getattr(self, attr_name)
    if cached is not None:
        return cached

    call_options = {"istream": PIPE, "as_process": True, **kwargs}
    process = self._call_process(cmd_name, *args, **call_options)
    # Cache the new command so later calls reuse it.
    setattr(self, attr_name, process)
    return cast("Git.AutoInterrupt", process)

1320 

def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]:
    """Write ``ref`` to the command's stdin and parse the header line it answers with.

    :param cmd: running command exposing ``stdin`` and ``stdout`` streams
    :param ref: ref to look up; prepared with ``_prepare_ref`` before writing
    :return: result of ``_parse_object_header`` for the line read from stdout
    :raise ValueError: if either ``cmd.stdin`` or ``cmd.stdout`` is falsy
    """
    if not (cmd.stdin and cmd.stdout):
        raise ValueError("cmd stdin was empty")

    cmd.stdin.write(self._prepare_ref(ref))
    # Flush so the request is handed to the process before waiting on its reply.
    cmd.stdin.flush()
    header_line = cmd.stdout.readline()
    return self._parse_object_header(header_line)

1328 

def get_object_header(self, ref: str) -> Tuple[str, str, int]:
    """Use this method to quickly examine the type and size of the object behind
    the given ref.

    :note: The underlying ``cat_file`` command (``batch_check=True``) is cached
        under ``cat_file_header``, so the cost of command invocation is paid
        once and the command is reused in subsequent calls.

    :param ref: ref of the object to look up
    :return: (hexsha, type_string, size_as_int)"""
    return self.__get_object_header(
        self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True),
        ref,
    )

1339 

def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]:
    """As get_object_header, but returns object data as well

    :param ref: ref of the object to read
    :return: (hexsha, type_string, size_as_int, data_string)
    :note: not threadsafe"""
    hexsha, typename, size, content = self.stream_object_data(ref)
    payload = content.read(size)
    # Drop the local reference to the stream before returning.
    del content
    return (hexsha, typename, size, payload)

1348 

def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
    """As get_object_header, but returns the data as a stream

    :param ref: ref of the object to stream
    :return: (hexsha, type_string, size_as_int, stream)
    :note: This method is not threadsafe, you need one independent Command instance per thread to be safe !"""
    process = self._get_persistent_cmd("cat_file_all", "cat_file", batch=True)
    hexsha, typename, size = self.__get_object_header(process, ref)

    # Substitute an empty in-memory stream when the process has no stdout.
    source = process.stdout
    if source is None:
        source = io.BytesIO()
    return (hexsha, typename, size, self.CatFileContentStream(size, source))

1358 

def clear_cache(self) -> "Git":
    """Clear all kinds of internal caches to release resources.

    Currently the persistent commands held in ``cat_file_all`` and
    ``cat_file_header`` have their ``__del__`` invoked directly, after which
    both attributes are reset to None.

    :return: self"""
    persistent = (self.cat_file_all, self.cat_file_header)
    for process in persistent:
        if process:
            # Explicit finalizer call: tear the command down now rather than
            # whenever the object happens to be collected.
            process.__del__()

    self.cat_file_all = self.cat_file_header = None
    return self