Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/repo/base.py: 18%
563 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# repo.py
2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
3#
4# This module is part of GitPython and is released under
5# the BSD License: http://www.opensource.org/licenses/bsd-license.php
6from __future__ import annotations
7import logging
8import os
9import re
10import shlex
11import warnings
12from gitdb.db.loose import LooseObjectDB
14from gitdb.exc import BadObject
16from git.cmd import Git, handle_process_output
17from git.compat import (
18 defenc,
19 safe_decode,
20 is_win,
21)
22from git.config import GitConfigParser
23from git.db import GitCmdObjectDB
24from git.exc import InvalidGitRepositoryError, NoSuchPathError, GitCommandError
25from git.index import IndexFile
26from git.objects import Submodule, RootModule, Commit
27from git.refs import HEAD, Head, Reference, TagReference
28from git.remote import Remote, add_progress, to_progress_instance
29from git.util import (
30 Actor,
31 finalize_process,
32 cygpath,
33 hex_to_bin,
34 expand_path,
35 remove_password_if_present,
36)
37import os.path as osp
39from .fun import (
40 rev_parse,
41 is_git_dir,
42 find_submodule_git_dir,
43 touch,
44 find_worktree_git_dir,
45)
46import gc
47import gitdb
49# typing ------------------------------------------------------
51from git.types import (
52 TBD,
53 PathLike,
54 Lit_config_levels,
55 Commit_ish,
56 Tree_ish,
57 assert_never,
58)
59from typing import (
60 Any,
61 BinaryIO,
62 Callable,
63 Dict,
64 Iterator,
65 List,
66 Mapping,
67 Optional,
68 Sequence,
69 TextIO,
70 Tuple,
71 Type,
72 Union,
73 NamedTuple,
74 cast,
75 TYPE_CHECKING,
76)
78from git.types import ConfigLevels_Tup, TypedDict
80if TYPE_CHECKING: 80 ↛ 81line 80 didn't jump to line 81, because the condition on line 80 was never true
81 from git.util import IterableList
82 from git.refs.symbolic import SymbolicReference
83 from git.objects import Tree
84 from git.objects.submodule.base import UpdateProgress
85 from git.remote import RemoteProgress
87# -----------------------------------------------------------
89log = logging.getLogger(__name__)
91__all__ = ("Repo",)
class BlameEntry(NamedTuple):
    """One record yielded by ``Repo.blame_incremental``.

    ``commit`` is the commit blamed for the span, ``linenos`` the span of line
    numbers in the resulting file, ``orig_path`` the decoded value of the
    ``filename`` tag of the blame record, and ``orig_linenos`` the matching
    span of line numbers in that original file.
    """

    # blame_incremental passes a single Commit object here, not a mapping.
    commit: "Commit"
    linenos: range
    orig_path: Optional[str]
    orig_linenos: range
class Repo(object):
    """Represents a git repository and allows you to query references,
    gather commit information, generate diffs, create and clone repositories, and
    query the log.

    The following attributes are worth using:

    'working_dir' is the working directory of the git command, which is the working tree
    directory if available or the .git directory in case of bare repositories

    'working_tree_dir' is the working tree directory; it is None if we are a bare
    repository.

    'git_dir' is the .git repository directory, which is always set."""

    # Name of the marker file inside the git directory that backs ``daemon_export``.
    DAEMON_EXPORT_FILE = "git-daemon-export-ok"

    git = cast("Git", None)  # Must exist, or __del__ will fail in case we raise on `__init__()`
    working_dir: Optional[PathLike] = None
    _working_tree_dir: Optional[PathLike] = None
    # Both default to the empty string (not None) until __init__ resolves them.
    git_dir: PathLike = ""
    _common_dir: PathLike = ""

    # precompiled regex
    re_whitespace = re.compile(r"\s+")
    re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$")
    re_hexsha_shortened = re.compile("^[0-9A-Fa-f]{4,40}$")
    # Matches ``$VAR`` / ``${VAR}`` and ``%VAR%`` style environment variable references.
    re_envvars = re.compile(r"(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)")
    re_author_committer_start = re.compile(r"^(author|committer)")
    re_tab_full_line = re.compile(r"^\t(.*)$")

    # invariants
    # The configuration levels known to this class; config_reader() reads all of
    # them, in this order, when no explicit level is requested.
    config_level: ConfigLevels_Tup = ("system", "user", "global", "repository")

    # Subclass configuration
    # Subclasses may easily bring in their own custom types by placing a constructor or type here
    GitCommandWrapperType = Git
