Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/util.py: 29%
538 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# utils.py
2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
3#
4# This module is part of GitPython and is released under
5# the BSD License: http://www.opensource.org/licenses/bsd-license.php
7from abc import abstractmethod
8import os.path as osp
9from .compat import is_win
10import contextlib
11from functools import wraps
12import getpass
13import logging
14import os
15import platform
16import subprocess
17import re
18import shutil
19import stat
20from sys import maxsize
21import time
22from urllib.parse import urlsplit, urlunsplit
23import warnings
25# from git.objects.util import Traversable
27# typing ---------------------------------------------------------
29from typing import (
30 Any,
31 AnyStr,
32 BinaryIO,
33 Callable,
34 Dict,
35 Generator,
36 IO,
37 Iterator,
38 List,
39 Optional,
40 Pattern,
41 Sequence,
42 Tuple,
43 TypeVar,
44 Union,
45 cast,
46 TYPE_CHECKING,
47 overload,
48)
50import pathlib
52if TYPE_CHECKING: 52 ↛ 53line 52 didn't jump to line 53, because the condition on line 52 was never true
53 from git.remote import Remote
54 from git.repo.base import Repo
55 from git.config import GitConfigParser, SectionConstraint
56 from git import Git
58 # from git.objects.base import IndexObject
61from .types import (
62 Literal,
63 SupportsIndex,
64 Protocol,
65 runtime_checkable, # because behind py version guards
66 PathLike,
67 HSH_TD,
68 Total_TD,
69 Files_TD, # aliases
70 Has_id_attribute,
71)
# Covariant TypeVar so that IterableList[Head] is a subtype of
# IterableList[IterableObj] (items must look like IterableObj / Has_id_attribute).
T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True)
# So IterableList[Head] is subtype of IterableList[IterableObj]
76# ---------------------------------------------------------------------
79from gitdb.util import ( # NOQA @IgnorePep8
80 make_sha,
81 LockedFD, # @UnusedImport
82 file_contents_ro, # @UnusedImport
83 file_contents_ro_filepath, # @UnusedImport
84 LazyMixin, # @UnusedImport
85 to_hex_sha, # @UnusedImport
86 to_bin_sha, # @UnusedImport
87 bin_to_hex, # @UnusedImport
88 hex_to_bin, # @UnusedImport
89)
92# NOTE: Some of the unused imports might be used/imported by others.
93# Handle once test-cases are back up and running.
94# Most of these are unused here, but are for use by git-python modules so these
95# don't see gitdb all the time. Flake of course doesn't like it.
# Public API of this module, re-exported via `from git.util import *`.
__all__ = [
    "stream_copy",
    "join_path",
    "to_native_path_linux",
    "join_path_native",
    "Stats",
    "IndexFileSHA1Writer",
    "IterableObj",
    "IterableList",
    "BlockingLockFile",
    "LockFile",
    "Actor",
    "get_user_id",
    "assure_directory_exists",
    "RemoteProgress",
    "CallableRemoteProgress",
    "rmtree",
    "unbare_repo",
    "HIDE_WINDOWS_KNOWN_ERRORS",
]
log = logging.getLogger(__name__)

# types############################################################

#: We need an easy way to see if Appveyor TCs start failing,
#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
#: till then, we wish to hide them.
# NOTE(review): os.environ.get returns the raw *string* value, so any non-empty
# setting (even "0" or "false") is truthy here -- confirm before relying on it
# to disable the behaviour.
HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True)
HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True)

# { Utility Methods

T = TypeVar("T")  # generic return type used by decorators such as `unbare_repo`
def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
    """Decorator for methods: raises ``InvalidGitRepositoryError`` when the
    decorated method is invoked on a bare repository."""

    from .exc import InvalidGitRepositoryError

    @wraps(func)
    def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T:
        # Guard clause: only non-bare repositories may proceed.
        if not self.repo.bare:
            return func(self, *args, **kwargs)
        raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)

    return wrapper
@contextlib.contextmanager
def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]:
    """Context manager that switches the process working directory to
    *new_dir* for the duration of the ``with`` block, restoring the
    previous directory afterwards (even on error)."""
    previous_dir = os.getcwd()
    os.chdir(new_dir)
    try:
        yield new_dir
    finally:
        os.chdir(previous_dir)
def rmtree(path: PathLike) -> None:
    """Recursively remove the directory tree at *path*.

    :note: we use shutil rmtree but adjust its behaviour to see whether files that
        couldn't be deleted are read-only. Windows will not remove them in that case"""

    def handle_error(func: Callable, path: PathLike, exc_info: str) -> None:
        # Assume an access error: clear the read-only flag and retry.
        os.chmod(path, stat.S_IWUSR)

        try:
            func(path)  # Will scream if still not possible to delete.
        except Exception as ex:
            if HIDE_WINDOWS_KNOWN_ERRORS:
                from unittest import SkipTest

                raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
            raise

    return shutil.rmtree(path, False, handle_error)
