Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/cmd.py: 31%
553 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# cmd.py
2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
3#
4# This module is part of GitPython and is released under
5# the BSD License: http://www.opensource.org/licenses/bsd-license.php
6from __future__ import annotations
7from contextlib import contextmanager
8import io
9import logging
10import os
11import signal
12from subprocess import call, Popen, PIPE, DEVNULL
13import subprocess
14import threading
15from textwrap import dedent
17from git.compat import (
18 defenc,
19 force_bytes,
20 safe_decode,
21 is_posix,
22 is_win,
23)
24from git.exc import CommandError
25from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present
27from .exc import GitCommandError, GitCommandNotFound
28from .util import (
29 LazyMixin,
30 stream_copy,
31)
33# typing ---------------------------------------------------------------------------
35from typing import (
36 Any,
37 AnyStr,
38 BinaryIO,
39 Callable,
40 Dict,
41 IO,
42 Iterator,
43 List,
44 Mapping,
45 Sequence,
46 TYPE_CHECKING,
47 TextIO,
48 Tuple,
49 Union,
50 cast,
51 overload,
52)
54from git.types import PathLike, Literal, TBD
56if TYPE_CHECKING: 56 ↛ 57line 56 didn't jump to line 57, because the condition on line 56 was never true
57 from git.repo.base import Repo
58 from git.diff import DiffIndex
61# ---------------------------------------------------------------------------------
63execute_kwargs = {
64 "istream",
65 "with_extended_output",
66 "with_exceptions",
67 "as_process",
68 "stdout_as_string",
69 "output_stream",
70 "with_stdout",
71 "kill_after_timeout",
72 "universal_newlines",
73 "shell",
74 "env",
75 "max_chunk_size",
76 "strip_newline_in_stdout",
77}
79log = logging.getLogger(__name__)
80log.addHandler(logging.NullHandler())
82__all__ = ("Git",)
85# ==============================================================================
86## @name Utilities
87# ------------------------------------------------------------------------------
88# Documentation
89## @{
92def handle_process_output(
93 process: "Git.AutoInterrupt" | Popen,
94 stdout_handler: Union[
95 None,
96 Callable[[AnyStr], None],
97 Callable[[List[AnyStr]], None],
98 Callable[[bytes, "Repo", "DiffIndex"], None],
99 ],
100 stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
101 finalizer: Union[None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]] = None,
102 decode_streams: bool = True,
103 kill_after_timeout: Union[None, float] = None,
104) -> None:
105 """Registers for notifications to learn that process output is ready to read, and dispatches lines to
106 the respective line handlers.
107 This function returns once the finalizer returns
109 :return: result of finalizer
110 :param process: subprocess.Popen instance
111 :param stdout_handler: f(stdout_line_string), or None
112 :param stderr_handler: f(stderr_line_string), or None
113 :param finalizer: f(proc) - wait for proc to finish
114 :param decode_streams:
115 Assume stdout/stderr streams are binary and decode them before pushing \
116 their contents to handlers.
117 Set it to False if `universal_newline == True` (then streams are in text-mode)
118 or if decoding must happen later (i.e. for Diffs).
119 :param kill_after_timeout:
120 float or None, Default = None
121 To specify a timeout in seconds for the git command, after which the process
122 should be killed.
123 """
124 # Use 2 "pump" threads and wait for both to finish.
125 def pump_stream(
126 cmdline: List[str],
127 name: str,
128 stream: Union[BinaryIO, TextIO],
129 is_decode: bool,
130 handler: Union[None, Callable[[Union[bytes, str]], None]],
131 ) -> None:
132 try:
133 for line in stream:
134 if handler:
135 if is_decode:
136 assert isinstance(line, bytes)
137 line_str = line.decode(defenc)
138 handler(line_str)
139 else:
140 handler(line)
142 except Exception as ex:
143 log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
144 if "I/O operation on closed file" not in str(ex):
145 # Only reraise if the error was not due to the stream closing
146 raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex
147 finally:
148 stream.close()
150 if hasattr(process, "proc"):
151 process = cast("Git.AutoInterrupt", process)
152 cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "")
153 p_stdout = process.proc.stdout if process.proc else None
154 p_stderr = process.proc.stderr if process.proc else None
155 else:
156 process = cast(Popen, process)
157 cmdline = getattr(process, "args", "")
158 p_stdout = process.stdout
159 p_stderr = process.stderr
161 if not isinstance(cmdline, (tuple, list)):
162 cmdline = cmdline.split()
164 pumps: List[Tuple[str, IO, Callable[..., None] | None]] = []
165 if p_stdout:
166 pumps.append(("stdout", p_stdout, stdout_handler))
167 if p_stderr:
168 pumps.append(("stderr", p_stderr, stderr_handler))
170 threads: List[threading.Thread] = []
172 for name, stream, handler in pumps:
173 t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler))
174 t.daemon = True
175 t.start()
176 threads.append(t)
178 ## FIXME: Why Join?? Will block if `stdin` needs feeding...
179 #
180 for t in threads:
181 t.join(timeout=kill_after_timeout)
182 if t.is_alive():
183 if isinstance(process, Git.AutoInterrupt):
184 process._terminate()
185 else: # Don't want to deal with the other case
186 raise RuntimeError(
187 "Thread join() timed out in cmd.handle_process_output()."
188 f" kill_after_timeout={kill_after_timeout} seconds"
189 )
190 if stderr_handler:
191 error_str: Union[str, bytes] = (
192 "error: process killed because it timed out." f" kill_after_timeout={kill_after_timeout} seconds"
193 )
194 if not decode_streams and isinstance(p_stderr, BinaryIO):
195 # Assume stderr_handler needs binary input
196 error_str = cast(str, error_str)
197 error_str = error_str.encode()
198 # We ignore typing on the next line because mypy does not like
199 # the way we inferred that stderr takes str or bytes
200 stderr_handler(error_str) # type: ignore
202 if finalizer:
203 return finalizer(process)
204 else:
205 return None
208def dashify(string: str) -> str:
209 return string.replace("_", "-")
212def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]:
213 return {s: getattr(self, s) for s in self.__slots__ if s not in exclude}
216def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
217 for k, v in d.items():
218 setattr(self, k, v)
219 for k in excluded:
220 setattr(self, k, None)
223## -- End Utilities -- @}
226# value of Windows process creation flag taken from MSDN
227CREATE_NO_WINDOW = 0x08000000
229## CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards,
230# see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
231PROC_CREATIONFLAGS = (
232 CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0 # type: ignore[attr-defined]
233) # mypy error if not windows
236class Git(LazyMixin):
238 """
239 The Git class manages communication with the Git binary.
241 It provides a convenient interface to calling the Git binary, such as in::
243 g = Git( git_dir )
244 g.init() # calls 'git init' program
245 rval = g.ls_files() # calls 'git ls-files' program
247 ``Debugging``
248 Set the GIT_PYTHON_TRACE environment variable print each invocation
249 of the command to stdout.
250 Set its value to 'full' to see details about the returned values.
251 """
253 __slots__ = (
254 "_working_dir",
255 "cat_file_all",
256 "cat_file_header",
257 "_version_info",
258 "_git_options",
259 "_persistent_git_options",
260 "_environment",
261 )
263 _excluded_ = ("cat_file_all", "cat_file_header", "_version_info")
265 def __getstate__(self) -> Dict[str, Any]:
266 return slots_to_dict(self, exclude=self._excluded_)
268 def __setstate__(self, d: Dict[str, Any]) -> None:
269 dict_to_slots_and__excluded_are_none(self, d, excluded=self._excluded_)
271 # CONFIGURATION
273 git_exec_name = "git" # default that should work on linux and windows
275 # Enables debugging of GitPython's git commands
276 GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False)
278 # If True, a shell will be used when executing git commands.
279 # This should only be desirable on Windows, see https://github.com/gitpython-developers/GitPython/pull/126
280 # and check `git/test_repo.py:TestRepo.test_untracked_files()` TC for an example where it is required.
281 # Override this value using `Git.USE_SHELL = True`
282 USE_SHELL = False
284 # Provide the full path to the git executable. Otherwise it assumes git is in the path
285 _git_exec_env_var = "GIT_PYTHON_GIT_EXECUTABLE"
286 _refresh_env_var = "GIT_PYTHON_REFRESH"
287 GIT_PYTHON_GIT_EXECUTABLE = None
288 # note that the git executable is actually found during the refresh step in
289 # the top level __init__
291 @classmethod
292 def refresh(cls, path: Union[None, PathLike] = None) -> bool:
293 """This gets called by the refresh function (see the top level
294 __init__).
295 """
296 # discern which path to refresh with
297 if path is not None: 297 ↛ 298line 297 didn't jump to line 298, because the condition on line 297 was never true
298 new_git = os.path.expanduser(path)
299 new_git = os.path.abspath(new_git)
300 else:
301 new_git = os.environ.get(cls._git_exec_env_var, cls.git_exec_name)
303 # keep track of the old and new git executable path
304 old_git = cls.GIT_PYTHON_GIT_EXECUTABLE
305 cls.GIT_PYTHON_GIT_EXECUTABLE = new_git
307 # test if the new git executable path is valid
309 # - a GitCommandNotFound error is spawned by ourselves
310 # - a PermissionError is spawned if the git executable provided
311 # cannot be executed for whatever reason
313 has_git = False
314 try:
315 cls().version()
316 has_git = True
317 except (GitCommandNotFound, PermissionError):
318 pass
320 # warn or raise exception if test failed
321 if not has_git: 321 ↛ 322line 321 didn't jump to line 322
322 err = (
323 dedent(
324 """\
325 Bad git executable.
326 The git executable must be specified in one of the following ways:
327 - be included in your $PATH
328 - be set via $%s
329 - explicitly set via git.refresh()
330 """
331 )
332 % cls._git_exec_env_var
333 )
335 # revert to whatever the old_git was
336 cls.GIT_PYTHON_GIT_EXECUTABLE = old_git
338 if old_git is None:
339 # on the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is
340 # None) we only are quiet, warn, or error depending on the
341 # GIT_PYTHON_REFRESH value
343 # determine what the user wants to happen during the initial
344 # refresh we expect GIT_PYTHON_REFRESH to either be unset or
345 # be one of the following values:
346 # 0|q|quiet|s|silence
347 # 1|w|warn|warning
348 # 2|r|raise|e|error
350 mode = os.environ.get(cls._refresh_env_var, "raise").lower()
352 quiet = ["quiet", "q", "silence", "s", "none", "n", "0"]
353 warn = ["warn", "w", "warning", "1"]
354 error = ["error", "e", "raise", "r", "2"]
356 if mode in quiet:
357 pass
358 elif mode in warn or mode in error:
359 err = (
360 dedent(
361 """\
362 %s
363 All git commands will error until this is rectified.
365 This initial warning can be silenced or aggravated in the future by setting the
366 $%s environment variable. Use one of the following values:
367 - %s: for no warning or exception
368 - %s: for a printed warning
369 - %s: for a raised exception
371 Example:
372 export %s=%s
373 """
374 )
375 % (
376 err,
377 cls._refresh_env_var,
378 "|".join(quiet),
379 "|".join(warn),
380 "|".join(error),
381 cls._refresh_env_var,
382 quiet[0],
383 )
384 )
386 if mode in warn:
387 print("WARNING: %s" % err)
388 else:
389 raise ImportError(err)
390 else:
391 err = (
392 dedent(
393 """\
394 %s environment variable has been set but it has been set with an invalid value.
396 Use only the following values:
397 - %s: for no warning or exception
398 - %s: for a printed warning
399 - %s: for a raised exception
400 """
401 )
402 % (
403 cls._refresh_env_var,
404 "|".join(quiet),
405 "|".join(warn),
406 "|".join(error),
407 )
408 )
409 raise ImportError(err)
411 # we get here if this was the init refresh and the refresh mode
412 # was not error, go ahead and set the GIT_PYTHON_GIT_EXECUTABLE
413 # such that we discern the difference between a first import
414 # and a second import
415 cls.GIT_PYTHON_GIT_EXECUTABLE = cls.git_exec_name
416 else:
417 # after the first refresh (when GIT_PYTHON_GIT_EXECUTABLE
418 # is no longer None) we raise an exception
419 raise GitCommandNotFound("git", err)
421 return has_git
423 @classmethod
424 def is_cygwin(cls) -> bool:
425 return is_cygwin_git(cls.GIT_PYTHON_GIT_EXECUTABLE)
427 @overload
428 @classmethod
429 def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str:
430 ...
432 @overload
433 @classmethod
434 def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> str:
435 ...
437 @classmethod
438 def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike:
439 if is_cygwin is None:
440 is_cygwin = cls.is_cygwin()
442 if is_cygwin:
443 url = cygpath(url)
444 else:
445 """Remove any backslahes from urls to be written in config files.
447 Windows might create config-files containing paths with backslashed,
448 but git stops liking them as it will escape the backslashes.
449 Hence we undo the escaping just to be sure.
450 """
451 url = os.path.expandvars(url)
452 if url.startswith("~"):
453 url = os.path.expanduser(url)
454 url = url.replace("\\\\", "\\").replace("\\", "/")
455 return url
457 class AutoInterrupt(object):
458 """Kill/Interrupt the stored process instance once this instance goes out of scope. It is
459 used to prevent processes piling up in case iterators stop reading.
460 Besides all attributes are wired through to the contained process object.
462 The wait method was overridden to perform automatic status code checking
463 and possibly raise."""
465 __slots__ = ("proc", "args", "status")
467 # If this is non-zero it will override any status code during
468 # _terminate, used to prevent race conditions in testing
469 _status_code_if_terminate: int = 0
471 def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
472 self.proc = proc
473 self.args = args
474 self.status: Union[int, None] = None
476 def _terminate(self) -> None:
477 """Terminate the underlying process"""
478 if self.proc is None:
479 return
481 proc = self.proc
482 self.proc = None
483 if proc.stdin:
484 proc.stdin.close()
485 if proc.stdout:
486 proc.stdout.close()
487 if proc.stderr:
488 proc.stderr.close()
489 # did the process finish already so we have a return code ?
490 try:
491 if proc.poll() is not None:
492 self.status = self._status_code_if_terminate or proc.poll()
493 return None
494 except OSError as ex:
495 log.info("Ignored error after process had died: %r", ex)
497 # can be that nothing really exists anymore ...
498 if os is None or getattr(os, "kill", None) is None:
499 return None
501 # try to kill it
502 try:
503 proc.terminate()
504 status = proc.wait() # ensure process goes away
506 self.status = self._status_code_if_terminate or status
507 except OSError as ex:
508 log.info("Ignored error after process had died: %r", ex)
509 except AttributeError:
510 # try windows
511 # for some reason, providing None for stdout/stderr still prints something. This is why
512 # we simply use the shell and redirect to nul. Its slower than CreateProcess, question
513 # is whether we really want to see all these messages. Its annoying no matter what.
514 if is_win:
515 call(
516 ("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)),
517 shell=True,
518 )
519 # END exception handling
521 def __del__(self) -> None:
522 self._terminate()
524 def __getattr__(self, attr: str) -> Any:
525 return getattr(self.proc, attr)
527 # TODO: Bad choice to mimic `proc.wait()` but with different args.
528 def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
529 """Wait for the process and return its status code.
531 :param stderr: Previously read value of stderr, in case stderr is already closed.
532 :warn: may deadlock if output or error pipes are used and not handled separately.
533 :raise GitCommandError: if the return status is not 0"""
534 if stderr is None:
535 stderr_b = b""
536 stderr_b = force_bytes(data=stderr, encoding="utf-8")
537 status: Union[int, None]
538 if self.proc is not None:
539 status = self.proc.wait()
540 p_stderr = self.proc.stderr
541 else: # Assume the underlying proc was killed earlier or never existed
542 status = self.status
543 p_stderr = None
545 def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
546 if stream:
547 try:
548 return stderr_b + force_bytes(stream.read())
549 except (OSError, ValueError):
550 return stderr_b or b""
551 else:
552 return stderr_b or b""
554 # END status handling
556 if status != 0:
557 errstr = read_all_from_possibly_closed_stream(p_stderr)
558 log.debug("AutoInterrupt wait stderr: %r" % (errstr,))
559 raise GitCommandError(remove_password_if_present(self.args), status, errstr)
560 return status
562 # END auto interrupt
564 class CatFileContentStream(object):
566 """Object representing a sized read-only stream returning the contents of
567 an object.
568 It behaves like a stream, but counts the data read and simulates an empty
569 stream once our sized content region is empty.
570 If not all data is read to the end of the objects's lifetime, we read the
571 rest to assure the underlying stream continues to work"""
573 __slots__: Tuple[str, ...] = ("_stream", "_nbr", "_size")
575 def __init__(self, size: int, stream: IO[bytes]) -> None:
576 self._stream = stream
577 self._size = size
578 self._nbr = 0 # num bytes read
580 # special case: if the object is empty, has null bytes, get the
581 # final newline right away.
582 if size == 0:
583 stream.read(1)
584 # END handle empty streams
586 def read(self, size: int = -1) -> bytes:
587 bytes_left = self._size - self._nbr
588 if bytes_left == 0:
589 return b""
590 if size > -1:
591 # assure we don't try to read past our limit
592 size = min(bytes_left, size)
593 else:
594 # they try to read all, make sure its not more than what remains
595 size = bytes_left
596 # END check early depletion
597 data = self._stream.read(size)
598 self._nbr += len(data)
600 # check for depletion, read our final byte to make the stream usable by others
601 if self._size - self._nbr == 0:
602 self._stream.read(1) # final newline
603 # END finish reading
604 return data
606 def readline(self, size: int = -1) -> bytes:
607 if self._nbr == self._size:
608 return b""
610 # clamp size to lowest allowed value
611 bytes_left = self._size - self._nbr
612 if size > -1:
613 size = min(bytes_left, size)
614 else:
615 size = bytes_left
616 # END handle size
618 data = self._stream.readline(size)
619 self._nbr += len(data)
621 # handle final byte
622 if self._size - self._nbr == 0:
623 self._stream.read(1)
624 # END finish reading
626 return data
628 def readlines(self, size: int = -1) -> List[bytes]:
629 if self._nbr == self._size:
630 return []
632 # leave all additional logic to our readline method, we just check the size
633 out = []
634 nbr = 0
635 while True:
636 line = self.readline()
637 if not line:
638 break
639 out.append(line)
640 if size > -1:
641 nbr += len(line)
642 if nbr > size:
643 break
644 # END handle size constraint
645 # END readline loop
646 return out
648 # skipcq: PYL-E0301
649 def __iter__(self) -> "Git.CatFileContentStream":
650 return self
652 def __next__(self) -> bytes:
653 return next(self)
655 def next(self) -> bytes:
656 line = self.readline()
657 if not line:
658 raise StopIteration
660 return line
662 def __del__(self) -> None:
663 bytes_left = self._size - self._nbr
664 if bytes_left:
665 # read and discard - seeking is impossible within a stream
666 # includes terminating newline
667 self._stream.read(bytes_left + 1)
668 # END handle incomplete read
670 def __init__(self, working_dir: Union[None, PathLike] = None):
671 """Initialize this instance with:
673 :param working_dir:
674 Git directory we should work in. If None, we always work in the current
675 directory as returned by os.getcwd().
676 It is meant to be the working tree directory if available, or the
677 .git directory in case of bare repositories."""
678 super(Git, self).__init__()
679 self._working_dir = expand_path(working_dir)
680 self._git_options: Union[List[str], Tuple[str, ...]] = ()
681 self._persistent_git_options: List[str] = []
683 # Extra environment variables to pass to git commands
684 self._environment: Dict[str, str] = {}
686 # cached command slots
687 self.cat_file_header: Union[None, TBD] = None
688 self.cat_file_all: Union[None, TBD] = None
690 def __getattr__(self, name: str) -> Any:
691 """A convenience method as it allows to call the command as if it was
692 an object.
693 :return: Callable object that will execute call _call_process with your arguments."""
694 if name[0] == "_":
695 return LazyMixin.__getattr__(self, name)
696 return lambda *args, **kwargs: self._call_process(name, *args, **kwargs)
698 def set_persistent_git_options(self, **kwargs: Any) -> None:
699 """Specify command line options to the git executable
700 for subsequent subcommand calls
702 :param kwargs:
703 is a dict of keyword arguments.
704 these arguments are passed as in _call_process
705 but will be passed to the git command rather than
706 the subcommand.
707 """
709 self._persistent_git_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
711 def _set_cache_(self, attr: str) -> None:
712 if attr == "_version_info": 712 ↛ 722line 712 didn't jump to line 722, because the condition on line 712 was never false
713 # We only use the first 4 numbers, as everything else could be strings in fact (on windows)
714 process_version = self._call_process("version") # should be as default *args and **kwargs used
715 version_numbers = process_version.split(" ")[2]
717 self._version_info = cast(
718 Tuple[int, int, int, int],
719 tuple(int(n) for n in version_numbers.split(".")[:4] if n.isdigit()),
720 )
721 else:
722 super(Git, self)._set_cache_(attr)
723 # END handle version info
725 @property
726 def working_dir(self) -> Union[None, PathLike]:
727 """:return: Git directory we are working on"""
728 return self._working_dir
730 @property
731 def version_info(self) -> Tuple[int, int, int, int]:
732 """
733 :return: tuple(int, int, int, int) tuple with integers representing the major, minor
734 and additional version numbers as parsed from git version.
735 This value is generated on demand and is cached"""
736 return self._version_info
738 @overload
739 def execute(self, command: Union[str, Sequence[Any]], *, as_process: Literal[True]) -> "AutoInterrupt":
740 ...
742 @overload
743 def execute(
744 self,
745 command: Union[str, Sequence[Any]],
746 *,
747 as_process: Literal[False] = False,
748 stdout_as_string: Literal[True],
749 ) -> Union[str, Tuple[int, str, str]]:
750 ...
752 @overload
753 def execute(
754 self,
755 command: Union[str, Sequence[Any]],
756 *,
757 as_process: Literal[False] = False,
758 stdout_as_string: Literal[False] = False,
759 ) -> Union[bytes, Tuple[int, bytes, str]]:
760 ...
762 @overload
763 def execute(
764 self,
765 command: Union[str, Sequence[Any]],
766 *,
767 with_extended_output: Literal[False],
768 as_process: Literal[False],
769 stdout_as_string: Literal[True],
770 ) -> str:
771 ...
773 @overload
774 def execute(
775 self,
776 command: Union[str, Sequence[Any]],
777 *,
778 with_extended_output: Literal[False],
779 as_process: Literal[False],
780 stdout_as_string: Literal[False],
781 ) -> bytes:
782 ...
784 def execute(
785 self,
786 command: Union[str, Sequence[Any]],
787 istream: Union[None, BinaryIO] = None,
788 with_extended_output: bool = False,
789 with_exceptions: bool = True,
790 as_process: bool = False,
791 output_stream: Union[None, BinaryIO] = None,
792 stdout_as_string: bool = True,
793 kill_after_timeout: Union[None, float] = None,
794 with_stdout: bool = True,
795 universal_newlines: bool = False,
796 shell: Union[None, bool] = None,
797 env: Union[None, Mapping[str, str]] = None,
798 max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
799 strip_newline_in_stdout: bool = True,
800 **subprocess_kwargs: Any,
801 ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
802 """Handles executing the command on the shell and consumes and returns
803 the returned information (stdout)
805 :param command:
806 The command argument list to execute.
807 It should be a string, or a sequence of program arguments. The
808 program to execute is the first item in the args sequence or string.
810 :param istream:
811 Standard input filehandle passed to subprocess.Popen.
813 :param with_extended_output:
814 Whether to return a (status, stdout, stderr) tuple.
816 :param with_exceptions:
817 Whether to raise an exception when git returns a non-zero status.
819 :param as_process:
820 Whether to return the created process instance directly from which
821 streams can be read on demand. This will render with_extended_output and
822 with_exceptions ineffective - the caller will have
823 to deal with the details himself.
824 It is important to note that the process will be placed into an AutoInterrupt
825 wrapper that will interrupt the process once it goes out of scope. If you
826 use the command in iterators, you should pass the whole process instance
827 instead of a single stream.
829 :param output_stream:
830 If set to a file-like object, data produced by the git command will be
831 output to the given stream directly.
832 This feature only has any effect if as_process is False. Processes will
833 always be created with a pipe due to issues with subprocess.
834 This merely is a workaround as data will be copied from the
835 output pipe to the given output stream directly.
836 Judging from the implementation, you shouldn't use this flag !
838 :param stdout_as_string:
839 if False, the commands standard output will be bytes. Otherwise, it will be
840 decoded into a string using the default encoding (usually utf-8).
841 The latter can fail, if the output contains binary data.
843 :param env:
844 A dictionary of environment variables to be passed to `subprocess.Popen`.
846 :param max_chunk_size:
847 Maximum number of bytes in one chunk of data passed to the output_stream in
848 one invocation of write() method. If the given number is not positive then
849 the default value is used.
851 :param subprocess_kwargs:
852 Keyword arguments to be passed to subprocess.Popen. Please note that
853 some of the valid kwargs are already set by this method, the ones you
854 specify may not be the same ones.
856 :param with_stdout: If True, default True, we open stdout on the created process
857 :param universal_newlines:
858 if True, pipes will be opened as text, and lines are split at
859 all known line endings.
860 :param shell:
861 Whether to invoke commands through a shell (see `Popen(..., shell=True)`).
862 It overrides :attr:`USE_SHELL` if it is not `None`.
863 :param kill_after_timeout:
864 To specify a timeout in seconds for the git command, after which the process
865 should be killed. This will have no effect if as_process is set to True. It is
866 set to None by default and will let the process run until the timeout is
867 explicitly specified. This feature is not supported on Windows. It's also worth
868 noting that kill_after_timeout uses SIGKILL, which can have negative side
869 effects on a repository. For example, stale locks in case of git gc could
870 render the repository incapable of accepting changes until the lock is manually
871 removed.
872 :param strip_newline_in_stdout:
873 Whether to strip the trailing `\n` of the command stdout.
874 :return:
875 * str(output) if extended_output = False (Default)
876 * tuple(int(status), str(stdout), str(stderr)) if extended_output = True
878 if output_stream is True, the stdout value will be your output stream:
879 * output_stream if extended_output = False
880 * tuple(int(status), output_stream, str(stderr)) if extended_output = True
882 Note git is executed with LC_MESSAGES="C" to ensure consistent
883 output regardless of system language.
885 :raise GitCommandError:
887 :note:
888 If you add additional keyword arguments to the signature of this method,
889 you must update the execute_kwargs tuple housed in this module."""
890 # Remove password for the command if present
891 redacted_command = remove_password_if_present(command)
892 if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process): 892 ↛ 893line 892 didn't jump to line 893, because the condition on line 892 was never true
893 log.info(" ".join(redacted_command))
895 # Allow the user to have the command executed in their working dir.
896 try:
897 cwd = self._working_dir or os.getcwd() # type: Union[None, str]
898 if not os.access(str(cwd), os.X_OK): 898 ↛ 899line 898 didn't jump to line 899, because the condition on line 898 was never true
899 cwd = None
900 except FileNotFoundError:
901 cwd = None
903 # Start the process
904 inline_env = env
905 env = os.environ.copy()
906 # Attempt to force all output to plain ascii english, which is what some parsing code
907 # may expect.
908 # According to stackoverflow (http://goo.gl/l74GC8), we are setting LANGUAGE as well
909 # just to be sure.
910 env["LANGUAGE"] = "C"
911 env["LC_ALL"] = "C"
912 env.update(self._environment)
913 if inline_env is not None: 913 ↛ 914line 913 didn't jump to line 914, because the condition on line 913 was never true
914 env.update(inline_env)
916 if is_win: 916 ↛ 917line 916 didn't jump to line 917, because the condition on line 916 was never true
917 cmd_not_found_exception = OSError
918 if kill_after_timeout is not None:
919 raise GitCommandError(
920 redacted_command,
921 '"kill_after_timeout" feature is not supported on Windows.',
922 )
923 else:
924 cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
925 # end handle
927 stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb")
928 istream_ok = "None"
929 if istream: 929 ↛ 930line 929 didn't jump to line 930, because the condition on line 929 was never true
930 istream_ok = "<valid stream>"
931 log.debug(
932 "Popen(%s, cwd=%s, universal_newlines=%s, shell=%s, istream=%s)",
933 redacted_command,
934 cwd,
935 universal_newlines,
936 shell,
937 istream_ok,
938 )
939 try:
940 proc = Popen(
941 command,
942 env=env,
943 cwd=cwd,
944 bufsize=-1,
945 stdin=istream or DEVNULL,
946 stderr=PIPE,
947 stdout=stdout_sink,
948 shell=shell is not None and shell or self.USE_SHELL,
949 close_fds=is_posix, # unsupported on windows
950 universal_newlines=universal_newlines,
951 creationflags=PROC_CREATIONFLAGS,
952 **subprocess_kwargs,
953 )
955 except cmd_not_found_exception as err:
956 raise GitCommandNotFound(redacted_command, err) from err
957 else:
958 # replace with a typeguard for Popen[bytes]?
959 proc.stdout = cast(BinaryIO, proc.stdout)
960 proc.stderr = cast(BinaryIO, proc.stderr)
962 if as_process: 962 ↛ 963line 962 didn't jump to line 963, because the condition on line 962 was never true
963 return self.AutoInterrupt(proc, command)
965 def _kill_process(pid: int) -> None:
966 """Callback method to kill a process."""
967 p = Popen(
968 ["ps", "--ppid", str(pid)],
969 stdout=PIPE,
970 creationflags=PROC_CREATIONFLAGS,
971 )
972 child_pids = []
973 if p.stdout is not None:
974 for line in p.stdout:
975 if len(line.split()) > 0:
976 local_pid = (line.split())[0]
977 if local_pid.isdigit():
978 child_pids.append(int(local_pid))
979 try:
980 # Windows does not have SIGKILL, so use SIGTERM instead
981 sig = getattr(signal, "SIGKILL", signal.SIGTERM)
982 os.kill(pid, sig)
983 for child_pid in child_pids:
984 try:
985 os.kill(child_pid, sig)
986 except OSError:
987 pass
988 kill_check.set() # tell the main routine that the process was killed
989 except OSError:
990 # It is possible that the process gets completed in the duration after timeout
991 # happens and before we try to kill the process.
992 pass
993 return
995 # end
997 if kill_after_timeout is not None: 997 ↛ 998line 997 didn't jump to line 998, because the condition on line 997 was never true
998 kill_check = threading.Event()
999 watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,))
1001 # Wait for the process to return
1002 status = 0
1003 stdout_value: Union[str, bytes] = b""
1004 stderr_value: Union[str, bytes] = b""
1005 newline = "\n" if universal_newlines else b"\n"
1006 try:
1007 if output_stream is None: 1007 ↛ 1028line 1007 didn't jump to line 1028, because the condition on line 1007 was never false
1008 if kill_after_timeout is not None: 1008 ↛ 1009line 1008 didn't jump to line 1009, because the condition on line 1008 was never true
1009 watchdog.start()
1010 stdout_value, stderr_value = proc.communicate()
1011 if kill_after_timeout is not None: 1011 ↛ 1012line 1011 didn't jump to line 1012, because the condition on line 1011 was never true
1012 watchdog.cancel()
1013 if kill_check.is_set():
1014 stderr_value = 'Timeout: the command "%s" did not complete in %d ' "secs." % (
1015 " ".join(redacted_command),
1016 kill_after_timeout,
1017 )
1018 if not universal_newlines:
1019 stderr_value = stderr_value.encode(defenc)
1020 # strip trailing "\n"
1021 if stdout_value.endswith(newline) and strip_newline_in_stdout: # type: ignore 1021 ↛ 1023line 1021 didn't jump to line 1023, because the condition on line 1021 was never false
1022 stdout_value = stdout_value[:-1]
1023 if stderr_value.endswith(newline): # type: ignore 1023 ↛ 1024line 1023 didn't jump to line 1024, because the condition on line 1023 was never true
1024 stderr_value = stderr_value[:-1]
1026 status = proc.returncode
1027 else:
1028 max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
1029 stream_copy(proc.stdout, output_stream, max_chunk_size)
1030 stdout_value = proc.stdout.read()
1031 stderr_value = proc.stderr.read()
1032 # strip trailing "\n"
1033 if stderr_value.endswith(newline): # type: ignore
1034 stderr_value = stderr_value[:-1]
1035 status = proc.wait()
1036 # END stdout handling
1037 finally:
1038 proc.stdout.close()
1039 proc.stderr.close()
1041 if self.GIT_PYTHON_TRACE == "full": 1041 ↛ 1042line 1041 didn't jump to line 1042, because the condition on line 1041 was never true
1042 cmdstr = " ".join(redacted_command)
1044 def as_text(stdout_value: Union[bytes, str]) -> str:
1045 return not output_stream and safe_decode(stdout_value) or "<OUTPUT_STREAM>"
1047 # end
1049 if stderr_value:
1050 log.info(
1051 "%s -> %d; stdout: '%s'; stderr: '%s'",
1052 cmdstr,
1053 status,
1054 as_text(stdout_value),
1055 safe_decode(stderr_value),
1056 )
1057 elif stdout_value:
1058 log.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value))
1059 else:
1060 log.info("%s -> %d", cmdstr, status)
1061 # END handle debug printing
1063 if with_exceptions and status != 0: 1063 ↛ 1064line 1063 didn't jump to line 1064, because the condition on line 1063 was never true
1064 raise GitCommandError(redacted_command, status, stderr_value, stdout_value)
1066 if isinstance(stdout_value, bytes) and stdout_as_string: # could also be output_stream 1066 ↛ 1070line 1066 didn't jump to line 1070, because the condition on line 1066 was never false
1067 stdout_value = safe_decode(stdout_value)
1069 # Allow access to the command's status code
1070 if with_extended_output: 1070 ↛ 1071line 1070 didn't jump to line 1071, because the condition on line 1070 was never true
1071 return (status, stdout_value, safe_decode(stderr_value))
1072 else:
1073 return stdout_value
1075 def environment(self) -> Dict[str, str]:
1076 return self._environment
1078 def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]:
1079 """
1080 Set environment variables for future git invocations. Return all changed
1081 values in a format that can be passed back into this function to revert
1082 the changes:
1084 ``Examples``::
1086 old_env = self.update_environment(PWD='/tmp')
1087 self.update_environment(**old_env)
1089 :param kwargs: environment variables to use for git processes
1090 :return: dict that maps environment variables to their old values
1091 """
1092 old_env = {}
1093 for key, value in kwargs.items():
1094 # set value if it is None
1095 if value is not None:
1096 old_env[key] = self._environment.get(key)
1097 self._environment[key] = value
1098 # remove key from environment if its value is None
1099 elif key in self._environment:
1100 old_env[key] = self._environment[key]
1101 del self._environment[key]
1102 return old_env
1104 @contextmanager
1105 def custom_environment(self, **kwargs: Any) -> Iterator[None]:
1106 """
1107 A context manager around the above ``update_environment`` method to restore the
1108 environment back to its previous state after operation.
1110 ``Examples``::
1112 with self.custom_environment(GIT_SSH='/bin/ssh_wrapper'):
1113 repo.remotes.origin.fetch()
1115 :param kwargs: see update_environment
1116 """
1117 old_env = self.update_environment(**kwargs)
1118 try:
1119 yield
1120 finally:
1121 self.update_environment(**old_env)
1123 def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
1124 if len(name) == 1:
1125 if value is True:
1126 return ["-%s" % name]
1127 elif value not in (False, None):
1128 if split_single_char_options:
1129 return ["-%s" % name, "%s" % value]
1130 else:
1131 return ["-%s%s" % (name, value)]
1132 else:
1133 if value is True:
1134 return ["--%s" % dashify(name)]
1135 elif value is not False and value is not None:
1136 return ["--%s=%s" % (dashify(name), value)]
1137 return []
1139 def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
1140 """Transforms Python style kwargs into git command line options."""
1141 args = []
1142 for k, v in kwargs.items(): 1142 ↛ 1143line 1142 didn't jump to line 1143, because the loop on line 1142 never started
1143 if isinstance(v, (list, tuple)):
1144 for value in v:
1145 args += self.transform_kwarg(k, value, split_single_char_options)
1146 else:
1147 args += self.transform_kwarg(k, v, split_single_char_options)
1148 return args
1150 @classmethod
1151 def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
1153 outlist = []
1154 if isinstance(arg_list, (list, tuple)): 1154 ↛ 1158line 1154 didn't jump to line 1158, because the condition on line 1154 was never false
1155 for arg in arg_list: 1155 ↛ 1156line 1155 didn't jump to line 1156, because the loop on line 1155 never started
1156 outlist.extend(cls.__unpack_args(arg))
1157 else:
1158 outlist.append(str(arg_list))
1160 return outlist
1162 def __call__(self, **kwargs: Any) -> "Git":
1163 """Specify command line options to the git executable
1164 for a subcommand call
1166 :param kwargs:
1167 is a dict of keyword arguments.
1168 these arguments are passed as in _call_process
1169 but will be passed to the git command rather than
1170 the subcommand.
1172 ``Examples``::
1173 git(work_tree='/tmp').difftool()"""
1174 self._git_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
1175 return self
1177 @overload
1178 def _call_process(self, method: str, *args: None, **kwargs: None) -> str:
1179 ... # if no args given, execute called with all defaults
1181 @overload
1182 def _call_process(
1183 self,
1184 method: str,
1185 istream: int,
1186 as_process: Literal[True],
1187 *args: Any,
1188 **kwargs: Any,
1189 ) -> "Git.AutoInterrupt":
1190 ...
1192 @overload
1193 def _call_process(
1194 self, method: str, *args: Any, **kwargs: Any
1195 ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
1196 ...
1198 def _call_process(
1199 self, method: str, *args: Any, **kwargs: Any
1200 ) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
1201 """Run the given git command with the specified arguments and return
1202 the result as a String
1204 :param method:
1205 is the command. Contained "_" characters will be converted to dashes,
1206 such as in 'ls_files' to call 'ls-files'.
1208 :param args:
1209 is the list of arguments. If None is included, it will be pruned.
1210 This allows your commands to call git more conveniently as None
1211 is realized as non-existent
1213 :param kwargs:
1214 It contains key-values for the following:
1215 - the :meth:`execute()` kwds, as listed in :var:`execute_kwargs`;
1216 - "command options" to be converted by :meth:`transform_kwargs()`;
1217 - the `'insert_kwargs_after'` key which its value must match one of ``*args``
1218 and any cmd-options will be appended after the matched arg.
1220 Examples::
1222 git.rev_list('master', max_count=10, header=True)
1224 turns into::
1226 git rev-list max-count 10 --header master
1228 :return: Same as ``execute``
1229 if no args given used execute default (esp. as_process = False, stdout_as_string = True)
1230 and return str"""
1231 # Handle optional arguments prior to calling transform_kwargs
1232 # otherwise these'll end up in args, which is bad.
1233 exec_kwargs = {k: v for k, v in kwargs.items() if k in execute_kwargs}
1234 opts_kwargs = {k: v for k, v in kwargs.items() if k not in execute_kwargs}
1236 insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None)
1238 # Prepare the argument list
1240 opt_args = self.transform_kwargs(**opts_kwargs)
1241 ext_args = self.__unpack_args([a for a in args if a is not None])
1243 if insert_after_this_arg is None: 1243 ↛ 1246line 1243 didn't jump to line 1246, because the condition on line 1243 was never false
1244 args_list = opt_args + ext_args
1245 else:
1246 try:
1247 index = ext_args.index(insert_after_this_arg)
1248 except ValueError as err:
1249 raise ValueError(
1250 "Couldn't find argument '%s' in args %s to insert cmd options after"
1251 % (insert_after_this_arg, str(ext_args))
1252 ) from err
1253 # end handle error
1254 args_list = ext_args[: index + 1] + opt_args + ext_args[index + 1 :]
1255 # end handle opts_kwargs
1257 call = [self.GIT_PYTHON_GIT_EXECUTABLE]
1259 # add persistent git options
1260 call.extend(self._persistent_git_options)
1262 # add the git options, then reset to empty
1263 # to avoid side_effects
1264 call.extend(self._git_options)
1265 self._git_options = ()
1267 call.append(dashify(method))
1268 call.extend(args_list)
1270 return self.execute(call, **exec_kwargs)
1272 def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]:
1273 """
1274 :param header_line:
1275 <hex_sha> type_string size_as_int
1277 :return: (hex_sha, type_string, size_as_int)
1279 :raise ValueError: if the header contains indication for an error due to
1280 incorrect input sha"""
1281 tokens = header_line.split()
1282 if len(tokens) != 3:
1283 if not tokens:
1284 raise ValueError("SHA could not be resolved, git returned: %r" % (header_line.strip()))
1285 else:
1286 raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip()))
1287 # END handle actual return value
1288 # END error handling
1290 if len(tokens[0]) != 40:
1291 raise ValueError("Failed to parse header: %r" % header_line)
1292 return (tokens[0], tokens[1], int(tokens[2]))
1294 def _prepare_ref(self, ref: AnyStr) -> bytes:
1295 # required for command to separate refs on stdin, as bytes
1296 if isinstance(ref, bytes):
1297 # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text
1298 refstr: str = ref.decode("ascii")
1299 elif not isinstance(ref, str):
1300 refstr = str(ref) # could be ref-object
1301 else:
1302 refstr = ref
1304 if not refstr.endswith("\n"):
1305 refstr += "\n"
1306 return refstr.encode(defenc)
1308 def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt":
1309 cur_val = getattr(self, attr_name)
1310 if cur_val is not None:
1311 return cur_val
1313 options = {"istream": PIPE, "as_process": True}
1314 options.update(kwargs)
1316 cmd = self._call_process(cmd_name, *args, **options)
1317 setattr(self, attr_name, cmd)
1318 cmd = cast("Git.AutoInterrupt", cmd)
1319 return cmd
1321 def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]:
1322 if cmd.stdin and cmd.stdout:
1323 cmd.stdin.write(self._prepare_ref(ref))
1324 cmd.stdin.flush()
1325 return self._parse_object_header(cmd.stdout.readline())
1326 else:
1327 raise ValueError("cmd stdin was empty")
1329 def get_object_header(self, ref: str) -> Tuple[str, str, int]:
1330 """Use this method to quickly examine the type and size of the object behind
1331 the given ref.
1333 :note: The method will only suffer from the costs of command invocation
1334 once and reuses the command in subsequent calls.
1336 :return: (hexsha, type_string, size_as_int)"""
1337 cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True)
1338 return self.__get_object_header(cmd, ref)
1340 def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]:
1341 """As get_object_header, but returns object data as well
1342 :return: (hexsha, type_string, size_as_int,data_string)
1343 :note: not threadsafe"""
1344 hexsha, typename, size, stream = self.stream_object_data(ref)
1345 data = stream.read(size)
1346 del stream
1347 return (hexsha, typename, size, data)
1349 def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
1350 """As get_object_header, but returns the data as a stream
1352 :return: (hexsha, type_string, size_as_int, stream)
1353 :note: This method is not threadsafe, you need one independent Command instance per thread to be safe !"""
1354 cmd = self._get_persistent_cmd("cat_file_all", "cat_file", batch=True)
1355 hexsha, typename, size = self.__get_object_header(cmd, ref)
1356 cmd_stdout = cmd.stdout if cmd.stdout is not None else io.BytesIO()
1357 return (hexsha, typename, size, self.CatFileContentStream(size, cmd_stdout))
1359 def clear_cache(self) -> "Git":
1360 """Clear all kinds of internal caches to release resources.
1362 Currently persistent commands will be interrupted.
1364 :return: self"""
1365 for cmd in (self.cat_file_all, self.cat_file_header):
1366 if cmd:
1367 cmd.__del__()
1369 self.cat_file_all = None
1370 self.cat_file_header = None
1371 return self