def __init__(
    self,
    path: Optional[PathLike] = None,
    odbt: Type[LooseObjectDB] = GitCmdObjectDB,
    search_parent_directories: bool = False,
    expand_vars: bool = True,
) -> None:
    """Create a new Repo instance

    :param path:
        the path to either the root git directory or the bare git repo::

            repo = Repo("/Users/mtrier/Development/git-python")
            repo = Repo("/Users/mtrier/Development/git-python.git")
            repo = Repo("~/Development/git-python.git")
            repo = Repo("$REPOSITORIES/Development/git-python.git")
            repo = Repo("C:\\Users\\mtrier\\Development\\git-python\\.git")

        - In *Cygwin*, path may be a `'cygdrive/...'` prefixed path.
        - If it evaluates to false, :envvar:`GIT_DIR` is used, and if this also evals to false,
          the current-directory is used.
    :param odbt:
        Object DataBase type - a type which is constructed by providing
        the directory containing the database objects, i.e. .git/objects. It will
        be used to access all object data
    :param search_parent_directories:
        if True, all parent directories will be searched for a valid repo as well.

        Please note that this was the default behaviour in older versions of GitPython,
        which is considered a bug though.
    :param expand_vars:
        forwarded to ``expand_path``. If True and the path looks like it contains
        environment variable references, a deprecation warning is emitted.
    :raise InvalidGitRepositoryError: if no git directory could be located
    :raise NoSuchPathError: if the resolved path does not exist
    :return: git.Repo"""

    epath = path or os.getenv("GIT_DIR")
    if not epath:
        epath = os.getcwd()
    if Git.is_cygwin():
        # Given how the tests are written, this seems more likely to catch
        # Cygwin git used from Windows than Windows git used from Cygwin.
        # Therefore changing to Cygwin-style paths is the relevant operation.
        epath = cygpath(epath)

    epath = epath or path or os.getcwd()
    if not isinstance(epath, str):
        epath = str(epath)
    if expand_vars and re.search(self.re_envvars, epath):
        warnings.warn(
            "The use of environment variables in paths is deprecated"
            + "\nfor security reasons and may be removed in the future!!"
        )
    epath = expand_path(epath, expand_vars)
    if epath is not None:
        if not os.path.exists(epath):
            raise NoSuchPathError(epath)

    ## Walk up the path to find the `.git` dir.
    #
    curpath = epath
    while curpath:
        # ABOUT osp.NORMPATH
        # It's important to normalize the paths, as submodules will otherwise initialize their
        # repo instances with paths that depend on path-portions that will not exist after being
        # removed. It's just cleaner.
        if is_git_dir(curpath):
            self.git_dir = curpath
            # from man git-config : core.worktree
            # Set the path to the root of the working tree. If GIT_COMMON_DIR environment
            # variable is set, core.worktree is ignored and not used for determining the
            # root of working tree. This can be overridden by the GIT_WORK_TREE environment
            # variable. The value can be an absolute path or relative to the path to the .git
            # directory, which is either specified by GIT_DIR, or automatically discovered.
            # If GIT_DIR is specified but none of GIT_WORK_TREE and core.worktree is specified,
            # the current working directory is regarded as the top level of your working tree.
            self._working_tree_dir = os.path.dirname(self.git_dir)
            if os.environ.get("GIT_COMMON_DIR") is None:
                gitconf = self.config_reader("repository")
                if gitconf.has_option("core", "worktree"):
                    self._working_tree_dir = gitconf.get("core", "worktree")
            if "GIT_WORK_TREE" in os.environ:
                self._working_tree_dir = os.getenv("GIT_WORK_TREE")
            break

        # `.git` may be a file pointing at the real git directory of a submodule
        # or a linked worktree. The lookup is done once; its result was
        # previously computed twice with the first assignment always overwritten.
        dotgit = osp.join(curpath, ".git")
        sm_gitpath = find_submodule_git_dir(dotgit)
        if sm_gitpath is None:
            sm_gitpath = find_worktree_git_dir(dotgit)

        if sm_gitpath is not None:
            self.git_dir = expand_path(sm_gitpath, expand_vars)
            self._working_tree_dir = curpath
            break

        if not search_parent_directories:
            break
        curpath, tail = osp.split(curpath)
        if not tail:
            break
    # END while curpath

    # git_dir defaults to "" on the class, so test for falsiness rather than
    # None; otherwise a failed search would never be reported from here and
    # the caller would lose the offending path from the error.
    if not self.git_dir:
        raise InvalidGitRepositoryError(epath)

    self._bare = False
    try:
        self._bare = self.config_reader("repository").getboolean("core", "bare")
    except Exception:
        # lets not assume the option exists, although it should
        pass

    try:
        # Close the file deterministically instead of leaking the handle.
        with open(osp.join(self.git_dir, "commondir"), "rt") as fp:
            common_dir = fp.readline().strip()
    except OSError:
        self._common_dir = ""
    else:
        # An empty `commondir` file carries no information; treat it like a missing one.
        self._common_dir = osp.join(self.git_dir, common_dir) if common_dir else ""

    # adjust the wd in case we are actually bare - we didn't know that
    # in the first place
    if self._bare:
        self._working_tree_dir = None
    # END working dir handling

    self.working_dir: Optional[PathLike] = self._working_tree_dir or self.common_dir
    self.git = self.GitCommandWrapperType(self.working_dir)

    # special handling, in special times
    rootpath = osp.join(self.common_dir, "objects")
    if issubclass(odbt, GitCmdObjectDB):
        self.odb = odbt(rootpath, self.git)
    else:
        self.odb = odbt(rootpath)
276 def __enter__(self) -> "Repo":
277 return self
279 def __exit__(self, *args: Any) -> None:
280 self.close()
282 def __del__(self) -> None:
283 try:
284 self.close()
285 except Exception:
286 pass
288 def close(self) -> None:
289 if self.git:
290 self.git.clear_cache()
291 # Tempfiles objects on Windows are holding references to
292 # open files until they are collected by the garbage
293 # collector, thus preventing deletion.
294 # TODO: Find these references and ensure they are closed
295 # and deleted synchronously rather than forcing a gc
296 # collection.
297 if is_win:
298 gc.collect()
299 gitdb.util.mman.collect()
300 if is_win:
301 gc.collect()
303 def __eq__(self, rhs: object) -> bool:
304 if isinstance(rhs, Repo) and self.git_dir:
305 return self.git_dir == rhs.git_dir
306 return False
308 def __ne__(self, rhs: object) -> bool:
309 return not self.__eq__(rhs)
311 def __hash__(self) -> int:
312 return hash(self.git_dir)
314 # Description property
def _get_description(self) -> str:
    """Read the ``description`` file of the git directory, stripped of trailing whitespace.

    Falls through (yielding None) when git_dir is not set.
    """
    if self.git_dir:
        description_file = osp.join(self.git_dir, "description")
        with open(description_file, "rb") as stream:
            raw = stream.read()
        return raw.rstrip().decode(defenc)


def _set_description(self, descr: str) -> None:
    """Overwrite the ``description`` file with *descr* plus a trailing newline.

    Does nothing when git_dir is not set.
    """
    if not self.git_dir:
        return
    description_file = osp.join(self.git_dir, "description")
    with open(description_file, "wb") as stream:
        stream.write((descr + "\n").encode(defenc))