def rmfile(path: PathLike) -> None:
    """Ensure file deleted also on *Windows* where read-only files need special treatment."""
    if not osp.isfile(path):
        return
    if is_win:
        # Windows refuses to remove read-only files; make it writable first.
        os.chmod(path, 0o777)
    os.remove(path)
def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
    """Copy all data from the source stream into the destination stream in
    chunks of at most *chunk_size* bytes.

    :return: amount of bytes written"""
    written = 0
    while True:
        data = source.read(chunk_size)
        destination.write(data)
        written += len(data)
        # A short read signals the source is exhausted.
        if len(data) < chunk_size:
            return written
def join_path(a: PathLike, *p: PathLike) -> PathLike:
    """Join path tokens like ``osp.join``, but always use ``'/'`` as the
    separator, never a Windows backslash."""
    result = str(a)
    for token in p:
        token = str(token)
        if not token:
            continue
        if token.startswith("/"):
            # Absolute token: drop its leading slash and append.
            result = result + token[1:]
        elif result == "" or result.endswith("/"):
            result = result + token
        else:
            result = result + "/" + token
    return result
# Select the platform-appropriate path normalizer at import time.
if is_win:

    def to_native_path_windows(path: PathLike) -> PathLike:
        # Convert forward slashes into the native Windows backslash form.
        path = str(path)
        return path.replace("/", "\\")

    def to_native_path_linux(path: PathLike) -> str:
        # Normalize any backslashes into POSIX forward slashes.
        path = str(path)
        return path.replace("\\", "/")

    __all__.append("to_native_path_windows")
    to_native_path = to_native_path_windows
else:
    # no need for any work on linux
    def to_native_path_linux(path: PathLike) -> str:
        return str(path)

    to_native_path = to_native_path_linux
def join_path_native(a: PathLike, *p: PathLike) -> PathLike:
    """
    As join path, but makes sure an OS native path is returned. This is only
    needed to play it safe on my dear windows and to assure nice paths that only
    use '\'"""
    joined = join_path(a, *p)
    return to_native_path(joined)
def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool:
    """Assure that the directory pointed to by path exists.

    :param is_file: If True, path is assumed to be a file and handled correctly.
        Otherwise it must be a directory
    :return: True if the directory was created, False if it already existed"""
    target = osp.dirname(path) if is_file else path
    if osp.isdir(target):
        return False
    os.makedirs(target, exist_ok=True)
    return True
268def _get_exe_extensions() -> Sequence[str]:
269 PATHEXT = os.environ.get("PATHEXT", None)
270 return (
271 tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT else (".BAT", "COM", ".EXE") if is_win else ("")
272 )
def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
    """Pure-python ``where``: list every file on the search *path* that is an
    executable match for *program*, optionally with a recognised executable
    extension appended.

    From: http://stackoverflow.com/a/377028/548792
    """
    winprog_exts = _get_exe_extensions()

    def is_exec(fpath: str) -> bool:
        if not osp.isfile(fpath) or not os.access(fpath, os.X_OK):
            return False
        # On Windows additionally require a known executable extension.
        return os.name != "nt" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts)

    if not path:
        path = os.environ["PATH"]
    matches: List[str] = []
    for folder in str(path).split(os.pathsep):
        folder = folder.strip('"')
        if not folder:
            continue
        exe_path = osp.join(folder, program)
        candidates = [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]
        matches.extend(candidate for candidate in candidates if is_exec(candidate))
    return matches
def _cygexpath(drive: Optional[str], path: str) -> str:
    # Translate a (drive, path) pair into a cygwin-style POSIX path with
    # forward slashes. Used both directly and as a `_cygpath_parsers` callback.
    if osp.isabs(path) and not drive:
        # Invoked from `cygpath()` directly with `D:Apps\123`?
        # It's an error, leave it alone just slashes)
        p = path  # convert to str if AnyPath given
    else:
        # Expand ~ and environment variables before deciding how to prefix.
        p = path and osp.normpath(osp.expandvars(osp.expanduser(path)))
        if osp.isabs(p):
            if drive:
                # Confusing, maybe a remote system should expand vars.
                p = path
            else:
                p = cygpath(p)
        elif drive:
            # Drive-relative path: anchor it under the cygdrive mount.
            p = "/proc/cygdrive/%s/%s" % (drive.lower(), p)
    p_str = str(p)  # ensure it is a str and not AnyPath
    return p_str.replace("\\", "/")
