Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/submodule/base.py: 13%
582 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# need a dict to set bloody .name field
2from io import BytesIO
3import logging
4import os
5import stat
6import uuid
8import git
9from git.cmd import Git
10from git.compat import (
11 defenc,
12 is_win,
13)
14from git.config import SectionConstraint, GitConfigParser, cp
15from git.exc import (
16 InvalidGitRepositoryError,
17 NoSuchPathError,
18 RepositoryDirtyError,
19 BadName,
20)
21from git.objects.base import IndexObject, Object
22from git.objects.util import TraversableIterableObj
24from git.util import (
25 join_path_native,
26 to_native_path_linux,
27 RemoteProgress,
28 rmtree,
29 unbare_repo,
30 IterableList,
31)
32from git.util import HIDE_WINDOWS_KNOWN_ERRORS
34import os.path as osp
36from .util import (
37 mkhead,
38 sm_name,
39 sm_section,
40 SubmoduleConfigParser,
41 find_first_remote_branch,
42)
45# typing ----------------------------------------------------------------------
46from typing import Callable, Dict, Mapping, Sequence, TYPE_CHECKING, cast
47from typing import Any, Iterator, Union
49from git.types import Commit_ish, Literal, PathLike, TBD
51if TYPE_CHECKING: 51 ↛ 52line 51 didn't jump to line 52, because the condition on line 51 was never true
52 from git.index import IndexFile
53 from git.repo import Repo
54 from git.refs import Head
57# -----------------------------------------------------------------------------
59__all__ = ["Submodule", "UpdateProgress"]
62log = logging.getLogger("git.objects.submodule.base")
63log.addHandler(logging.NullHandler())
66class UpdateProgress(RemoteProgress):
68 """Class providing detailed progress information to the caller who should
69 derive from it and implement the ``update(...)`` message"""
71 CLONE, FETCH, UPDWKTREE = [1 << x for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)]
72 _num_op_codes: int = RemoteProgress._num_op_codes + 3
74 __slots__ = ()
77BEGIN = UpdateProgress.BEGIN
78END = UpdateProgress.END
79CLONE = UpdateProgress.CLONE
80FETCH = UpdateProgress.FETCH
81UPDWKTREE = UpdateProgress.UPDWKTREE
84# IndexObject comes via util module, its a 'hacky' fix thanks to pythons import
85# mechanism which cause plenty of trouble of the only reason for packages and
86# modules is refactoring - subpackages shouldn't depend on parent packages
87class Submodule(IndexObject, TraversableIterableObj):
89 """Implements access to a git submodule. They are special in that their sha
90 represents a commit in the submodule's repository which is to be checked out
91 at the path of this instance.
92 The submodule type does not have a string type associated with it, as it exists
93 solely as a marker in the tree and index.
95 All methods work in bare and non-bare repositories."""
97 _id_attribute_ = "name"
98 k_modules_file = ".gitmodules"
99 k_head_option = "branch"
100 k_head_default = "master"
101 k_default_mode = stat.S_IFDIR | stat.S_IFLNK # submodules are directories with link-status
103 # this is a bogus type for base class compatibility
104 type: Literal["submodule"] = "submodule" # type: ignore
106 __slots__ = ("_parent_commit", "_url", "_branch_path", "_name", "__weakref__")
107 _cache_attrs = ("path", "_url", "_branch_path")
109 def __init__(
110 self,
111 repo: "Repo",
112 binsha: bytes,
113 mode: Union[int, None] = None,
114 path: Union[PathLike, None] = None,
115 name: Union[str, None] = None,
116 parent_commit: Union[Commit_ish, None] = None,
117 url: Union[str, None] = None,
118 branch_path: Union[PathLike, None] = None,
119 ) -> None:
120 """Initialize this instance with its attributes. We only document the ones
121 that differ from ``IndexObject``
123 :param repo: Our parent repository
124 :param binsha: binary sha referring to a commit in the remote repository, see url parameter
125 :param parent_commit: see set_parent_commit()
126 :param url: The url to the remote repository which is the submodule
127 :param branch_path: full (relative) path to ref to checkout when cloning the remote repository"""
128 super(Submodule, self).__init__(repo, binsha, mode, path)
129 self.size = 0
130 self._parent_commit = parent_commit
131 if url is not None:
132 self._url = url
133 if branch_path is not None:
134 # assert isinstance(branch_path, str)
135 self._branch_path = branch_path
136 if name is not None:
137 self._name = name
139 def _set_cache_(self, attr: str) -> None:
140 if attr in ("path", "_url", "_branch_path"):
141 reader: SectionConstraint = self.config_reader()
142 # default submodule values
143 try:
144 self.path = reader.get("path")
145 except cp.NoSectionError as e:
146 if self.repo.working_tree_dir is not None:
147 raise ValueError(
148 "This submodule instance does not exist anymore in '%s' file"
149 % osp.join(self.repo.working_tree_dir, ".gitmodules")
150 ) from e
151 # end
152 self._url = reader.get("url")
153 # git-python extension values - optional
154 self._branch_path = reader.get_value(self.k_head_option, git.Head.to_full_path(self.k_head_default))
155 elif attr == "_name":
156 raise AttributeError("Cannot retrieve the name of a submodule if it was not set initially")
157 else:
158 super(Submodule, self)._set_cache_(attr)
159 # END handle attribute name
161 @classmethod
162 def _get_intermediate_items(cls, item: "Submodule") -> IterableList["Submodule"]:
163 """:return: all the submodules of our module repository"""
164 try:
165 return cls.list_items(item.module())
166 except InvalidGitRepositoryError:
167 return IterableList("")
168 # END handle intermediate items
170 @classmethod
171 def _need_gitfile_submodules(cls, git: Git) -> bool:
172 return git.version_info[:3] >= (1, 7, 5)
174 def __eq__(self, other: Any) -> bool:
175 """Compare with another submodule"""
176 # we may only compare by name as this should be the ID they are hashed with
177 # Otherwise this type wouldn't be hashable
178 # return self.path == other.path and self.url == other.url and super(Submodule, self).__eq__(other)
179 return self._name == other._name
181 def __ne__(self, other: object) -> bool:
182 """Compare with another submodule for inequality"""
183 return not (self == other)
185 def __hash__(self) -> int:
186 """Hash this instance using its logical id, not the sha"""
187 return hash(self._name)
189 def __str__(self) -> str:
190 return self._name
192 def __repr__(self) -> str:
193 return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)" % (
194 type(self).__name__,
195 self._name,
196 self.path,
197 self.url,
198 self.branch_path,
199 )
201 @classmethod
202 def _config_parser(
203 cls, repo: "Repo", parent_commit: Union[Commit_ish, None], read_only: bool
204 ) -> SubmoduleConfigParser:
205 """:return: Config Parser constrained to our submodule in read or write mode
206 :raise IOError: If the .gitmodules file cannot be found, either locally or in the repository
207 at the given parent commit. Otherwise the exception would be delayed until the first
208 access of the config parser"""
209 parent_matches_head = True
210 if parent_commit is not None:
211 try:
212 parent_matches_head = repo.head.commit == parent_commit
213 except ValueError:
214 # We are most likely in an empty repository, so the HEAD doesn't point to a valid ref
215 pass
216 # end handle parent_commit
217 fp_module: Union[str, BytesIO]
218 if not repo.bare and parent_matches_head and repo.working_tree_dir:
219 fp_module = osp.join(repo.working_tree_dir, cls.k_modules_file)
220 else:
221 assert parent_commit is not None, "need valid parent_commit in bare repositories"
222 try:
223 fp_module = cls._sio_modules(parent_commit)
224 except KeyError as e:
225 raise IOError(
226 "Could not find %s file in the tree of parent commit %s" % (cls.k_modules_file, parent_commit)
227 ) from e
228 # END handle exceptions
229 # END handle non-bare working tree
231 if not read_only and (repo.bare or not parent_matches_head):
232 raise ValueError("Cannot write blobs of 'historical' submodule configurations")
233 # END handle writes of historical submodules
235 return SubmoduleConfigParser(fp_module, read_only=read_only)
237 def _clear_cache(self) -> None:
238 # clear the possibly changed values
239 for name in self._cache_attrs:
240 try:
241 delattr(self, name)
242 except AttributeError:
243 pass
244 # END try attr deletion
245 # END for each name to delete
247 @classmethod
248 def _sio_modules(cls, parent_commit: Commit_ish) -> BytesIO:
249 """:return: Configuration file as BytesIO - we only access it through the respective blob's data"""
250 sio = BytesIO(parent_commit.tree[cls.k_modules_file].data_stream.read())
251 sio.name = cls.k_modules_file
252 return sio
254 def _config_parser_constrained(self, read_only: bool) -> SectionConstraint:
255 """:return: Config Parser constrained to our submodule in read or write mode"""
256 try:
257 pc: Union["Commit_ish", None] = self.parent_commit
258 except ValueError:
259 pc = None
260 # end handle empty parent repository
261 parser = self._config_parser(self.repo, pc, read_only)
262 parser.set_submodule(self)
263 return SectionConstraint(parser, sm_section(self.name))
265 @classmethod
266 def _module_abspath(cls, parent_repo: "Repo", path: PathLike, name: str) -> PathLike:
267 if cls._need_gitfile_submodules(parent_repo.git):
268 return osp.join(parent_repo.git_dir, "modules", name)
269 if parent_repo.working_tree_dir:
270 return osp.join(parent_repo.working_tree_dir, path)
271 raise NotADirectoryError()
272 # end
274 @classmethod
275 def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any) -> "Repo":
276 """:return: Repo instance of newly cloned repository
277 :param repo: our parent repository
278 :param url: url to clone from
279 :param path: repository - relative path to the submodule checkout location
280 :param name: canonical of the submodule
281 :param kwrags: additinoal arguments given to git.clone"""
282 module_abspath = cls._module_abspath(repo, path, name)
283 module_checkout_path = module_abspath
284 if cls._need_gitfile_submodules(repo.git):
285 kwargs["separate_git_dir"] = module_abspath
286 module_abspath_dir = osp.dirname(module_abspath)
287 if not osp.isdir(module_abspath_dir):
288 os.makedirs(module_abspath_dir)
289 module_checkout_path = osp.join(str(repo.working_tree_dir), path)
290 # end
292 clone = git.Repo.clone_from(url, module_checkout_path, **kwargs)
293 if cls._need_gitfile_submodules(repo.git):
294 cls._write_git_file_and_module_config(module_checkout_path, module_abspath)
295 # end
296 return clone
298 @classmethod
299 def _to_relative_path(cls, parent_repo: "Repo", path: PathLike) -> PathLike:
300 """:return: a path guaranteed to be relative to the given parent - repository
301 :raise ValueError: if path is not contained in the parent repository's working tree"""
302 path = to_native_path_linux(path)
303 if path.endswith("/"):
304 path = path[:-1]
305 # END handle trailing slash
307 if osp.isabs(path) and parent_repo.working_tree_dir:
308 working_tree_linux = to_native_path_linux(parent_repo.working_tree_dir)
309 if not path.startswith(working_tree_linux):
310 raise ValueError(
311 "Submodule checkout path '%s' needs to be within the parents repository at '%s'"
312 % (working_tree_linux, path)
313 )
314 path = path[len(working_tree_linux.rstrip("/")) + 1 :]
315 if not path:
316 raise ValueError("Absolute submodule path '%s' didn't yield a valid relative path" % path)
317 # end verify converted relative path makes sense
318 # end convert to a relative path
320 return path
322 @classmethod
323 def _write_git_file_and_module_config(cls, working_tree_dir: PathLike, module_abspath: PathLike) -> None:
324 """Writes a .git file containing a(preferably) relative path to the actual git module repository.
325 It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir
326 :note: will overwrite existing files !
327 :note: as we rewrite both the git file as well as the module configuration, we might fail on the configuration
328 and will not roll back changes done to the git file. This should be a non - issue, but may easily be fixed
329 if it becomes one
330 :param working_tree_dir: directory to write the .git file into
331 :param module_abspath: absolute path to the bare repository
332 """
333 git_file = osp.join(working_tree_dir, ".git")
334 rela_path = osp.relpath(module_abspath, start=working_tree_dir)
335 if is_win:
336 if osp.isfile(git_file):
337 os.remove(git_file)
338 with open(git_file, "wb") as fp:
339 fp.write(("gitdir: %s" % rela_path).encode(defenc))
341 with GitConfigParser(osp.join(module_abspath, "config"), read_only=False, merge_includes=False) as writer:
342 writer.set_value(
343 "core",
344 "worktree",
345 to_native_path_linux(osp.relpath(working_tree_dir, start=module_abspath)),
346 )
348 # { Edit Interface
350 @classmethod
351 def add(
352 cls,
353 repo: "Repo",
354 name: str,
355 path: PathLike,
356 url: Union[str, None] = None,
357 branch: Union[str, None] = None,
358 no_checkout: bool = False,
359 depth: Union[int, None] = None,
360 env: Union[Mapping[str, str], None] = None,
361 clone_multi_options: Union[Sequence[TBD], None] = None,
362 ) -> "Submodule":
363 """Add a new submodule to the given repository. This will alter the index
364 as well as the .gitmodules file, but will not create a new commit.
365 If the submodule already exists, no matter if the configuration differs
366 from the one provided, the existing submodule will be returned.
368 :param repo: Repository instance which should receive the submodule
369 :param name: The name/identifier for the submodule
370 :param path: repository-relative or absolute path at which the submodule
371 should be located
372 It will be created as required during the repository initialization.
373 :param url: git-clone compatible URL, see git-clone reference for more information
374 If None, the repository is assumed to exist, and the url of the first
375 remote is taken instead. This is useful if you want to make an existing
376 repository a submodule of anotherone.
377 :param branch: name of branch at which the submodule should (later) be checked out.
378 The given branch must exist in the remote repository, and will be checked
379 out locally as a tracking branch.
380 It will only be written into the configuration if it not None, which is
381 when the checked out branch will be the one the remote HEAD pointed to.
382 The result you get in these situation is somewhat fuzzy, and it is recommended
383 to specify at least 'master' here.
384 Examples are 'master' or 'feature/new'
385 :param no_checkout: if True, and if the repository has to be cloned manually,
386 no checkout will be performed
387 :param depth: Create a shallow clone with a history truncated to the
388 specified number of commits.
389 :param env: Optional dictionary containing the desired environment variables.
390 Note: Provided variables will be used to update the execution
391 environment for `git`. If some variable is not specified in `env`
392 and is defined in `os.environ`, value from `os.environ` will be used.
393 If you want to unset some variable, consider providing empty string
394 as its value.
395 :param clone_multi_options: A list of Clone options. Please see ``git.repo.base.Repo.clone``
396 for details.
397 :return: The newly created submodule instance
398 :note: works atomically, such that no change will be done if the repository
399 update fails for instance"""
401 if repo.bare:
402 raise InvalidGitRepositoryError("Cannot add submodules to bare repositories")
403 # END handle bare repos
405 path = cls._to_relative_path(repo, path)
407 # assure we never put backslashes into the url, as some operating systems
408 # like it ...
409 if url is not None:
410 url = to_native_path_linux(url)
411 # END assure url correctness
413 # INSTANTIATE INTERMEDIATE SM
414 sm = cls(
415 repo,
416 cls.NULL_BIN_SHA,
417 cls.k_default_mode,
418 path,
419 name,
420 url="invalid-temporary",
421 )
422 if sm.exists():
423 # reretrieve submodule from tree
424 try:
425 sm = repo.head.commit.tree[str(path)]
426 sm._name = name
427 return sm
428 except KeyError:
429 # could only be in index
430 index = repo.index
431 entry = index.entries[index.entry_key(path, 0)]
432 sm.binsha = entry.binsha
433 return sm
434 # END handle exceptions
435 # END handle existing
437 # fake-repo - we only need the functionality on the branch instance
438 br = git.Head(repo, git.Head.to_full_path(str(branch) or cls.k_head_default))
439 has_module = sm.module_exists()
440 branch_is_default = branch is None
441 if has_module and url is not None:
442 if url not in [r.url for r in sm.module().remotes]:
443 raise ValueError(
444 "Specified URL '%s' does not match any remote url of the repository at '%s'" % (url, sm.abspath)
445 )
446 # END check url
447 # END verify urls match
449 mrepo: Union[Repo, None] = None
451 if url is None:
452 if not has_module:
453 raise ValueError("A URL was not given and a repository did not exist at %s" % path)
454 # END check url
455 mrepo = sm.module()
456 # assert isinstance(mrepo, git.Repo)
457 urls = [r.url for r in mrepo.remotes]
458 if not urls:
459 raise ValueError("Didn't find any remote url in repository at %s" % sm.abspath)
460 # END verify we have url
461 url = urls[0]
462 else:
463 # clone new repo
464 kwargs: Dict[str, Union[bool, int, str, Sequence[TBD]]] = {"n": no_checkout}
465 if not branch_is_default:
466 kwargs["b"] = br.name
467 # END setup checkout-branch
469 if depth:
470 if isinstance(depth, int):
471 kwargs["depth"] = depth
472 else:
473 raise ValueError("depth should be an integer")
474 if clone_multi_options:
475 kwargs["multi_options"] = clone_multi_options
477 # _clone_repo(cls, repo, url, path, name, **kwargs):
478 mrepo = cls._clone_repo(repo, url, path, name, env=env, **kwargs)
479 # END verify url
481 ## See #525 for ensuring git urls in config-files valid under Windows.
482 url = Git.polish_url(url)
484 # It's important to add the URL to the parent config, to let `git submodule` know.
485 # otherwise there is a '-' character in front of the submodule listing
486 # a38efa84daef914e4de58d1905a500d8d14aaf45 mymodule (v0.9.0-1-ga38efa8)
487 # -a38efa84daef914e4de58d1905a500d8d14aaf45 submodules/intermediate/one
488 writer: Union[GitConfigParser, SectionConstraint]
490 with sm.repo.config_writer() as writer:
491 writer.set_value(sm_section(name), "url", url)
493 # update configuration and index
494 index = sm.repo.index
495 with sm.config_writer(index=index, write=False) as writer:
496 writer.set_value("url", url)
497 writer.set_value("path", path)
499 sm._url = url
500 if not branch_is_default:
501 # store full path
502 writer.set_value(cls.k_head_option, br.path)
503 sm._branch_path = br.path
505 # we deliberately assume that our head matches our index !
506 if mrepo:
507 sm.binsha = mrepo.head.commit.binsha
508 index.add([sm], write=True)
510 return sm
512 def update(
513 self,
514 recursive: bool = False,
515 init: bool = True,
516 to_latest_revision: bool = False,
517 progress: Union["UpdateProgress", None] = None,
518 dry_run: bool = False,
519 force: bool = False,
520 keep_going: bool = False,
521 env: Union[Mapping[str, str], None] = None,
522 clone_multi_options: Union[Sequence[TBD], None] = None,
523 ) -> "Submodule":
524 """Update the repository of this submodule to point to the checkout
525 we point at with the binsha of this instance.
527 :param recursive: if True, we will operate recursively and update child-
528 modules as well.
529 :param init: if True, the module repository will be cloned into place if necessary
530 :param to_latest_revision: if True, the submodule's sha will be ignored during checkout.
531 Instead, the remote will be fetched, and the local tracking branch updated.
532 This only works if we have a local tracking branch, which is the case
533 if the remote repository had a master branch, or of the 'branch' option
534 was specified for this submodule and the branch existed remotely
535 :param progress: UpdateProgress instance or None if no progress should be shown
536 :param dry_run: if True, the operation will only be simulated, but not performed.
537 All performed operations are read - only
538 :param force:
539 If True, we may reset heads even if the repository in question is dirty. Additinoally we will be allowed
540 to set a tracking branch which is ahead of its remote branch back into the past or the location of the
541 remote branch. This will essentially 'forget' commits.
542 If False, local tracking branches that are in the future of their respective remote branches will simply
543 not be moved.
544 :param keep_going: if True, we will ignore but log all errors, and keep going recursively.
545 Unless dry_run is set as well, keep_going could cause subsequent / inherited errors you wouldn't see
546 otherwise.
547 In conjunction with dry_run, it can be useful to anticipate all errors when updating submodules
548 :param env: Optional dictionary containing the desired environment variables.
549 Note: Provided variables will be used to update the execution
550 environment for `git`. If some variable is not specified in `env`
551 and is defined in `os.environ`, value from `os.environ` will be used.
552 If you want to unset some variable, consider providing empty string
553 as its value.
554 :param clone_multi_options: list of Clone options. Please see ``git.repo.base.Repo.clone``
555 for details. Only take effect with `init` option.
556 :note: does nothing in bare repositories
557 :note: method is definitely not atomic if recurisve is True
558 :return: self"""
559 if self.repo.bare:
560 return self
561 # END pass in bare mode
563 if progress is None:
564 progress = UpdateProgress()
565 # END handle progress
566 prefix = ""
567 if dry_run:
568 prefix = "DRY-RUN: "
569 # END handle prefix
571 # to keep things plausible in dry-run mode
572 if dry_run:
573 mrepo = None
574 # END init mrepo
576 try:
577 # ASSURE REPO IS PRESENT AND UPTODATE
578 #####################################
579 try:
580 mrepo = self.module()
581 rmts = mrepo.remotes
582 len_rmts = len(rmts)
583 for i, remote in enumerate(rmts):
584 op = FETCH
585 if i == 0:
586 op |= BEGIN
587 # END handle start
589 progress.update(
590 op,
591 i,
592 len_rmts,
593 prefix + "Fetching remote %s of submodule %r" % (remote, self.name),
594 )
595 # ===============================
596 if not dry_run:
597 remote.fetch(progress=progress)
598 # END handle dry-run
599 # ===============================
600 if i == len_rmts - 1:
601 op |= END
602 # END handle end
603 progress.update(
604 op,
605 i,
606 len_rmts,
607 prefix + "Done fetching remote of submodule %r" % self.name,
608 )
609 # END fetch new data
610 except InvalidGitRepositoryError:
611 mrepo = None
612 if not init:
613 return self
614 # END early abort if init is not allowed
616 # there is no git-repository yet - but delete empty paths
617 checkout_module_abspath = self.abspath
618 if not dry_run and osp.isdir(checkout_module_abspath):
619 try:
620 os.rmdir(checkout_module_abspath)
621 except OSError as e:
622 raise OSError(
623 "Module directory at %r does already exist and is non-empty" % checkout_module_abspath
624 ) from e
625 # END handle OSError
626 # END handle directory removal
628 # don't check it out at first - nonetheless it will create a local
629 # branch according to the remote-HEAD if possible
630 progress.update(
631 BEGIN | CLONE,
632 0,
633 1,
634 prefix
635 + "Cloning url '%s' to '%s' in submodule %r" % (self.url, checkout_module_abspath, self.name),
636 )
637 if not dry_run:
638 mrepo = self._clone_repo(
639 self.repo,
640 self.url,
641 self.path,
642 self.name,
643 n=True,
644 env=env,
645 multi_options=clone_multi_options,
646 )
647 # END handle dry-run
648 progress.update(
649 END | CLONE,
650 0,
651 1,
652 prefix + "Done cloning to %s" % checkout_module_abspath,
653 )
655 if not dry_run:
656 # see whether we have a valid branch to checkout
657 try:
658 mrepo = cast("Repo", mrepo)
659 # find a remote which has our branch - we try to be flexible
660 remote_branch = find_first_remote_branch(mrepo.remotes, self.branch_name)
661 local_branch = mkhead(mrepo, self.branch_path)
663 # have a valid branch, but no checkout - make sure we can figure
664 # that out by marking the commit with a null_sha
665 local_branch.set_object(Object(mrepo, self.NULL_BIN_SHA))
666 # END initial checkout + branch creation
668 # make sure HEAD is not detached
669 mrepo.head.set_reference(
670 local_branch,
671 logmsg="submodule: attaching head to %s" % local_branch,
672 )
673 mrepo.head.reference.set_tracking_branch(remote_branch)
674 except (IndexError, InvalidGitRepositoryError):
675 log.warning("Failed to checkout tracking branch %s", self.branch_path)
676 # END handle tracking branch
678 # NOTE: Have to write the repo config file as well, otherwise
679 # the default implementation will be offended and not update the repository
680 # Maybe this is a good way to assure it doesn't get into our way, but
681 # we want to stay backwards compatible too ... . Its so redundant !
682 with self.repo.config_writer() as writer:
683 writer.set_value(sm_section(self.name), "url", self.url)
684 # END handle dry_run
685 # END handle initialization
687 # DETERMINE SHAS TO CHECKOUT
688 ############################
689 binsha = self.binsha
690 hexsha = self.hexsha
691 if mrepo is not None:
692 # mrepo is only set if we are not in dry-run mode or if the module existed
693 is_detached = mrepo.head.is_detached
694 # END handle dry_run
696 if mrepo is not None and to_latest_revision:
697 msg_base = "Cannot update to latest revision in repository at %r as " % mrepo.working_dir
698 if not is_detached:
699 rref = mrepo.head.reference.tracking_branch()
700 if rref is not None:
701 rcommit = rref.commit
702 binsha = rcommit.binsha
703 hexsha = rcommit.hexsha
704 else:
705 log.error(
706 "%s a tracking branch was not set for local branch '%s'",
707 msg_base,
708 mrepo.head.reference,
709 )
710 # END handle remote ref
711 else:
712 log.error("%s there was no local tracking branch", msg_base)
713 # END handle detached head
714 # END handle to_latest_revision option
716 # update the working tree
717 # handles dry_run
718 if mrepo is not None and mrepo.head.commit.binsha != binsha:
719 # We must assure that our destination sha (the one to point to) is in the future of our current head.
720 # Otherwise, we will reset changes that might have been done on the submodule, but were not yet pushed
721 # We also handle the case that history has been rewritten, leaving no merge-base. In that case
722 # we behave conservatively, protecting possible changes the user had done
723 may_reset = True
724 if mrepo.head.commit.binsha != self.NULL_BIN_SHA:
725 base_commit = mrepo.merge_base(mrepo.head.commit, hexsha)
726 if len(base_commit) == 0 or (base_commit[0] is not None and base_commit[0].hexsha == hexsha):
727 if force:
728 msg = "Will force checkout or reset on local branch that is possibly in the future of"
729 msg += "the commit it will be checked out to, effectively 'forgetting' new commits"
730 log.debug(msg)
731 else:
732 msg = "Skipping %s on branch '%s' of submodule repo '%s' as it contains un-pushed commits"
733 msg %= (
734 is_detached and "checkout" or "reset",
735 mrepo.head,
736 mrepo,
737 )
738 log.info(msg)
739 may_reset = False
740 # end handle force
741 # end handle if we are in the future
743 if may_reset and not force and mrepo.is_dirty(index=True, working_tree=True, untracked_files=True):
744 raise RepositoryDirtyError(mrepo, "Cannot reset a dirty repository")
745 # end handle force and dirty state
746 # end handle empty repo
748 # end verify future/past
749 progress.update(
750 BEGIN | UPDWKTREE,
751 0,
752 1,
753 prefix
754 + "Updating working tree at %s for submodule %r to revision %s" % (self.path, self.name, hexsha),
755 )
757 if not dry_run and may_reset:
758 if is_detached:
759 # NOTE: for now we force, the user is no supposed to change detached
760 # submodules anyway. Maybe at some point this becomes an option, to
761 # properly handle user modifications - see below for future options
762 # regarding rebase and merge.
763 mrepo.git.checkout(hexsha, force=force)
764 else:
765 mrepo.head.reset(hexsha, index=True, working_tree=True)
766 # END handle checkout
767 # if we may reset/checkout
768 progress.update(
769 END | UPDWKTREE,
770 0,
771 1,
772 prefix + "Done updating working tree for submodule %r" % self.name,
773 )
774 # END update to new commit only if needed
775 except Exception as err:
776 if not keep_going:
777 raise
778 log.error(str(err))
779 # end handle keep_going
781 # HANDLE RECURSION
782 ##################
783 if recursive:
784 # in dry_run mode, the module might not exist
785 if mrepo is not None:
786 for submodule in self.iter_items(self.module()):
787 submodule.update(
788 recursive,
789 init,
790 to_latest_revision,
791 progress=progress,
792 dry_run=dry_run,
793 force=force,
794 keep_going=keep_going,
795 )
796 # END handle recursive update
797 # END handle dry run
798 # END for each submodule
800 return self
802 @unbare_repo
803 def move(self, module_path: PathLike, configuration: bool = True, module: bool = True) -> "Submodule":
804 """Move the submodule to a another module path. This involves physically moving
805 the repository at our current path, changing the configuration, as well as
806 adjusting our index entry accordingly.
808 :param module_path: the path to which to move our module in the parent repostory's working tree,
809 given as repository - relative or absolute path. Intermediate directories will be created
810 accordingly. If the path already exists, it must be empty.
811 Trailing(back)slashes are removed automatically
812 :param configuration: if True, the configuration will be adjusted to let
813 the submodule point to the given path.
814 :param module: if True, the repository managed by this submodule
815 will be moved as well. If False, we don't move the submodule's checkout, which may leave
816 the parent repository in an inconsistent state.
817 :return: self
818 :raise ValueError: if the module path existed and was not empty, or was a file
819 :note: Currently the method is not atomic, and it could leave the repository
820 in an inconsistent state if a sub - step fails for some reason
821 """
822 if module + configuration < 1:
823 raise ValueError("You must specify to move at least the module or the configuration of the submodule")
824 # END handle input
826 module_checkout_path = self._to_relative_path(self.repo, module_path)
828 # VERIFY DESTINATION
829 if module_checkout_path == self.path:
830 return self
831 # END handle no change
833 module_checkout_abspath = join_path_native(str(self.repo.working_tree_dir), module_checkout_path)
834 if osp.isfile(module_checkout_abspath):
835 raise ValueError("Cannot move repository onto a file: %s" % module_checkout_abspath)
836 # END handle target files
838 index = self.repo.index
839 tekey = index.entry_key(module_checkout_path, 0)
840 # if the target item already exists, fail
841 if configuration and tekey in index.entries:
842 raise ValueError("Index entry for target path did already exist")
843 # END handle index key already there
845 # remove existing destination
846 if module:
847 if osp.exists(module_checkout_abspath):
848 if len(os.listdir(module_checkout_abspath)):
849 raise ValueError("Destination module directory was not empty")
850 # END handle non-emptiness
852 if osp.islink(module_checkout_abspath):
853 os.remove(module_checkout_abspath)
854 else:
855 os.rmdir(module_checkout_abspath)
856 # END handle link
857 else:
858 # recreate parent directories
859 # NOTE: renames() does that now
860 pass
861 # END handle existence
862 # END handle module
864 # move the module into place if possible
865 cur_path = self.abspath
866 renamed_module = False
867 if module and osp.exists(cur_path):
868 os.renames(cur_path, module_checkout_abspath)
869 renamed_module = True
871 if osp.isfile(osp.join(module_checkout_abspath, ".git")):
872 module_abspath = self._module_abspath(self.repo, self.path, self.name)
873 self._write_git_file_and_module_config(module_checkout_abspath, module_abspath)
874 # end handle git file rewrite
875 # END move physical module
877 # rename the index entry - have to manipulate the index directly as
878 # git-mv cannot be used on submodules ... yeah
879 previous_sm_path = self.path
880 try:
881 if configuration:
882 try:
883 ekey = index.entry_key(self.path, 0)
884 entry = index.entries[ekey]
885 del index.entries[ekey]
886 nentry = git.IndexEntry(entry[:3] + (module_checkout_path,) + entry[4:])
887 index.entries[tekey] = nentry
888 except KeyError as e:
889 raise InvalidGitRepositoryError("Submodule's entry at %r did not exist" % (self.path)) from e
890 # END handle submodule doesn't exist
892 # update configuration
893 with self.config_writer(index=index) as writer: # auto-write
894 writer.set_value("path", module_checkout_path)
895 self.path = module_checkout_path
896 # END handle configuration flag
897 except Exception:
898 if renamed_module:
899 os.renames(module_checkout_abspath, cur_path)
900 # END undo module renaming
901 raise
902 # END handle undo rename
904 # Auto-rename submodule if it's name was 'default', that is, the checkout directory
905 if previous_sm_path == self.name:
906 self.rename(module_checkout_path)
907 # end
909 return self
911 @unbare_repo
912 def remove(
913 self,
914 module: bool = True,
915 force: bool = False,
916 configuration: bool = True,
917 dry_run: bool = False,
918 ) -> "Submodule":
919 """Remove this submodule from the repository. This will remove our entry
920 from the .gitmodules file and the entry in the .git / config file.
922 :param module: If True, the module checkout we point to will be deleted
923 as well. If the module is currently on a commit which is not part
924 of any branch in the remote, if the currently checked out branch
925 working tree, or untracked files,
926 is ahead of its tracking branch, if you have modifications in the
927 In case the removal of the repository fails for these reasons, the
928 submodule status will not have been altered.
929 If this submodule has child - modules on its own, these will be deleted
930 prior to touching the own module.
931 :param force: Enforces the deletion of the module even though it contains
932 modifications. This basically enforces a brute - force file system based
933 deletion.
934 :param configuration: if True, the submodule is deleted from the configuration,
935 otherwise it isn't. Although this should be enabled most of the times,
936 this flag enables you to safely delete the repository of your submodule.
937 :param dry_run: if True, we will not actually do anything, but throw the errors
938 we would usually throw
939 :return: self
940 :note: doesn't work in bare repositories
941 :note: doesn't work atomically, as failure to remove any part of the submodule will leave
942 an inconsistent state
943 :raise InvalidGitRepositoryError: thrown if the repository cannot be deleted
944 :raise OSError: if directories or files could not be removed"""
945 if not (module or configuration):
946 raise ValueError("Need to specify to delete at least the module, or the configuration")
947 # END handle parameters
949 # Recursively remove children of this submodule
950 nc = 0
951 for csm in self.children():
952 nc += 1
953 csm.remove(module, force, configuration, dry_run)
954 del csm
955 # end
956 if configuration and not dry_run and nc > 0:
957 # Assure we don't leave the parent repository in a dirty state, and commit our changes
958 # It's important for recursive, unforced, deletions to work as expected
959 self.module().index.commit("Removed at least one of child-modules of '%s'" % self.name)
960 # end handle recursion
962 # DELETE REPOSITORY WORKING TREE
963 ################################
964 if module and self.module_exists():
965 mod = self.module()
966 git_dir = mod.git_dir
967 if force:
968 # take the fast lane and just delete everything in our module path
969 # TODO: If we run into permission problems, we have a highly inconsistent
970 # state. Delete the .git folders last, start with the submodules first
971 mp = self.abspath
972 method: Union[None, Callable[[PathLike], None]] = None
973 if osp.islink(mp):
974 method = os.remove
975 elif osp.isdir(mp):
976 method = rmtree
977 elif osp.exists(mp):
978 raise AssertionError("Cannot forcibly delete repository as it was neither a link, nor a directory")
979 # END handle brutal deletion
980 if not dry_run:
981 assert method
982 method(mp)
983 # END apply deletion method
984 else:
985 # verify we may delete our module
986 if mod.is_dirty(index=True, working_tree=True, untracked_files=True):
987 raise InvalidGitRepositoryError(
988 "Cannot delete module at %s with any modifications, unless force is specified"
989 % mod.working_tree_dir
990 )
991 # END check for dirt
993 # figure out whether we have new commits compared to the remotes
994 # NOTE: If the user pulled all the time, the remote heads might
995 # not have been updated, so commits coming from the remote look
996 # as if they come from us. But we stay strictly read-only and
997 # don't fetch beforehand.
998 for remote in mod.remotes:
999 num_branches_with_new_commits = 0
1000 rrefs = remote.refs
1001 for rref in rrefs:
1002 num_branches_with_new_commits += len(mod.git.cherry(rref)) != 0
1003 # END for each remote ref
1004 # not a single remote branch contained all our commits
1005 if len(rrefs) and num_branches_with_new_commits == len(rrefs):
1006 raise InvalidGitRepositoryError(
1007 "Cannot delete module at %s as there are new commits" % mod.working_tree_dir
1008 )
1009 # END handle new commits
1010 # have to manually delete references as python's scoping is
1011 # not existing, they could keep handles open ( on windows this is a problem )
1012 if len(rrefs):
1013 del rref # skipcq: PYL-W0631
1014 # END handle remotes
1015 del rrefs
1016 del remote
1017 # END for each remote
1019 # finally delete our own submodule
1020 if not dry_run:
1021 self._clear_cache()
1022 wtd = mod.working_tree_dir
1023 del mod # release file-handles (windows)
1024 import gc
1026 gc.collect()
1027 try:
1028 rmtree(str(wtd))
1029 except Exception as ex:
1030 if HIDE_WINDOWS_KNOWN_ERRORS:
1031 from unittest import SkipTest
1033 raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
1034 raise
1035 # END delete tree if possible
1036 # END handle force
1038 if not dry_run and osp.isdir(git_dir):
1039 self._clear_cache()
1040 try:
1041 rmtree(git_dir)
1042 except Exception as ex:
1043 if HIDE_WINDOWS_KNOWN_ERRORS:
1044 from unittest import SkipTest
1046 raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex
1047 else:
1048 raise
1049 # end handle separate bare repository
1050 # END handle module deletion
1052 # void our data not to delay invalid access
1053 if not dry_run:
1054 self._clear_cache()
1056 # DELETE CONFIGURATION
1057 ######################
1058 if configuration and not dry_run:
1059 # first the index-entry
1060 parent_index = self.repo.index
1061 try:
1062 del parent_index.entries[parent_index.entry_key(self.path, 0)]
1063 except KeyError:
1064 pass
1065 # END delete entry
1066 parent_index.write()
1068 # now git config - need the config intact, otherwise we can't query
1069 # information anymore
1071 with self.repo.config_writer() as gcp_writer:
1072 gcp_writer.remove_section(sm_section(self.name))
1074 with self.config_writer() as sc_writer:
1075 sc_writer.remove_section()
1076 # END delete configuration
1078 return self
1080 def set_parent_commit(self, commit: Union[Commit_ish, None], check: bool = True) -> "Submodule":
1081 """Set this instance to use the given commit whose tree is supposed to
1082 contain the .gitmodules blob.
1084 :param commit:
1085 Commit'ish reference pointing at the root_tree, or None to always point to the
1086 most recent commit
1087 :param check:
1088 if True, relatively expensive checks will be performed to verify
1089 validity of the submodule.
1090 :raise ValueError: if the commit's tree didn't contain the .gitmodules blob.
1091 :raise ValueError:
1092 if the parent commit didn't store this submodule under the current path
1093 :return: self"""
1094 if commit is None:
1095 self._parent_commit = None
1096 return self
1097 # end handle None
1098 pcommit = self.repo.commit(commit)
1099 pctree = pcommit.tree
1100 if self.k_modules_file not in pctree:
1101 raise ValueError("Tree of commit %s did not contain the %s file" % (commit, self.k_modules_file))
1102 # END handle exceptions
1104 prev_pc = self._parent_commit
1105 self._parent_commit = pcommit
1107 if check:
1108 parser = self._config_parser(self.repo, self._parent_commit, read_only=True)
1109 if not parser.has_section(sm_section(self.name)):
1110 self._parent_commit = prev_pc
1111 raise ValueError("Submodule at path %r did not exist in parent commit %s" % (self.path, commit))
1112 # END handle submodule did not exist
1113 # END handle checking mode
1115 # update our sha, it could have changed
1116 # If check is False, we might see a parent-commit that doesn't even contain the submodule anymore.
1117 # in that case, mark our sha as being NULL
1118 try:
1119 self.binsha = pctree[str(self.path)].binsha
1120 except KeyError:
1121 self.binsha = self.NULL_BIN_SHA
1122 # end
1124 self._clear_cache()
1125 return self
1127 @unbare_repo
1128 def config_writer(
1129 self, index: Union["IndexFile", None] = None, write: bool = True
1130 ) -> SectionConstraint["SubmoduleConfigParser"]:
1131 """:return: a config writer instance allowing you to read and write the data
1132 belonging to this submodule into the .gitmodules file.
1134 :param index: if not None, an IndexFile instance which should be written.
1135 defaults to the index of the Submodule's parent repository.
1136 :param write: if True, the index will be written each time a configuration
1137 value changes.
1138 :note: the parameters allow for a more efficient writing of the index,
1139 as you can pass in a modified index on your own, prevent automatic writing,
1140 and write yourself once the whole operation is complete
1141 :raise ValueError: if trying to get a writer on a parent_commit which does not
1142 match the current head commit
1143 :raise IOError: If the .gitmodules file/blob could not be read"""
1144 writer = self._config_parser_constrained(read_only=False)
1145 if index is not None:
1146 writer.config._index = index
1147 writer.config._auto_write = write
1148 return writer
1150 @unbare_repo
1151 def rename(self, new_name: str) -> "Submodule":
1152 """Rename this submodule
1153 :note: This method takes care of renaming the submodule in various places, such as
1155 * $parent_git_dir / config
1156 * $working_tree_dir / .gitmodules
1157 * (git >= v1.8.0: move submodule repository to new name)
1159 As .gitmodules will be changed, you would need to make a commit afterwards. The changed .gitmodules file
1160 will already be added to the index
1162 :return: this submodule instance
1163 """
1164 if self.name == new_name:
1165 return self
1167 # .git/config
1168 with self.repo.config_writer() as pw:
1169 # As we ourselves didn't write anything about submodules into the parent .git/config,
1170 # we will not require it to exist, and just ignore missing entries.
1171 if pw.has_section(sm_section(self.name)):
1172 pw.rename_section(sm_section(self.name), sm_section(new_name))
1174 # .gitmodules
1175 with self.config_writer(write=True).config as cw:
1176 cw.rename_section(sm_section(self.name), sm_section(new_name))
1178 self._name = new_name
1180 # .git/modules
1181 mod = self.module()
1182 if mod.has_separate_working_tree():
1183 destination_module_abspath = self._module_abspath(self.repo, self.path, new_name)
1184 source_dir = mod.git_dir
1185 # Let's be sure the submodule name is not so obviously tied to a directory
1186 if str(destination_module_abspath).startswith(str(mod.git_dir)):
1187 tmp_dir = self._module_abspath(self.repo, self.path, str(uuid.uuid4()))
1188 os.renames(source_dir, tmp_dir)
1189 source_dir = tmp_dir
1190 # end handle self-containment
1191 os.renames(source_dir, destination_module_abspath)
1192 if mod.working_tree_dir:
1193 self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath)
1194 # end move separate git repository
1196 return self
1198 # } END edit interface
1200 # { Query Interface
1202 @unbare_repo
1203 def module(self) -> "Repo":
1204 """:return: Repo instance initialized from the repository at our submodule path
1205 :raise InvalidGitRepositoryError: if a repository was not available. This could
1206 also mean that it was not yet initialized"""
1207 # late import to workaround circular dependencies
1208 module_checkout_abspath = self.abspath
1209 try:
1210 repo = git.Repo(module_checkout_abspath)
1211 if repo != self.repo:
1212 return repo
1213 # END handle repo uninitialized
1214 except (InvalidGitRepositoryError, NoSuchPathError) as e:
1215 raise InvalidGitRepositoryError("No valid repository at %s" % module_checkout_abspath) from e
1216 else:
1217 raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath)
1218 # END handle exceptions
1220 def module_exists(self) -> bool:
1221 """:return: True if our module exists and is a valid git repository. See module() method"""
1222 try:
1223 self.module()
1224 return True
1225 except Exception:
1226 return False
1227 # END handle exception
1229 def exists(self) -> bool:
1230 """
1231 :return: True if the submodule exists, False otherwise. Please note that
1232 a submodule may exist ( in the .gitmodules file) even though its module
1233 doesn't exist on disk"""
1234 # keep attributes for later, and restore them if we have no valid data
1235 # this way we do not actually alter the state of the object
1236 loc = locals()
1237 for attr in self._cache_attrs:
1238 try:
1239 if hasattr(self, attr):
1240 loc[attr] = getattr(self, attr)
1241 # END if we have the attribute cache
1242 except (cp.NoSectionError, ValueError):
1243 # on PY3, this can happen apparently ... don't know why this doesn't happen on PY2
1244 pass
1245 # END for each attr
1246 self._clear_cache()
1248 try:
1249 try:
1250 self.path
1251 return True
1252 except Exception:
1253 return False
1254 # END handle exceptions
1255 finally:
1256 for attr in self._cache_attrs:
1257 if attr in loc:
1258 setattr(self, attr, loc[attr])
1259 # END if we have a cache
1260 # END reapply each attribute
1261 # END handle object state consistency
1263 @property
1264 def branch(self) -> "Head":
1265 """:return: The branch instance that we are to checkout
1266 :raise InvalidGitRepositoryError: if our module is not yet checked out"""
1267 return mkhead(self.module(), self._branch_path)
1269 @property
1270 def branch_path(self) -> PathLike:
1271 """
1272 :return: full(relative) path as string to the branch we would checkout
1273 from the remote and track"""
1274 return self._branch_path
1276 @property
1277 def branch_name(self) -> str:
1278 """:return: the name of the branch, which is the shortest possible branch name"""
1279 # use an instance method, for this we create a temporary Head instance
1280 # which uses a repository that is available at least ( it makes no difference )
1281 return git.Head(self.repo, self._branch_path).name
1283 @property
1284 def url(self) -> str:
1285 """:return: The url to the repository which our module - repository refers to"""
1286 return self._url
1288 @property
1289 def parent_commit(self) -> "Commit_ish":
1290 """:return: Commit instance with the tree containing the .gitmodules file
1291 :note: will always point to the current head's commit if it was not set explicitly"""
1292 if self._parent_commit is None:
1293 return self.repo.commit()
1294 return self._parent_commit
1296 @property
1297 def name(self) -> str:
1298 """:return: The name of this submodule. It is used to identify it within the
1299 .gitmodules file.
1300 :note: by default, the name is the path at which to find the submodule, but
1301 in git - python it should be a unique identifier similar to the identifiers
1302 used for remotes, which allows to change the path of the submodule
1303 easily
1304 """
1305 return self._name
1307 def config_reader(self) -> SectionConstraint[SubmoduleConfigParser]:
1308 """
1309 :return: ConfigReader instance which allows you to qurey the configuration values
1310 of this submodule, as provided by the .gitmodules file
1311 :note: The config reader will actually read the data directly from the repository
1312 and thus does not need nor care about your working tree.
1313 :note: Should be cached by the caller and only kept as long as needed
1314 :raise IOError: If the .gitmodules file/blob could not be read"""
1315 return self._config_parser_constrained(read_only=True)
1317 def children(self) -> IterableList["Submodule"]:
1318 """
1319 :return: IterableList(Submodule, ...) an iterable list of submodules instances
1320 which are children of this submodule or 0 if the submodule is not checked out"""
1321 return self._get_intermediate_items(self)
1323 # } END query interface
1325 # { Iterable Interface
1327 @classmethod
1328 def iter_items(
1329 cls,
1330 repo: "Repo",
1331 parent_commit: Union[Commit_ish, str] = "HEAD",
1332 *Args: Any,
1333 **kwargs: Any,
1334 ) -> Iterator["Submodule"]:
1335 """:return: iterator yielding Submodule instances available in the given repository"""
1336 try:
1337 pc = repo.commit(parent_commit) # parent commit instance
1338 parser = cls._config_parser(repo, pc, read_only=True)
1339 except (IOError, BadName):
1340 return iter([])
1341 # END handle empty iterator
1343 for sms in parser.sections():
1344 n = sm_name(sms)
1345 p = parser.get(sms, "path")
1346 u = parser.get(sms, "url")
1347 b = cls.k_head_default
1348 if parser.has_option(sms, cls.k_head_option):
1349 b = str(parser.get(sms, cls.k_head_option))
1350 # END handle optional information
1352 # get the binsha
1353 index = repo.index
1354 try:
1355 rt = pc.tree # root tree
1356 sm = rt[p]
1357 except KeyError:
1358 # try the index, maybe it was just added
1359 try:
1360 entry = index.entries[index.entry_key(p, 0)]
1361 sm = Submodule(repo, entry.binsha, entry.mode, entry.path)
1362 except KeyError:
1363 # The submodule doesn't exist, probably it wasn't
1364 # removed from the .gitmodules file.
1365 continue
1366 # END handle keyerror
1367 # END handle critical error
1369 # fill in remaining info - saves time as it doesn't have to be parsed again
1370 sm._name = n
1371 if pc != repo.commit():
1372 sm._parent_commit = pc
1373 # end set only if not most recent !
1374 sm._branch_path = git.Head.to_full_path(b)
1375 sm._url = u
1377 yield sm
1378 # END for each section
1380 # } END iterable interface