description = property(_get_description, _set_description, doc="the project's description")
# The accessors are only reachable through the property from here on.
del _get_description
del _set_description
331 @property
332 def working_tree_dir(self) -> Optional[PathLike]:
333 """:return: The working tree directory of our git repository. If this is a bare repository, None is returned."""
334 return self._working_tree_dir
336 @property
337 def common_dir(self) -> PathLike:
338 """
339 :return: The git dir that holds everything except possibly HEAD,
340 FETCH_HEAD, ORIG_HEAD, COMMIT_EDITMSG, index, and logs/."""
341 if self._common_dir:
342 return self._common_dir
343 elif self.git_dir:
344 return self.git_dir
345 else:
346 # or could return ""
347 raise InvalidGitRepositoryError()
349 @property
350 def bare(self) -> bool:
351 """:return: True if the repository is bare"""
352 return self._bare
@property
def heads(self) -> "IterableList[Head]":
    """The branch heads of this repo as ``Head`` objects, listed via ``Head.list_items``.

    :return: ``git.IterableList(Head, ...)``"""
    branch_heads = Head.list_items(self)
    return branch_heads
@property
def references(self) -> "IterableList[Reference]":
    """The references of this repo (tags, heads and remote references), listed via
    ``Reference.list_items``.

    :return: IterableList(Reference, ...)"""
    all_refs = Reference.list_items(self)
    return all_refs
# alias for references: ``repo.refs`` is the very same property object
refs = references

# alias for heads: ``repo.branches`` is the very same property object
branches = heads
@property
def index(self) -> "IndexFile":
    """:return: IndexFile representing this repository's index.
    :note: A new ``IndexFile`` is constructed on every access, which can be
        expensive. It's recommended to re-use the object."""
    index_file = IndexFile(self)
    return index_file
@property
def head(self) -> "HEAD":
    """:return: HEAD Object pointing to the current head reference"""
    current_head = HEAD(self, "HEAD")
    return current_head
@property
def remotes(self) -> "IterableList[Remote]":
    """The remotes of this repo as Remote objects, listed via ``Remote.list_items``.

    :return: ``git.IterableList(Remote, ...)``"""
    configured = Remote.list_items(self)
    return configured
def remote(self, name: str = "origin") -> "Remote":
    """:return: Remote with the specified name
    :raise ValueError: if no remote with such a name exists"""
    candidate = Remote(self, name)
    if candidate.exists():
        return candidate
    raise ValueError("Remote named '%s' didn't exist" % name)
401 # { Submodules
@property
def submodules(self) -> "IterableList[Submodule]":
    """
    :return: git.IterableList(Submodule, ...) of direct submodules
        available from the current head"""
    direct_modules = Submodule.list_items(self)
    return direct_modules
def submodule(self, name: str) -> "Submodule":
    """:return: Submodule with the given name
    :raise ValueError: If no such submodule exists"""
    try:
        found = self.submodules[name]
    except IndexError as e:
        # Report a missing submodule as a bad argument, keeping the original cause.
        raise ValueError("Didn't find submodule named %r" % name) from e
    return found
def create_submodule(self, *args: Any, **kwargs: Any) -> Submodule:
    """Create a new submodule by delegating to ``Submodule.add``.

    :note: See the documentation of Submodule.add for a description of the
        applicable parameters
    :return: created submodules"""
    created = Submodule.add(self, *args, **kwargs)
    return created
def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
    """Traverse the submodules starting from a ``RootModule`` of this repository.

    See the Traversable interface for a description of args and kwargs.

    :return: Iterator"""
    root = RootModule(self)
    return root.traverse(*args, **kwargs)
def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
    """Update the submodules, keeping the repository consistent as it will
    take the previous state into consideration.

    All arguments are forwarded to ``RootModule.update``; see its documentation
    for more information."""
    root = RootModule(self)
    return root.update(*args, **kwargs)
439 # }END submodules
@property
def tags(self) -> "IterableList[TagReference]":
    """The tags available in this repo, listed via ``TagReference.list_items``.

    :return: ``git.IterableList(TagReference, ...)``"""
    tag_refs = TagReference.list_items(self)
    return tag_refs
def tag(self, path: PathLike) -> TagReference:
    """:return: TagReference Object, reference pointing to a Commit or Tag
    :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5"""
    return TagReference(self, self._to_full_tag_path(path))
@staticmethod
def _to_full_tag_path(path: PathLike) -> str:
    """Expand a tag name or partial tag path into a full tag reference path.

    A path that already starts with ``TagReference._common_path_default`` is
    returned unchanged; one starting with ``TagReference._common_default`` only
    gets ``Reference._common_path_default`` prepended; anything else is placed
    below ``TagReference._common_path_default``.
    """
    text = str(path)
    full_prefix = TagReference._common_path_default + "/"
    if text.startswith(full_prefix):
        return text

    short_prefix = TagReference._common_default + "/"
    if text.startswith(short_prefix):
        return Reference._common_path_default + "/" + text

    return full_prefix + text
def create_head(
    self,
    path: PathLike,
    commit: Union["SymbolicReference", "str"] = "HEAD",
    force: bool = False,
    logmsg: Optional[str] = None,
) -> "Head":
    """Create a new head within the repository by delegating to ``Head.create``.

    For more documentation, please see the Head.create method.

    :return: newly created Head Reference"""
    # Head.create takes logmsg before force, unlike this method's signature.
    new_head = Head.create(self, path, commit, logmsg, force)
    return new_head
def delete_head(self, *heads: "Union[str, Head]", **kwargs: Any) -> None:
    """Delete the given heads by delegating to ``Head.delete``.

    :param kwargs: Additional keyword arguments to be passed to git-branch"""
    outcome = Head.delete(self, *heads, **kwargs)
    return outcome
def create_tag(
    self,
    path: PathLike,
    ref: str = "HEAD",
    message: Optional[str] = None,
    force: bool = False,
    **kwargs: Any,
) -> TagReference:
    """Create a new tag reference by delegating to ``TagReference.create``.

    For more documentation, please see the TagReference.create method.

    :return: TagReference object"""
    new_tag = TagReference.create(self, path, ref, message, force, **kwargs)
    return new_tag