#: Parser table for `cygpath()`: tuples of (regex, converter, recurse).
#: The first regex matching a path selects its converter; when `recurse`
#: is True, the converted result is fed through `cygpath()` once more.
_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = (
    # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
    # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
    (
        # UNC extended-length paths: \\?\UNC\server\share\rest
        re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
        (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))),
        False,
    ),
    # Extended-length drive paths, e.g. \\?\C:\dir
    (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False),
    # Plain drive paths, e.g. C:\dir or C:/dir
    (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False),
    # file: URLs -- strip the scheme and re-parse the remainder (recurse=True).
    (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True),
    (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False),  # remote URL, do nothing
)
def cygpath(path: str) -> str:
    """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
    path = str(path)  # ensure is str and not AnyPath.
    # Fix to use Paths when 3.5 dropped. or to be just str if only for urls?
    if path.startswith(("/cygdrive", "//", "/proc/cygdrive")):
        # Already in cygwin form: nothing to translate.
        return path
    for regex, parser, recurse in _cygpath_parsers:
        match = regex.match(path)
        if match is None:
            continue
        converted = parser(*match.groups())
        return cygpath(converted) if recurse else converted
    # No parser claimed the path: fall back to the generic expansion.
    return _cygexpath(None, path)
_decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?")


def decygpath(path: PathLike) -> str:
    """Translate a cygwin drive path back into its Windows backslash form."""
    path_str = str(path)
    match = _decygpath_regex.match(path_str)
    if match:
        drive, rest_path = match.groups()
        path_str = "%s:%s" % (drive.upper(), rest_path or "")

    return path_str.replace("/", "\\")
#: Store boolean flags denoting if a specific Git executable
#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2).
_is_cygwin_cache: Dict[str, Optional[bool]] = {}


@overload
def is_cygwin_git(git_executable: None) -> Literal[False]:
    ...


@overload
def is_cygwin_git(git_executable: PathLike) -> bool:
    ...


def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
    """Determine (and cache) whether *git_executable* belongs to a Cygwin
    installation by running the ``uname`` binary found next to it.

    :param git_executable: path or bare name of the git binary, or None
    :return: True only if that ``uname``'s output contains 'CYGWIN'
    """
    if is_win:
        # is_win seems to be true only for Windows-native pythons
        # cygwin has os.name = posix, I think
        return False

    if git_executable is None:
        return False

    git_executable = str(git_executable)
    # Cached result per executable path; None means "not determined yet".
    is_cygwin = _is_cygwin_cache.get(git_executable)  # type: Optional[bool]
    if is_cygwin is None:
        is_cygwin = False
        try:
            git_dir = osp.dirname(git_executable)
            if not git_dir:
                # Just a name given, not a real path: resolve it via PATH.
                res = py_where(git_executable)
                git_dir = osp.dirname(res[0]) if res else ""

            # A Cygwin git ships `uname` in the same directory as git itself.
            uname_cmd = osp.join(git_dir, "uname")
            process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True)
            uname_out, _ = process.communicate()
            # retcode = process.poll()
            is_cygwin = "CYGWIN" in uname_out
        except Exception as ex:
            log.debug("Failed checking if running in CYGWIN due to: %r", ex)
        _is_cygwin_cache[git_executable] = is_cygwin

    return is_cygwin
def get_user_id() -> str:
    """:return: string identifying the currently active system user as name@node"""
    return "{}@{}".format(getpass.getuser(), platform.node())
def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None:
    """Block until *proc* (a clone, fetch, pull or push process) has
    terminated, letting its ``wait()`` surface any errors."""
    # TODO: No close proc-streams??
    proc.wait(**kwargs)
@overload
def expand_path(p: None, expand_vars: bool = ...) -> None:
    ...


@overload
def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
    # improve these overloads when 3.5 dropped
    ...


def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]:
    """Expand the user home marker (``~``) and, optionally, environment
    variables in *p*, returning an absolute normalized path.

    :return: the expanded path, or None if expansion failed"""
    if isinstance(p, pathlib.Path):
        return p.resolve()
    try:
        expanded = osp.expanduser(p)  # type: ignore
        if expand_vars:
            expanded = osp.expandvars(expanded)  # type: ignore
        return osp.normpath(osp.abspath(expanded))  # type: ignore
    except Exception:
        # Mirrors upstream behaviour: any failure yields None instead of raising.
        return None
def remove_password_if_present(cmdline: Sequence[str]) -> List[str]:
    """Return a copy of *cmdline* in which every element that parses as a URL
    carrying a username and/or password has those credentials replaced by stars.

    If nothing is found, the command line is returned unchanged.

    This should be used for every log line that prints a command line, as well
    as for exception messages.
    """
    redacted: List[str] = []
    for to_parse in cmdline:
        sanitized = to_parse
        try:
            url = urlsplit(to_parse)
            # Only rewrite when credentials are actually present.
            if url.password is not None or url.username is not None:
                if url.password is not None:
                    url = url._replace(netloc=url.netloc.replace(url.password, "*****"))
                if url.username is not None:
                    url = url._replace(netloc=url.netloc.replace(url.username, "*****"))
                sanitized = urlunsplit(url)
        except ValueError:
            # This is not a valid URL
            pass
        redacted.append(sanitized)
    return redacted
475# } END utilities
477# { Classes
class RemoteProgress(object):
    """
    Handler providing an interface to parse progress information emitted by git-push
    and git-fetch and to dispatch callbacks allowing subclasses to react to the progress.
    """

    # Nine distinct operation codes, each occupying one bit so they can be
    # OR-ed with the BEGIN/END stage bits below.
    _num_op_codes: int = 9
    (
        BEGIN,
        END,
        COUNTING,
        COMPRESSING,
        WRITING,
        RECEIVING,
        RESOLVING,
        FINDING_SOURCES,
        CHECKING_OUT,
    ) = [1 << x for x in range(_num_op_codes)]
    STAGE_MASK = BEGIN | END  # bits marking the first/last message of an operation
    OP_MASK = ~STAGE_MASK  # bits identifying the operation itself

    DONE_TOKEN = "done."
    TOKEN_SEPARATOR = ", "

    __slots__ = (
        "_cur_line",
        "_seen_ops",
        "error_lines",  # Lines that started with 'error:' or 'fatal:'.
        "other_lines",
    )  # Lines not denoting progress (i.e.g. push-infos).
    # Absolute progress, e.g. "Counting objects: 4"; the empty groups keep the
    # group layout identical to re_op_relative.
    re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)")
    # Relative progress, e.g. "Compressing objects: 50% (1/2)".
    re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")

    def __init__(self) -> None:
        self._seen_ops: List[int] = []  # op codes whose BEGIN was already emitted
        self._cur_line: Optional[str] = None  # last raw line handed to _parse_progress_line
        self.error_lines: List[str] = []
        self.other_lines: List[str] = []

    def _parse_progress_line(self, line: AnyStr) -> None:
        """Parse progress information from the given line as retrieved by git-push
        or git-fetch.

        - Lines that do not contain progress info are stored in :attr:`other_lines`.
        - Lines that seem to contain an error (i.e. start with error: or fatal:) are stored
          in :attr:`error_lines`."""
        # handle
        # Counting objects: 4, done.
        # Compressing objects: 50% (1/2)
        # Compressing objects: 100% (2/2)
        # Compressing objects: 100% (2/2), done.
        if isinstance(line, bytes):  # mypy argues about ternary assignment
            line_str = line.decode("utf-8")
        else:
            line_str = line
        self._cur_line = line_str

        if self._cur_line.startswith(("error:", "fatal:")):
            self.error_lines.append(self._cur_line)
            return

        # find escape characters and cut them away - regex will not work with
        # them as they are non-ascii. As git might expect a tty, it will send them
        last_valid_index = None
        for i, c in enumerate(reversed(line_str)):
            if ord(c) < 32:
                # its a slice index
                last_valid_index = -i - 1
            # END character was non-ascii
        # END for each character in line
        if last_valid_index is not None:
            line_str = line_str[:last_valid_index]
        # END cut away invalid part
        line_str = line_str.rstrip()

        cur_count, max_count = None, None
        # Prefer the percentage form; fall back to the absolute-count form.
        match = self.re_op_relative.match(line_str)
        if match is None:
            match = self.re_op_absolute.match(line_str)

        if not match:
            self.line_dropped(line_str)
            self.other_lines.append(line_str)
            return
        # END could not get match

        op_code = 0
        _remote, op_name, _percent, cur_count, max_count, message = match.groups()

        # get operation id
        if op_name == "Counting objects":
            op_code |= self.COUNTING
        elif op_name == "Compressing objects":
            op_code |= self.COMPRESSING
        elif op_name == "Writing objects":
            op_code |= self.WRITING
        elif op_name == "Receiving objects":
            op_code |= self.RECEIVING
        elif op_name == "Resolving deltas":
            op_code |= self.RESOLVING
        elif op_name == "Finding sources":
            op_code |= self.FINDING_SOURCES
        elif op_name == "Checking out files":
            op_code |= self.CHECKING_OUT
        else:
            # Note: On windows it can happen that partial lines are sent
            # Hence we get something like "CompreReceiving objects", which is
            # a blend of "Compressing objects" and "Receiving objects".
            # This can't really be prevented, so we drop the line verbosely
            # to make sure we get informed in case the process spits out new
            # commands at some point.
            self.line_dropped(line_str)
            # Note: Don't add this line to the other lines, as we have to silently
            # drop it
            return None
        # END handle op code

        # figure out stage: the first message of an operation carries BEGIN.
        if op_code not in self._seen_ops:
            self._seen_ops.append(op_code)
            op_code |= self.BEGIN
        # END begin opcode

        if message is None:
            message = ""
        # END message handling

        message = message.strip()
        if message.endswith(self.DONE_TOKEN):
            op_code |= self.END
            message = message[: -len(self.DONE_TOKEN)]
        # END end message handling
        message = message.strip(self.TOKEN_SEPARATOR)

        self.update(
            op_code,
            cur_count and float(cur_count),
            max_count and float(max_count),
            message,
        )

    def new_message_handler(self) -> Callable[[str], None]:
        """
        :return:
            a progress handler suitable for handle_process_output(), passing lines on to this Progress
            handler in a suitable format"""

        def handler(line: AnyStr) -> None:
            return self._parse_progress_line(line.rstrip())

        # end
        return handler

    def line_dropped(self, line: str) -> None:
        """Called whenever a line could not be understood and was therefore dropped."""
        pass

    def update(
        self,
        op_code: int,
        cur_count: Union[str, float],
        max_count: Union[str, float, None] = None,
        message: str = "",
    ) -> None:
        """Called whenever the progress changes

        :param op_code:
            Integer allowing to be compared against Operation IDs and stage IDs.

            Stage IDs are BEGIN and END. BEGIN will only be set once for each Operation
            ID as well as END. It may be that BEGIN and END are set at once in case only
            one progress message was emitted due to the speed of the operation.
            Between BEGIN and END, none of these flags will be set

            Operation IDs are all held within the OP_MASK. Only one Operation ID will
            be active per call.
        :param cur_count: Current absolute count of items

        :param max_count:
            The maximum count of items we expect. It may be None in case there is
            no maximum number of items or if it is (yet) unknown.

        :param message:
            In case of the 'WRITING' operation, it contains the amount of bytes
            transferred. It may possibly be used for other purposes as well.

        You may read the contents of the current line in self._cur_line"""
        pass
class CallableRemoteProgress(RemoteProgress):
    """An implementation forwarding updates to any callable"""

    __slots__ = "_callable"

    def __init__(self, fn: Callable) -> None:
        # Remember the receiver before initializing the base progress state.
        self._callable = fn
        super().__init__()

    def update(self, *args: Any, **kwargs: Any) -> None:
        # Forward every update verbatim to the stored callable.
        self._callable(*args, **kwargs)
class Actor(object):
    """Actors hold information about a person acting on the repository. They
    can be committers and authors or anything with a name and an email as
    mentioned in the git log entries."""

    # PRECOMPILED REGEX
    name_only_regex = re.compile(r"<(.*)>")  # e.g. "<jdoe@example.com>"
    name_email_regex = re.compile(r"(.*) <(.*?)>")  # e.g. "John Doe <jdoe@example.com>"

    # ENVIRONMENT VARIABLES
    # read when creating new commits
    env_author_name = "GIT_AUTHOR_NAME"
    env_author_email = "GIT_AUTHOR_EMAIL"
    env_committer_name = "GIT_COMMITTER_NAME"
    env_committer_email = "GIT_COMMITTER_EMAIL"

    # CONFIGURATION KEYS
    conf_name = "name"
    conf_email = "email"

    __slots__ = ("name", "email")

    def __init__(self, name: Optional[str], email: Optional[str]) -> None:
        self.name = name
        self.email = email

    def __eq__(self, other: Any) -> bool:
        # NOTE(review): assumes `other` exposes .name/.email; comparing against
        # unrelated objects raises AttributeError rather than returning False
        # -- confirm that is intended before changing.
        return self.name == other.name and self.email == other.email

    def __ne__(self, other: Any) -> bool:
        return not (self == other)

    def __hash__(self) -> int:
        # Must stay consistent with __eq__, which compares (name, email).
        return hash((self.name, self.email))

    def __str__(self) -> str:
        return self.name if self.name else ""

    def __repr__(self) -> str:
        return '<git.Actor "%s <%s>">' % (self.name, self.email)

    @classmethod
    def _from_string(cls, string: str) -> "Actor":
        """Create an Actor from a string.
        :param string: is the string, which is expected to be in regular git format

                John Doe <jdoe@example.com>

        :return: Actor"""
        m = cls.name_email_regex.search(string)
        if m:
            name, email = m.groups()
            return Actor(name, email)
        else:
            m = cls.name_only_regex.search(string)
            if m:
                return Actor(m.group(1), None)
            # assume best and use the whole string as name
            return Actor(string, None)
            # END special case name
        # END handle name/email matching

    @classmethod
    def _main_actor(
        cls,
        env_name: str,
        env_email: str,
        config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None,
    ) -> "Actor":
        # Resolution order per field: environment variable first, then the git
        # config reader (when given), then a default derived from the user id.
        actor = Actor("", "")
        user_id = None  # We use this to avoid multiple calls to getpass.getuser()

        def default_email() -> str:
            nonlocal user_id
            if not user_id:
                user_id = get_user_id()
            return user_id

        def default_name() -> str:
            # The part before '@' of the "name@node" user id.
            return default_email().split("@")[0]

        for attr, evar, cvar, default in (
            ("name", env_name, cls.conf_name, default_name),
            ("email", env_email, cls.conf_email, default_email),
        ):
            try:
                val = os.environ[evar]
                setattr(actor, attr, val)
            except KeyError:
                if config_reader is not None:
                    try:
                        val = config_reader.get("user", cvar)
                    except Exception:
                        val = default()
                    setattr(actor, attr, val)
                # END config-reader handling
                if not getattr(actor, attr):
                    setattr(actor, attr, default())
            # END handle name
        # END for each item to retrieve
        return actor

    @classmethod
    def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
        """
        :return: Actor instance corresponding to the configured committer. It behaves
            similar to the git implementation, such that the environment will override
            configuration values of config_reader. If no value is set at all, it will be
            generated
        :param config_reader: ConfigReader to use to retrieve the values from in case
            they are not set in the environment"""
        return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)

    @classmethod
    def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
        """Same as committer(), but defines the main author. It may be specified in the environment,
        but defaults to the committer"""
        return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
class Stats(object):
    """
    Represents stat information as presented by git at the end of a merge. It is
    created from the output of a diff operation.

    ``Example``::

     c = Commit( sha1 )
     s = c.stats
     s.total         # full-stat-dict
     s.files         # dict( filepath : stat-dict )

    ``stat-dict``

    A dictionary with the following keys and values::

      deletions = number of deleted lines as int
      insertions = number of inserted lines as int
      lines = total number of lines changed as int, or deletions + insertions

    ``full-stat-dict``

    In addition to the items in the stat-dict, it features additional information::

     files = number of changed files as int"""

    __slots__ = ("total", "files")

    def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]):
        self.total = total
        self.files = files

    @classmethod
    def _list_from_string(cls, repo: "Repo", text: str) -> "Stats":
        """Create a Stat object from output retrieved by git-diff.

        :return: git.Stat"""

        hsh: HSH_TD = {
            "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0},
            "files": {},
        }
        for line in text.splitlines():
            # Each numstat line is "<insertions>\t<deletions>\t<filename>";
            # binary files report "-" for both counts.
            raw_insertions, raw_deletions, filename = line.split("\t")
            insertions = int(raw_insertions) if raw_insertions != "-" else 0
            deletions = int(raw_deletions) if raw_deletions != "-" else 0
            totals = hsh["total"]
            totals["insertions"] += insertions
            totals["deletions"] += deletions
            totals["lines"] += insertions + deletions
            totals["files"] += 1
            hsh["files"][filename.strip()] = {
                "insertions": insertions,
                "deletions": deletions,
                "lines": insertions + deletions,
            }
        return Stats(hsh["total"], hsh["files"])
class IndexFileSHA1Writer(object):
    """File-like wrapper that transparently maintains the SHA1 of all data
    written through it, emitting that sha either explicitly via
    :meth:`write_sha` or implicitly when the stream is closed.

    Only useful to the indexfile

    :note: Based on the dulwich project"""

    __slots__ = ("f", "sha1")

    def __init__(self, f: IO) -> None:
        self.f = f
        self.sha1 = make_sha(b"")

    def write(self, data: AnyStr) -> int:
        # Feed the running sha before delegating to the wrapped stream.
        self.sha1.update(data)
        return self.f.write(data)

    def write_sha(self) -> bytes:
        # Append the current digest to the stream and hand it back.
        digest = self.sha1.digest()
        self.f.write(digest)
        return digest

    def close(self) -> bytes:
        # Write the trailing sha, then close the underlying stream.
        digest = self.write_sha()
        self.f.close()
        return digest

    def tell(self) -> int:
        return self.f.tell()
class LockFile(object):
    """Provides methods to obtain, check for, and release a file based lock which
    should be used to handle concurrent access to the same file.

    As we are a utility class to be derived from, we only use protected methods.

    Locks will automatically be released on destruction"""

    __slots__ = ("_file_path", "_owns_lock")

    def __init__(self, file_path: PathLike) -> None:
        self._file_path = file_path
        self._owns_lock = False

    def __del__(self) -> None:
        # Best-effort release when the instance is garbage collected.
        self._release_lock()

    def _lock_file_path(self) -> str:
        """:return: Path to lockfile"""
        return "%s.lock" % (self._file_path)

    def _has_lock(self) -> bool:
        """:return: True if we have a lock and if the lockfile still exists
        :raise AssertionError: if our lock-file does not exist"""
        return self._owns_lock

    def _obtain_lock_or_raise(self) -> None:
        """Create a lock file as flag for other instances, mark our instance as lock-holder

        :raise IOError: if a lock was already present or a lock file could not be written"""
        if self._has_lock():
            return
        lock_file = self._lock_file_path()
        if osp.isfile(lock_file):
            msg = "Lock for file %r did already exist, delete %r in case the lock is illegal" % (
                self._file_path,
                lock_file,
            )
            raise IOError(msg)

        open_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        if is_win:
            open_flags |= os.O_SHORT_LIVED
        try:
            # O_EXCL makes creation atomic: it fails if the file appeared meanwhile.
            os.close(os.open(lock_file, open_flags, 0))
        except OSError as e:
            raise IOError(str(e)) from e

        self._owns_lock = True

    def _obtain_lock(self) -> None:
        """The default implementation will raise if a lock cannot be obtained.
        Subclasses may override this method to provide a different implementation"""
        return self._obtain_lock_or_raise()

    def _release_lock(self) -> None:
        """Release our lock if we have one"""
        if not self._has_lock():
            return

        # if someone removed our file beforehand, lets just flag this issue
        # instead of failing, to make it more usable.
        try:
            rmfile(self._lock_file_path())
        except OSError:
            pass
        self._owns_lock = False
class BlockingLockFile(LockFile):
    """The lock file will block until a lock could be obtained, or fail after
    a specified timeout.

    :note: If the directory containing the lock was removed, an exception will
        be raised during the blocking period, preventing hangs as the lock
        can never be obtained."""

    __slots__ = ("_check_interval", "_max_block_time")

    def __init__(
        self,
        file_path: PathLike,
        check_interval_s: float = 0.3,
        max_block_time_s: int = maxsize,
    ) -> None:
        """Configure the instance

        :param check_interval_s:
            Period of time to sleep until the lock is checked the next time.
            By default, it waits a nearly unlimited time

        :param max_block_time_s: Maximum amount of seconds we may lock"""
        super(BlockingLockFile, self).__init__(file_path)
        self._check_interval = check_interval_s
        self._max_block_time = max_block_time_s

    def _obtain_lock(self) -> None:
        """This method blocks until it obtained the lock, or raises IOError if
        it ran out of time or if the parent directory was not available anymore.
        If this method returns, you are guaranteed to own the lock"""
        starttime = time.time()
        maxtime = starttime + float(self._max_block_time)
        while True:
            try:
                super(BlockingLockFile, self)._obtain_lock()
            except IOError as e:
                # sanity check: if the directory leading to the lockfile is not
                # readable anymore, raise an exception
                curtime = time.time()
                if not osp.isdir(osp.dirname(self._lock_file_path())):
                    msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (
                        self._lock_file_path(),
                        curtime - starttime,
                    )
                    raise IOError(msg) from e
                # END handle missing directory

                if curtime >= maxtime:
                    # NOTE(review): the message reports maxtime - starttime (the
                    # configured limit) rather than the actual elapsed time.
                    msg = "Waited %g seconds for lock at %r" % (
                        maxtime - starttime,
                        self._lock_file_path(),
                    )
                    raise IOError(msg) from e
                # END abort if we wait too long
                # Lock still held by someone else: retry after a short sleep.
                time.sleep(self._check_interval)
            else:
                break
        # END endless loop
class IterableList(List[T_IterableObj]):
    """
    List of iterable objects allowing to query an object by id or by named index::

     heads = repo.heads
     heads.master
     heads['master']
     heads[0]

    Iterable parent objects = [Commit, SubModule, Reference, FetchInfo, PushInfo]
    Iterable via inheritance = [Head, TagReference, RemoteReference]
    ]
    It requires an id_attribute name to be set which will be queried from its
    contained items to have a means for comparison.

    A prefix can be specified which is to be used in case the id returned by the
    items always contains a prefix that does not matter to the user, so it
    can be left out."""

    __slots__ = ("_id_attr", "_prefix")

    def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[IterableObj]":
        # list.__new__ ignores the extra constructor arguments; they are
        # consumed by __init__ below.
        return super(IterableList, cls).__new__(cls)

    def __init__(self, id_attr: str, prefix: str = "") -> None:
        self._id_attr = id_attr  # attribute name queried on items for their id
        self._prefix = prefix  # id prefix implicitly prepended on named lookups

    def __contains__(self, attr: object) -> bool:
        # first try identity match for performance
        try:
            rval = list.__contains__(self, attr)
            if rval:
                return rval
        except (AttributeError, TypeError):
            pass
        # END handle match

        # otherwise make a full name search
        try:
            getattr(self, cast(str, attr))  # use cast to silence mypy
            return True
        except (AttributeError, TypeError):
            return False
        # END handle membership

    def __getattr__(self, attr: str) -> T_IterableObj:
        # Only invoked when normal attribute lookup fails, so scanning the
        # contained items for a matching id is safe here.
        attr = self._prefix + attr
        for item in self:
            if getattr(item, self._id_attr) == attr:
                return item
        # END for each item
        # Nothing matched: fall back to the default lookup, which raises
        # AttributeError for unknown names.
        return list.__getattribute__(self, attr)

    def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj:  # type: ignore

        assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str"

        if isinstance(index, int):
            return list.__getitem__(self, index)
        elif isinstance(index, slice):
            raise ValueError("Index should be an int or str")
        else:
            # String index: resolve via __getattr__'s id search above.
            try:
                return getattr(self, index)
            except AttributeError as e:
                raise IndexError("No item found with id %r" % (self._prefix + index)) from e
            # END handle getattr

    def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None:

        assert isinstance(index, (int, str)), "Index of IterableList should be an int or str"

        delindex = cast(int, index)
        if not isinstance(index, int):
            # Resolve the string id to its positional index first.
            delindex = -1
            name = self._prefix + index
            for i, item in enumerate(self):
                if getattr(item, self._id_attr) == name:
                    delindex = i
                    break
                # END search index
            # END for each item
            if delindex == -1:
                raise IndexError("Item with name %s not found" % name)
            # END handle error
        # END get index to delete
        list.__delitem__(self, delindex)
class IterableClassWatcher(type):
    """Metaclass that emits a DeprecationWarning whenever the legacy
    Iterable class is subclassed."""

    def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None:
        # Build the message once; it only depends on the new class' name.
        deprecation_msg = (
            f"GitPython Iterable subclassed by {name}. "
            "Iterable is deprecated due to naming clash since v3.1.18"
            " and will be removed in 3.1.20, "
            "Use IterableObj instead \n"
        )
        for parent in bases:
            if type(parent) == IterableClassWatcher:
                warnings.warn(deprecation_msg, DeprecationWarning, stacklevel=2)
class Iterable(metaclass=IterableClassWatcher):

    """Deprecated predecessor of IterableObj: an interface guaranteeing a
    uniform way to retrieve and iterate items within the git repository"""

    __slots__ = ()
    _id_attribute_ = "attribute that most suitably identifies your instance"

    @classmethod
    def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
        """
        Deprecated, use IterableObj instead.
        Gather every item of this type. Subclasses may interpret args and
        kwargs as they see fit, but must return all items when called
        without additional arguments.

        :note: Favor the iter_items method as it will
        :return:list(Item,...) list of item instances"""
        result: Any = IterableList(cls._id_attribute_)
        result.extend(cls.iter_items(repo, *args, **kwargs))
        return result

    @classmethod
    def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
        # Typed loosely on purpose so subtypes such as Remote stay compatible.
        """See list_items for details on the arguments.
        :return: iterator yielding Items"""
        raise NotImplementedError("To be implemented by Subclass")
@runtime_checkable
class IterableObj(Protocol):
    """Protocol for items that can be retrieved and iterated within the git
    repository in a uniform fashion.

    Subclasses = [Submodule, Commit, Reference, PushInfo, FetchInfo, Remote]"""

    __slots__ = ()
    _id_attribute_: str

    @classmethod
    def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]:
        """
        Gather every item of this type. Subclasses may interpret args and
        kwargs as they see fit, but must return all items when called
        without additional arguments.

        :note: Favor the iter_items method as it will
        :return:list(Item,...) list of item instances"""
        collected: IterableList = IterableList(cls._id_attribute_)
        collected.extend(cls.iter_items(repo, *args, **kwargs))
        return collected

    @classmethod
    @abstractmethod
    def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]:
        # Return type kept broad so subtypes such as Remote stay compatible.
        """See list_items for details on the arguments.
        :return: iterator yielding Items"""
        raise NotImplementedError("To be implemented by Subclass")
1201# } END classes
class NullHandler(logging.Handler):
    """Logging handler that silently discards every record given to it."""

    def emit(self, record: object) -> None:
        # Intentionally a no-op: the record is dropped.
        return None