def delete_tag(self, *tags: TagReference) -> None:
    """Delete the given tag references by delegating to ``TagReference.delete``."""
    outcome = TagReference.delete(self, *tags)
    return outcome
def create_remote(self, name: str, url: str, **kwargs: Any) -> Remote:
    """Create a new remote by delegating to ``Remote.create``.

    For more information, please see the documentation of the Remote.create
    methods

    :return: Remote reference"""
    new_remote = Remote.create(self, name, url, **kwargs)
    return new_remote
def delete_remote(self, remote: "Remote") -> str:
    """Delete the given remote by delegating to ``Remote.remove``."""
    removed = Remote.remove(self, remote)
    return removed
def _get_config_path(self, config_level: Lit_config_levels) -> str:
    """Return the path of the configuration file for the given level.

    :param config_level: one of "system", "user", "global" or "repository"
    :raise NotADirectoryError: for "repository" when neither a common dir nor a
        git dir is set
    """
    # we do not support an absolute path of the gitconfig on windows ,
    # use the global config instead
    if is_win and config_level == "system":
        config_level = "global"

    if config_level == "system":
        return "/etc/gitconfig"

    if config_level == "user":
        # XDG_CONFIG_HOME wins; otherwise use $HOME/.config ("~" when HOME is unset).
        config_home = os.environ.get("XDG_CONFIG_HOME")
        if not config_home:
            config_home = osp.join(os.environ.get("HOME", "~"), ".config")
        user_config = osp.join(config_home, "git", "config")
        return osp.normpath(osp.expanduser(user_config))

    if config_level == "global":
        return osp.normpath(osp.expanduser("~/.gitconfig"))

    if config_level == "repository":
        repo_dir = self._common_dir or self.git_dir
        if repo_dir:
            return osp.normpath(osp.join(repo_dir, "config"))
        raise NotADirectoryError

    assert_never(
        config_level,  # type:ignore[unreachable]
        ValueError(f"Invalid configuration level: {config_level!r}"),
    )
def config_reader(
    self,
    config_level: Optional[Lit_config_levels] = None,
) -> GitConfigParser:
    """
    :return:
        GitConfigParser allowing to read the full git configuration, but not to write it

        The configuration will include values from the system, user and repository
        configuration files.

    :param config_level:
        For possible values, see config_writer method
        If None, all applicable levels will be used. Specify a level in case
        you know which file you wish to read to prevent reading multiple files.
    :note: On windows, system configuration cannot currently be read as the path is
        unknown, instead the global path will be used."""
    if config_level is not None:
        files = [self._get_config_path(config_level)]
    else:
        # Gather the file of every (truthy) level listed on the class, in order.
        files = []
        for name in self.config_level:
            level = cast(Lit_config_levels, name)
            if level:
                files.append(self._get_config_path(level))
    return GitConfigParser(files, read_only=True, repo=self)
def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser:
    """
    :return:
        GitConfigParser allowing to write values of the specified configuration file level.
        Config writers should be retrieved, used to change the configuration, and written
        right away as they will lock the configuration file in question and prevent others
        from writing it.

    :param config_level:
        One of the following values
        system = system wide configuration file
        global = user level configuration file
        repository = configuration file for this repository only"""
    target_file = self._get_config_path(config_level)
    return GitConfigParser(target_file, read_only=False, repo=self)
582 def commit(self, rev: Union[str, Commit_ish, None] = None) -> Commit:
583 """The Commit object for the specified revision
585 :param rev: revision specifier, see git-rev-parse for viable options.
586 :return: ``git.Commit``
587 """
588 if rev is None:
589 return self.head.commit
590 return self.rev_parse(str(rev) + "^0")
592 def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator["Tree"]:
593 """:return: Iterator yielding Tree objects
594 :note: Takes all arguments known to iter_commits method"""
595 return (c.tree for c in self.iter_commits(*args, **kwargs))
597 def tree(self, rev: Union[Tree_ish, str, None] = None) -> "Tree":
598 """The Tree object for the given treeish revision
599 Examples::
601 repo.tree(repo.heads[0])
603 :param rev: is a revision pointing to a Treeish ( being a commit or tree )
604 :return: ``git.Tree``
606 :note:
607 If you need a non-root level tree, find it by iterating the root tree. Otherwise
608 it cannot know about its path relative to the repository root and subsequent
609 operations might have unexpected results."""
610 if rev is None:
611 return self.head.commit.tree
612 return self.rev_parse(str(rev) + "^{tree}")
def iter_commits(
    self,
    rev: Union[str, Commit, "SymbolicReference", None] = None,
    paths: Union[PathLike, Sequence[PathLike]] = "",
    **kwargs: Any,
) -> Iterator[Commit]:
    """Iterate the Commit objects representing the history of a given ref/commit

    :param rev:
        revision specifier, see git-rev-parse for viable options.
        If None, the commit of the current head will be used.

    :param paths:
        is an optional path or a list of paths; if set only commits that include the path
        or paths will be returned

    :param kwargs:
        Arguments to be passed to git-rev-list - common ones are
        max_count and skip

    :note: to receive only commits between two named revisions, use the
        "revA...revB" revision specifier

    :return: ``git.Commit[]``"""
    start = self.head.commit if rev is None else rev
    return Commit.iter_items(self, start, paths, **kwargs)
643 def merge_base(self, *rev: TBD, **kwargs: Any) -> List[Union[Commit_ish, None]]:
644 """Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc)
646 :param rev: At least two revs to find the common ancestor for.
647 :param kwargs: Additional arguments to be passed to the repo.git.merge_base() command which does all the work.
648 :return: A list of Commit objects. If --all was not specified as kwarg, the list will have at max one Commit,
649 or is empty if no common merge base exists.
650 :raises ValueError: If not at least two revs are provided
651 """
652 if len(rev) < 2:
653 raise ValueError("Please specify at least two revs, got only %i" % len(rev))
654 # end handle input
656 res: List[Union[Commit_ish, None]] = []
657 try:
658 lines = self.git.merge_base(*rev, **kwargs).splitlines() # List[str]
659 except GitCommandError as err:
660 if err.status == 128:
661 raise
662 # end handle invalid rev
663 # Status code 1 is returned if there is no merge-base
664 # (see https://github.com/git/git/blob/master/builtin/merge-base.c#L16)
665 return res
666 # end exception handling
668 for line in lines:
669 res.append(self.commit(line))
670 # end for each merge-base
672 return res
674 def is_ancestor(self, ancestor_rev: "Commit", rev: "Commit") -> bool:
675 """Check if a commit is an ancestor of another
677 :param ancestor_rev: Rev which should be an ancestor
678 :param rev: Rev to test against ancestor_rev
679 :return: ``True``, ancestor_rev is an ancestor to rev.
680 """
681 try:
682 self.git.merge_base(ancestor_rev, rev, is_ancestor=True)
683 except GitCommandError as err:
684 if err.status == 1:
685 return False
686 raise
687 return True
689 def is_valid_object(self, sha: str, object_type: Union[str, None] = None) -> bool:
690 try:
691 complete_sha = self.odb.partial_to_complete_sha_hex(sha)
692 object_info = self.odb.info(complete_sha)
693 if object_type:
694 if object_info.type == object_type.encode():
695 return True
696 else:
697 log.debug(
698 "Commit hash points to an object of type '%s'. Requested were objects of type '%s'",
699 object_info.type.decode(),
700 object_type,
701 )
702 return False
703 else:
704 return True
705 except BadObject:
706 log.debug("Commit hash is invalid.")
707 return False
def _get_daemon_export(self) -> bool:
    """Tell whether the daemon-export marker file exists in the git directory.

    Falls through (yielding None) when git_dir is not set.
    """
    if self.git_dir:
        marker = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
        return osp.exists(marker)


def _set_daemon_export(self, value: object) -> None:
    """Create or remove the daemon-export marker file to match the truthiness of *value*.

    Does nothing when git_dir is not set or the file is already in the wanted state.
    """
    if not self.git_dir:
        return
    marker = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
    present = osp.exists(marker)
    if value and not present:
        touch(marker)
    elif present and not value:
        os.unlink(marker)


daemon_export = property(
    _get_daemon_export,
    _set_daemon_export,
    doc="If True, git-daemon may export this repository",
)
# The accessors are only reachable through the property from here on.
del _get_daemon_export
del _set_daemon_export
731 def _get_alternates(self) -> List[str]:
732 """The list of alternates for this repo from which objects can be retrieved
734 :return: list of strings being pathnames of alternates"""
735 if self.git_dir:
736 alternates_path = osp.join(self.git_dir, "objects", "info", "alternates")
738 if osp.exists(alternates_path):
739 with open(alternates_path, "rb") as f:
740 alts = f.read().decode(defenc)
741 return alts.strip().splitlines()
742 return []
744 def _set_alternates(self, alts: List[str]) -> None:
745 """Sets the alternates
747 :param alts:
748 is the array of string paths representing the alternates at which
749 git should look for objects, i.e. /home/user/repo/.git/objects
751 :raise NoSuchPathError:
752 :note:
753 The method does not check for the existence of the paths in alts
754 as the caller is responsible."""
755 alternates_path = osp.join(self.common_dir, "objects", "info", "alternates")
756 if not alts:
757 if osp.isfile(alternates_path):
758 os.remove(alternates_path)
759 else:
760 with open(alternates_path, "wb") as f:
761 f.write("\n".join(alts).encode(defenc))
# Public property wrapping _get_alternates / _set_alternates; unlike the
# description and daemon_export accessors, these two are not deleted afterwards.
alternates = property(
    _get_alternates,
    _set_alternates,
    doc="Retrieve a list of alternates paths or set a list paths to be used as alternates",
)
769 def is_dirty(
770 self,
771 index: bool = True,
772 working_tree: bool = True,
773 untracked_files: bool = False,
774 submodules: bool = True,
775 path: Optional[PathLike] = None,
776 ) -> bool:
777 """
778 :return:
779 ``True``, the repository is considered dirty. By default it will react
780 like a git-status without untracked files, hence it is dirty if the
781 index or the working copy have changes."""
782 if self._bare:
783 # Bare repositories with no associated working directory are
784 # always considered to be clean.
785 return False
787 # start from the one which is fastest to evaluate
788 default_args = ["--abbrev=40", "--full-index", "--raw"]
789 if not submodules:
790 default_args.append("--ignore-submodules")
791 if path:
792 default_args.extend(["--", str(path)])
793 if index:
794 # diff index against HEAD
795 if osp.isfile(self.index.path) and len(self.git.diff("--cached", *default_args)):
796 return True
797 # END index handling
798 if working_tree:
799 # diff index against working tree
800 if len(self.git.diff(*default_args)):
801 return True
802 # END working tree handling
803 if untracked_files:
804 if len(self._get_untracked_files(path, ignore_submodules=not submodules)):
805 return True
806 # END untracked files
807 return False
809 @property
810 def untracked_files(self) -> List[str]:
811 """
812 :return:
813 list(str,...)
815 Files currently untracked as they have not been staged yet. Paths
816 are relative to the current working directory of the git command.
818 :note:
819 ignored files will not appear here, i.e. files mentioned in .gitignore
820 :note:
821 This property is expensive, as no cache is involved. To process the result, please
822 consider caching it yourself."""
823 return self._get_untracked_files()
def _get_untracked_files(self, *args: Any, **kwargs: Any) -> List[str]:
    """Collect the untracked paths reported by a porcelain ``git status``.

    Positional and keyword arguments are forwarded to the status call.
    """
    # make sure we get all files, not only untracked directories
    proc = self.git.status(*args, porcelain=True, untracked_files=True, as_process=True, **kwargs)
    # Untracked files prefix in porcelain mode
    marker = "?? "
    skip = len(marker)
    collected = []
    for raw_line in proc.stdout:
        text = raw_line.decode(defenc)
        if text.startswith(marker):
            name = text[skip:].rstrip("\n")
            # Special characters are escaped
            if name[0] == name[-1] == '"':
                name = name[1:-1]
                # WHATEVER ... it's a mess, but works for me
                name = name.encode("ascii").decode("unicode_escape").encode("latin1").decode(defenc)
            collected.append(name)
    finalize_process(proc)
    return collected
845 def ignored(self, *paths: PathLike) -> List[str]:
846 """Checks if paths are ignored via .gitignore
847 Doing so using the "git check-ignore" method.
849 :param paths: List of paths to check whether they are ignored or not
850 :return: subset of those paths which are ignored
851 """
852 try:
853 proc: str = self.git.check_ignore(*paths)
854 except GitCommandError:
855 return []
856 return proc.replace("\\\\", "\\").replace('"', "").split("\n")
@property
def active_branch(self) -> Head:
    """The currently active branch, i.e. the reference HEAD points to.

    :raises TypeError: If HEAD is detached
    :return: Head to the active branch"""
    # reveal_type(self.head.reference) # => Reference
    current = self.head
    return current.reference
867 def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator["BlameEntry"]:
868 """Iterator for blame information for the given file at the given revision.
870 Unlike .blame(), this does not return the actual file's contents, only
871 a stream of BlameEntry tuples.
873 :param rev: revision specifier, see git-rev-parse for viable options.
874 :return: lazy iterator of BlameEntry tuples, where the commit
875 indicates the commit to blame for the line, and range
876 indicates a span of line numbers in the resulting file.
878 If you combine all line number ranges outputted by this command, you
879 should get a continuous range spanning all line numbers in the file.
880 """
882 data: bytes = self.git.blame(rev, "--", file, p=True, incremental=True, stdout_as_string=False, **kwargs)
883 commits: Dict[bytes, Commit] = {}
885 stream = (line for line in data.split(b"\n") if line)
886 while True:
887 try:
888 line = next(stream) # when exhausted, causes a StopIteration, terminating this function
889 except StopIteration:
890 return
891 split_line = line.split()
892 hexsha, orig_lineno_b, lineno_b, num_lines_b = split_line
893 lineno = int(lineno_b)
894 num_lines = int(num_lines_b)
895 orig_lineno = int(orig_lineno_b)
896 if hexsha not in commits:
897 # Now read the next few lines and build up a dict of properties
898 # for this commit
899 props: Dict[bytes, bytes] = {}
900 while True:
901 try:
902 line = next(stream)
903 except StopIteration:
904 return
905 if line == b"boundary":
906 # "boundary" indicates a root commit and occurs
907 # instead of the "previous" tag
908 continue
910 tag, value = line.split(b" ", 1)
911 props[tag] = value
912 if tag == b"filename":
913 # "filename" formally terminates the entry for --incremental
914 orig_filename = value
915 break
917 c = Commit(
918 self,
919 hex_to_bin(hexsha),
920 author=Actor(
921 safe_decode(props[b"author"]),
922 safe_decode(props[b"author-mail"].lstrip(b"<").rstrip(b">")),
923 ),
924 authored_date=int(props[b"author-time"]),
925 committer=Actor(
926 safe_decode(props[b"committer"]),
927 safe_decode(props[b"committer-mail"].lstrip(b"<").rstrip(b">")),
928 ),
929 committed_date=int(props[b"committer-time"]),
930 )
931 commits[hexsha] = c
932 else:
933 # Discard all lines until we find "filename" which is
934 # guaranteed to be the last line
935 while True:
936 try:
937 line = next(stream) # will fail if we reach the EOF unexpectedly
938 except StopIteration:
939 return
940 tag, value = line.split(b" ", 1)
941 if tag == b"filename":
942 orig_filename = value
943 break
945 yield BlameEntry(
946 commits[hexsha],
947 range(lineno, lineno + num_lines),
948 safe_decode(orig_filename),
949 range(orig_lineno, orig_lineno + num_lines),
950 )
def blame(
    self,
    rev: Union[str, HEAD],
    file: str,
    incremental: bool = False,
    rev_opts: Optional[List[str]] = None,
    **kwargs: Any
) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
    """Return the blame information for the given file at the given revision.

    :param rev: revision specifier, see git-rev-parse for viable options.
    :param file: path of the file to blame; passed to git-blame after ``--``.
    :param incremental:
        if True, the call is delegated to ``blame_incremental(rev, file, **kwargs)``
        and its result is returned unchanged (``rev_opts`` is not forwarded).
    :param rev_opts: extra command line options placed right after ``rev``.
    :param kwargs: additional keyword arguments passed to git-blame.
    :return:
        list: [git.Commit, list: [<line>]]
        A list of lists associating a Commit object with a list of lines that
        changed within the given commit. The Commit objects will be given in order
        of appearance. A line is a ``str`` when it could be decoded and the raw
        ``bytes`` (line ending included) otherwise."""
    if incremental:
        return self.blame_incremental(rev, file, **kwargs)

    extra_opts = rev_opts or []
    raw: bytes = self.git.blame(rev, *extra_opts, "--", file, p=True, stdout_as_string=False, **kwargs)
    known_commits: Dict[str, Commit] = {}
    result: List[List[Commit | List[str | bytes] | None]] = []

    class InfoTD(TypedDict, total=False):
        sha: str
        id: str
        filename: str
        summary: str
        author: str
        author_email: str
        author_date: int
        committer: str
        committer_email: str
        committer_date: int

    info: InfoTD = {}

    # Line endings are kept so that undecodable (binary) lines can be stored verbatim.
    for raw_line in raw.splitlines(True):
        try:
            text = raw_line.rstrip().decode(defenc)
        except UnicodeDecodeError:
            # We cannot know where binary content ends, so a failed decode is the
            # only signal we have. Such a line is handled like a content line.
            head = ""
            fields = []
            binary = True
        else:
            fields = self.re_whitespace.split(text, 1)
            head = fields[0]
            binary = False

        # --- "<hexsha> <orig> <final> [<count>]" header line --------------------
        if self.re_hexsha_only.search(head):
            # Three trailing numbers start a new group of lines, two trailing
            # numbers announce a further line of an already running group.
            counters = fields[-1].split(" ")
            if len(counters) == 3:
                info = {"id": head}
                result.append([None, []])
            elif info["id"] != head:
                info = {"id": head}
                result.append([known_commits.get(head), []])
            continue

        # --- author / committer name, -mail and -time lines ---------------------
        role_match = self.re_author_committer_start.search(head)
        if role_match:
            role = role_match.group(0)
            value = fields[-1]
            # Other suffixes (e.g. "-tz") are deliberately ignored.
            if role == "author":
                if head == role:
                    info["author"] = value
                elif head.endswith("-mail"):
                    info["author_email"] = value
                elif head.endswith("-time"):
                    info["author_date"] = int(value)
            elif role == "committer":
                if head == role:
                    info["committer"] = value
                elif head.endswith("-mail"):
                    info["committer_email"] = value
                elif head.endswith("-time"):
                    info["committer_date"] = int(value)
            continue

        # --- filename / summary lines -------------------------------------------
        if head.startswith("filename"):
            info["filename"] = fields[-1]
            continue
        if head.startswith("summary"):
            info["summary"] = fields[-1]
            continue

        # --- everything else is only relevant if it is a content line -----------
        if head != "" or not info:
            continue

        sha = info["id"]
        commit = known_commits.get(sha)
        if commit is None:
            # First time this commit shows up: build it from the collected header.
            commit = Commit(
                self,
                hex_to_bin(sha),
                author=Actor._from_string(f"{info['author']} {info['author_email']}"),
                authored_date=info["author_date"],
                committer=Actor._from_string(f"{info['committer']} {info['committer_email']}"),
                committed_date=info["committer_date"],
            )
            known_commits[sha] = commit
        result[-1][0] = commit

        if result[-1][1] is not None:
            content: str | bytes
            if binary:
                # NOTE: binary data is split along newlines here, so one binary
                # blob may end up as several entries instead of being joined
                # with the previously seen line.
                content = raw_line
            elif text.startswith("\t"):
                # Content lines are prefixed with a single tab in porcelain output.
                content = text[1:]
            else:
                content = text
            result[-1][1].append(content)

        # Keep only the id, the remaining header data has been consumed.
        info = {"id": sha}

    return result
@classmethod
def init(
    cls,
    path: Union[PathLike, None] = None,
    mkdir: bool = True,
    odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
    expand_vars: bool = True,
    **kwargs: Any,
) -> "Repo":
    """Initialize a git repository at the given path, if one is specified.

    :param path:
        the full path to the repo (traditionally ends with /<name>.git),
        or None, in which case the repository is created in the current
        working directory

    :param mkdir:
        if set, the repository directory is created with mode 0755 when it
        does not exist yet. Only effective if a path is explicitly given

    :param odbt:
        Object DataBase type - a type which is constructed by providing
        the directory containing the database objects, i.e. .git/objects.
        It will be used to access all object data

    :param expand_vars:
        forwarded to ``expand_path``. If specified, environment variables
        will not be escaped. This can lead to information disclosure,
        allowing attackers to access the contents of environment variables

    :param kwargs:
        keyword arguments serving as additional options to the git-init command

    :return: ``git.Repo`` (the newly created repo)"""
    target = path
    if target:
        target = expand_path(target, expand_vars)

    # The expanded path is checked again before the directory is created.
    if mkdir and target:
        if not osp.exists(target):
            os.makedirs(target, 0o755)

    # The git command automatically changes into the given directory.
    wrapper = cls.GitCommandWrapperType(target)
    wrapper.init(**kwargs)
    return cls(target, odbt=odbt)
@classmethod
def _clone(
    cls,
    git: "Git",
    url: PathLike,
    path: PathLike,
    odb_default_type: Type[GitCmdObjectDB],
    progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None,
    multi_options: Optional[List[str]] = None,
    **kwargs: Any,
) -> "Repo":
    """Run git-clone through the given command wrapper and return the new repository.

    :param git: command wrapper used to execute the clone
    :param url: source to clone from; it is stringified and passed through ``Git.polish_url``
    :param path: destination of the clone; non-string path objects are converted with ``str``
    :param odb_default_type: object database type used unless ``odbt`` is given in kwargs
    :param progress: optional progress handler; when set, process output is fed to it
    :param multi_options: clone options, joined with spaces and re-split with ``shlex``
    :param kwargs: ``odbt`` is consumed here, everything else goes to git-clone
    :return: instance of ``cls`` pointing to the cloned repository"""
    odbt = kwargs.pop("odbt", odb_default_type)

    # pathlib.Path and other class based paths are turned into plain strings
    if not isinstance(path, str):
        path = str(path)

    ## A bug in cygwin's Git: with `--bare` or `--separate-git-dir`
    #  it prepends the cwd or(?) the `url` into the `path`, so::
    #        git clone --bare  /cygwin/d/foo.git  C:\\Work
    #  becomes::
    #        git clone --bare  /cygwin/d/foo.git  /cygwin/d/C:\\Work
    #
    if Git.is_cygwin() and "bare" in kwargs:
        clone_path = Git.polish_url(path)
    else:
        clone_path = path

    separate_dir = kwargs.get("separate_git_dir")
    if separate_dir:
        kwargs["separate_git_dir"] = Git.polish_url(separate_dir)

    extra_args = shlex.split(" ".join(multi_options)) if multi_options else None

    proc = git.clone(
        extra_args,
        Git.polish_url(str(url)),
        clone_path,
        with_extended_output=True,
        as_process=True,
        v=True,
        universal_newlines=True,
        **add_progress(kwargs, git, progress),
    )

    if not progress:
        # Nobody listens to the output: drain it and log stdout for debugging.
        out, err = proc.communicate()
        cmdline = remove_password_if_present(getattr(proc, "args", ""))

        log.debug("Cmd(%s)'s unused stdout: %s", cmdline, out)
        finalize_process(proc, stderr=err)
    else:
        handle_process_output(
            proc,
            None,
            to_progress_instance(progress).new_message_handler(),
            finalize_process,
            decode_streams=False,
        )

    # Our git command could have a different working dir than our actual
    # environment, hence its working dir is prepended to relative paths.
    if not osp.isabs(path) and git._working_dir is not None:
        path = osp.join(git._working_dir, path)

    repo = cls(path, odbt=odbt)

    # Retain the environment values that were passed to _clone()
    repo.git.update_environment(**git.environment())

    # Adjust remotes - there may be operating systems which use backslashes.
    # These might be given as initial paths, but when handling the config file
    # that contains the remote from which we were cloned, git stops liking it
    # as it will escape the backslashes. Hence we undo the escaping just to be
    # sure.
    if repo.remotes:
        with repo.remotes[0].config_writer as writer:
            writer.set_value("url", Git.polish_url(repo.remotes[0].url))

    return repo
def clone(
    self,
    path: PathLike,
    progress: Optional[Callable] = None,
    multi_options: Optional[List[str]] = None,
    **kwargs: Any,
) -> "Repo":
    """Create a clone of this repository.

    The clone is made from this repository's ``common_dir``, using this
    repository's git command wrapper and the type of its object database.

    :param path: the full path of the new repo (traditionally ends with ./<name>.git).
    :param progress: See 'git.remote.Remote.push'.
    :param multi_options: A list of Clone options that can be provided multiple times. One
        option per list item which is passed exactly as specified to clone.
        For example ['--config core.filemode=false', '--config core.ignorecase',
        '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path']
    :param kwargs:
        * odbt = ObjectDatabase Type, allowing to determine the object database
          implementation used by the returned Repo instance
        * All remaining keyword arguments are given to the git-clone command

    :return: ``git.Repo`` (the newly cloned repo)"""
    wrapper = self.git
    source = self.common_dir
    odb_type = type(self.odb)
    return self._clone(wrapper, source, path, odb_type, progress, multi_options, **kwargs)
@classmethod
def clone_from(
    cls,
    url: PathLike,
    to_path: PathLike,
    progress: Optional[Callable] = None,
    env: Optional[Mapping[str, str]] = None,
    multi_options: Optional[List[str]] = None,
    **kwargs: Any,
) -> "Repo":
    """Create a clone from the given URL.

    The clone is executed by a fresh command wrapper rooted at the current
    working directory.

    :param url: valid git url, see http://www.kernel.org/pub/software/scm/git/docs/git-clone.html#URLS
    :param to_path: Path to which the repository should be cloned to
    :param progress: See 'git.remote.Remote.push'.
    :param env: Optional dictionary containing the desired environment variables.
        Note: Provided variables will be used to update the execution
        environment for `git`. If some variable is not specified in `env`
        and is defined in `os.environ`, value from `os.environ` will be used.
        If you want to unset some variable, consider providing empty string
        as its value.
    :param multi_options: See ``clone`` method
    :param kwargs: see the ``clone`` method
    :return: Repo instance pointing to the cloned directory"""
    runner = cls.GitCommandWrapperType(os.getcwd())
    if env is not None:
        runner.update_environment(**env)
    return cls._clone(
        runner,
        url,
        to_path,
        GitCmdObjectDB,
        progress,
        multi_options,
        **kwargs,
    )
def archive(
    self,
    ostream: Union[TextIO, BinaryIO],
    treeish: Optional[str] = None,
    prefix: Optional[str] = None,
    **kwargs: Any,
) -> Repo:
    """Archive the tree at the given revision.

    :param ostream: file compatible stream object to which the archive will be written as bytes
    :param treeish: the treeish name/id; defaults to the commit the head points to
    :param prefix: the optional prefix to prepend to each filename in the archive
    :param kwargs: Additional arguments passed to git-archive

        * Use the 'format' argument to define the kind of format. Use
          specialized ostreams to write any format supported by python.
        * You may specify the special **path** keyword, which may either be a repository-relative
          path to a directory or file to place into the archive, or a list or tuple of multiple paths.

    :raise GitCommandError: in case something went wrong
    :return: self"""
    revision = self.head.commit if treeish is None else treeish

    # A prefix already present in kwargs is never overwritten.
    if prefix:
        kwargs.setdefault("prefix", prefix)
    kwargs["output_stream"] = ostream

    # 'path' is our own keyword and must not reach git-archive as an option.
    paths = cast("Union[PathLike, List[PathLike], Tuple[PathLike, ...]]", kwargs.pop("path", []))
    if not isinstance(paths, (tuple, list)):
        paths = [paths]

    self.git.archive(revision, *paths, **kwargs)
    return self
def has_separate_working_tree(self) -> bool:
    """
    :return: True if our git_dir is not at the root of our working_tree_dir, but a .git file with a
        platform agnostic symbolic link. Our git_dir will be wherever the .git file points to
    :note: bare repositories will always return False here; so do repositories without a
        working_tree_dir
    """
    if self.bare or not self.working_tree_dir:
        return False
    # A regular *file* named .git (rather than a directory) marks the separation.
    dot_git = osp.join(self.working_tree_dir, ".git")
    return osp.isfile(dot_git)
# Re-bind the rev_parse function imported from .fun under the same name in this scope.
1324 rev_parse = rev_parse
def __repr__(self) -> str:
    """Return ``<module.ClassName 'git_dir'>`` for this instance."""
    cls = self.__class__
    fields = (cls.__module__, cls.__name__, self.git_dir)
    return "<%s.%s %r>" % fields
def currently_rebasing_on(self) -> Commit | None:
    """
    :return: The commit which is currently being replayed while rebasing.

        None if we are not currently rebasing, i.e. if there is no git_dir or
        no REBASE_HEAD file inside of it.
    """
    # Without a git directory there is nothing to inspect; returning early also
    # guarantees the path below is always bound before it is used.
    if not self.git_dir:
        return None

    rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD")
    if not osp.isfile(rebase_head_file):
        return None

    # Only the first line is read; the context manager closes the file
    # deterministically instead of leaving that to the garbage collector.
    with open(rebase_head_file, "rt") as fp:
        rev = fp.readline().strip()
    return self.commit(rev)