Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/repo/base.py: 18%

563 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# repo.py 

2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

3# 

4# This module is part of GitPython and is released under 

5# the BSD License: http://www.opensource.org/licenses/bsd-license.php 

6from __future__ import annotations 

7import logging 

8import os 

9import re 

10import shlex 

11import warnings 

12from gitdb.db.loose import LooseObjectDB 

13 

14from gitdb.exc import BadObject 

15 

16from git.cmd import Git, handle_process_output 

17from git.compat import ( 

18 defenc, 

19 safe_decode, 

20 is_win, 

21) 

22from git.config import GitConfigParser 

23from git.db import GitCmdObjectDB 

24from git.exc import InvalidGitRepositoryError, NoSuchPathError, GitCommandError 

25from git.index import IndexFile 

26from git.objects import Submodule, RootModule, Commit 

27from git.refs import HEAD, Head, Reference, TagReference 

28from git.remote import Remote, add_progress, to_progress_instance 

29from git.util import ( 

30 Actor, 

31 finalize_process, 

32 cygpath, 

33 hex_to_bin, 

34 expand_path, 

35 remove_password_if_present, 

36) 

37import os.path as osp 

38 

39from .fun import ( 

40 rev_parse, 

41 is_git_dir, 

42 find_submodule_git_dir, 

43 touch, 

44 find_worktree_git_dir, 

45) 

46import gc 

47import gitdb 

48 

49# typing ------------------------------------------------------ 

50 

51from git.types import ( 

52 TBD, 

53 PathLike, 

54 Lit_config_levels, 

55 Commit_ish, 

56 Tree_ish, 

57 assert_never, 

58) 

59from typing import ( 

60 Any, 

61 BinaryIO, 

62 Callable, 

63 Dict, 

64 Iterator, 

65 List, 

66 Mapping, 

67 Optional, 

68 Sequence, 

69 TextIO, 

70 Tuple, 

71 Type, 

72 Union, 

73 NamedTuple, 

74 cast, 

75 TYPE_CHECKING, 

76) 

77 

78from git.types import ConfigLevels_Tup, TypedDict 

79 

80if TYPE_CHECKING: 80 ↛ 81line 80 didn't jump to line 81, because the condition on line 80 was never true

81 from git.util import IterableList 

82 from git.refs.symbolic import SymbolicReference 

83 from git.objects import Tree 

84 from git.objects.submodule.base import UpdateProgress 

85 from git.remote import RemoteProgress 

86 

87# ----------------------------------------------------------- 

88 

89log = logging.getLogger(__name__) 

90 

91__all__ = ("Repo",) 

92 

93 

class BlameEntry(NamedTuple):
    """One record of blame information, as an immutable tuple.

    NOTE(review): field meanings below follow the ``blame_incremental``
    documentation in this file; confirm against the producing code.
    """

    # Commit(s) to blame for the covered lines (mapping of str -> Commit).
    commit: Dict[str, "Commit"]
    # Span of line numbers in the resulting file.
    linenos: range
    # Presumably the path of the file in the blamed commit - TODO confirm.
    orig_path: Optional[str]
    # Presumably the matching line span in the original file - TODO confirm.
    orig_linenos: range

99 

100 

101class Repo(object): 

102 """Represents a git repository and allows you to query references, 

103 gather commit information, generate diffs, create and clone repositories query 

104 the log. 

105 

106 The following attributes are worth using: 

107 

108 'working_dir' is the working directory of the git command, which is the working tree 

109 directory if available or the .git directory in case of bare repositories 

110 

111 'working_tree_dir' is the working tree directory, but will raise AssertionError 

112 if we are a bare repository. 

113 

114 'git_dir' is the .git repository directory, which is always set.""" 

115 

116 DAEMON_EXPORT_FILE = "git-daemon-export-ok" 

117 

118 git = cast("Git", None) # Must exist, or __del__ will fail in case we raise on `__init__()` 

119 working_dir: Optional[PathLike] = None 

120 _working_tree_dir: Optional[PathLike] = None 

121 git_dir: PathLike = "" 

122 _common_dir: PathLike = "" 

123 

124 # precompiled regex 

125 re_whitespace = re.compile(r"\s+") 

126 re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$") 

127 re_hexsha_shortened = re.compile("^[0-9A-Fa-f]{4,40}$") 

128 re_envvars = re.compile(r"(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)") 

129 re_author_committer_start = re.compile(r"^(author|committer)") 

130 re_tab_full_line = re.compile(r"^\t(.*)$") 

131 

132 # invariants 

133 # represents the configuration level of a configuration file 

134 config_level: ConfigLevels_Tup = ("system", "user", "global", "repository") 

135 

136 # Subclass configuration 

137 # Subclasses may easily bring in their own custom types by placing a constructor or type here 

138 GitCommandWrapperType = Git 

139 

def __init__(
    self,
    path: Optional[PathLike] = None,
    odbt: Type[LooseObjectDB] = GitCmdObjectDB,
    search_parent_directories: bool = False,
    expand_vars: bool = True,
) -> None:
    """Create a new Repo instance

    :param path:
        the path to either the root git directory or the bare git repo::

            repo = Repo("/Users/mtrier/Development/git-python")
            repo = Repo("/Users/mtrier/Development/git-python.git")
            repo = Repo("~/Development/git-python.git")
            repo = Repo("$REPOSITORIES/Development/git-python.git")
            repo = Repo("C:\\Users\\mtrier\\Development\\git-python\\.git")

        - In *Cygwin*, path may be a `'cygdrive/...'` prefixed path.
        - If it evaluates to false, :envvar:`GIT_DIR` is used, and if this also
          evaluates to false, the current directory is used.
    :param odbt:
        Object DataBase type - a type which is constructed by providing
        the directory containing the database objects, i.e. .git/objects. It will
        be used to access all object data
    :param search_parent_directories:
        if True, all parent directories will be searched for a valid repo as well.

        Please note that this was the default behaviour in older versions of GitPython,
        which is considered a bug though.
    :param expand_vars:
        if True, environment variables in the path are expanded (via ``expand_path``);
        a deprecation warning is issued when the path actually contains any.
    :raise InvalidGitRepositoryError: if no git directory could be located
    :raise NoSuchPathError: if the resolved path does not exist
    :return: git.Repo"""

    epath = path or os.getenv("GIT_DIR")
    if not epath:
        epath = os.getcwd()
    if Git.is_cygwin():
        # Given how the tests are written, this seems more likely to catch
        # Cygwin git used from Windows than Windows git used from Cygwin.
        # Therefore changing to Cygwin-style paths is the relevant operation.
        epath = cygpath(epath)

    epath = epath or path or os.getcwd()
    if not isinstance(epath, str):
        epath = str(epath)
    if expand_vars and re.search(self.re_envvars, epath):
        warnings.warn(
            "The use of environment variables in paths is deprecated"
            + "\nfor security reasons and may be removed in the future!!"
        )
    epath = expand_path(epath, expand_vars)
    if epath is not None:
        if not os.path.exists(epath):
            raise NoSuchPathError(epath)

    ## Walk up the path to find the `.git` dir.
    #
    curpath = epath
    while curpath:
        # ABOUT osp.NORMPATH
        # It's important to normalize the paths, as submodules will otherwise initialize their
        # repo instances with paths that depend on path-portions that will not exist after being
        # removed. It's just cleaner.
        if is_git_dir(curpath):
            self.git_dir = curpath
            # from man git-config : core.worktree
            # Set the path to the root of the working tree. If GIT_COMMON_DIR environment
            # variable is set, core.worktree is ignored and not used for determining the
            # root of working tree. This can be overridden by the GIT_WORK_TREE environment
            # variable. The value can be an absolute path or relative to the path to the .git
            # directory, which is either specified by GIT_DIR, or automatically discovered.
            # If GIT_DIR is specified but none of GIT_WORK_TREE and core.worktree is specified,
            # the current working directory is regarded as the top level of your working tree.
            self._working_tree_dir = os.path.dirname(self.git_dir)
            if os.environ.get("GIT_COMMON_DIR") is None:
                gitconf = self.config_reader("repository")
                if gitconf.has_option("core", "worktree"):
                    self._working_tree_dir = gitconf.get("core", "worktree")
            if "GIT_WORK_TREE" in os.environ:
                self._working_tree_dir = os.getenv("GIT_WORK_TREE")
            break

        # `.git` may be a file pointing elsewhere (submodule or linked worktree).
        # The lookup is done once; the earlier code repeated it and stored an
        # intermediate git_dir that was always overwritten below.
        dotgit = osp.join(curpath, ".git")
        sm_gitpath = find_submodule_git_dir(dotgit)
        if sm_gitpath is None:
            sm_gitpath = find_worktree_git_dir(dotgit)

        if sm_gitpath is not None:
            self.git_dir = expand_path(sm_gitpath, expand_vars)
            self._working_tree_dir = curpath
            break

        if not search_parent_directories:
            break
        curpath, tail = osp.split(curpath)
        if not tail:
            break
    # END while curpath

    # The class-level default for git_dir is "", never None, so test for
    # falsiness; this makes the error carry the offending path.
    if not self.git_dir:
        raise InvalidGitRepositoryError(epath)

    self._bare = False
    try:
        self._bare = self.config_reader("repository").getboolean("core", "bare")
    except Exception:
        # lets not assume the option exists, although it should
        pass

    try:
        # Close the file deterministically instead of leaking the handle.
        with open(osp.join(self.git_dir, "commondir"), "rt") as commondir_file:
            common_dir = commondir_file.readlines()[0].strip()
        self._common_dir = osp.join(self.git_dir, common_dir)
    except OSError:
        self._common_dir = ""

    # adjust the wd in case we are actually bare - we didn't know that
    # in the first place
    if self._bare:
        self._working_tree_dir = None
    # END working dir handling

    self.working_dir: Optional[PathLike] = self._working_tree_dir or self.common_dir
    self.git = self.GitCommandWrapperType(self.working_dir)

    # special handling, in special times
    rootpath = osp.join(self.common_dir, "objects")
    if issubclass(odbt, GitCmdObjectDB):
        self.odb = odbt(rootpath, self.git)
    else:
        self.odb = odbt(rootpath)

275 

def __enter__(self) -> "Repo":
    """Enter a ``with`` block; the repository itself is the managed object."""
    return self

278 

def __exit__(self, *args: Any) -> None:
    """Leave a ``with`` block by calling :meth:`close`; exception details are ignored."""
    self.close()

281 

def __del__(self) -> None:
    """Best-effort cleanup on garbage collection.

    Any exception raised by :meth:`close` is deliberately suppressed so that
    finalization never propagates an error.
    """
    try:
        self.close()
    except Exception:
        # Intentionally ignored: nothing useful can be done during finalization.
        pass

287 

def close(self) -> None:
    """Clear the git command cache and collect gitdb's memory manager.

    Does nothing when ``self.git`` is falsy (e.g. ``__init__`` failed early).
    """
    if not self.git:
        return

    self.git.clear_cache()
    # Tempfiles objects on Windows are holding references to
    # open files until they are collected by the garbage
    # collector, thus preventing deletion.
    # TODO: Find these references and ensure they are closed
    # and deleted synchronously rather than forcing a gc
    # collection.
    if is_win:
        gc.collect()
    gitdb.util.mman.collect()
    # A second pass on Windows, after the memory manager released its maps.
    if is_win:
        gc.collect()

302 

def __eq__(self, rhs: object) -> bool:
    """Two repositories are equal when both are ``Repo`` objects sharing a git_dir.

    A repository without a git_dir never compares equal to anything.
    """
    if not (isinstance(rhs, Repo) and self.git_dir):
        return False
    return self.git_dir == rhs.git_dir

307 

def __ne__(self, rhs: object) -> bool:
    """Logical negation of :meth:`__eq__`."""
    is_equal = self.__eq__(rhs)
    return not is_equal

310 

def __hash__(self) -> int:
    """Hash by git_dir, consistent with the git_dir-based equality."""
    key = self.git_dir
    return hash(key)

313 

314 # Description property 

def _get_description(self) -> str:
    """Read the ``description`` file inside git_dir, without trailing whitespace.

    Falls through (returning None) when git_dir is not set.
    """
    if self.git_dir:
        description_file = osp.join(self.git_dir, "description")
        with open(description_file, "rb") as stream:
            raw = stream.read()
        return raw.rstrip().decode(defenc)

def _set_description(self, descr: str) -> None:
    """Overwrite the ``description`` file with *descr* plus a trailing newline.

    Does nothing when git_dir is not set.
    """
    if self.git_dir:
        description_file = osp.join(self.git_dir, "description")
        payload = (descr + "\n").encode(defenc)
        with open(description_file, "wb") as stream:
            stream.write(payload)

description = property(_get_description, _set_description, doc="the project's description")
# The accessors are only reachable through the property.
del _get_description
del _set_description

330 

@property
def working_tree_dir(self) -> Optional[PathLike]:
    """:return: The working tree directory of our git repository.
        If this is a bare repository, None is returned."""
    tree_dir = self._working_tree_dir
    return tree_dir

335 

@property
def common_dir(self) -> PathLike:
    """
    :return: The git dir that holds everything except possibly HEAD,
        FETCH_HEAD, ORIG_HEAD, COMMIT_EDITMSG, index, and logs/.
    :raise InvalidGitRepositoryError: if neither a common dir nor a git dir is set"""
    # Prefer the explicit common dir, fall back to the git dir.
    resolved = self._common_dir or self.git_dir
    if not resolved:
        # or could return ""
        raise InvalidGitRepositoryError()
    return resolved

348 

@property
def bare(self) -> bool:
    """:return: True if the repository is bare"""
    is_bare = self._bare
    return is_bare

353 

@property
def heads(self) -> "IterableList[Head]":
    """The branch heads of this repo, as ``Head`` objects.

    :return: ``git.IterableList(Head, ...)``"""
    branch_heads = Head.list_items(self)
    return branch_heads

361 

@property
def references(self) -> "IterableList[Reference]":
    """All references of this repo: tags, heads and remote references.

    :return: IterableList(Reference, ...)"""
    all_refs = Reference.list_items(self)
    return all_refs

368 

369 # alias for references 

370 refs = references 

371 

372 # alias for heads 

373 branches = heads 

374 

@property
def index(self) -> "IndexFile":
    """:return: IndexFile representing this repository's index.
    :note: A new ``IndexFile`` is constructed on every access, which can be
        expensive. It's recommended to re-use the returned object."""
    fresh_index = IndexFile(self)
    return fresh_index

381 

@property
def head(self) -> "HEAD":
    """:return: HEAD Object pointing to the current head reference"""
    current_head = HEAD(self, "HEAD")
    return current_head

386 

@property
def remotes(self) -> "IterableList[Remote]":
    """Remote objects, allowing access to and manipulation of this repo's remotes.

    :return: ``git.IterableList(Remote, ...)``"""
    configured = Remote.list_items(self)
    return configured

392 

def remote(self, name: str = "origin") -> "Remote":
    """:return: Remote with the specified name
    :param name: name of the remote to look up, ``origin`` by default
    :raise ValueError: if no remote with such a name exists"""
    candidate = Remote(self, name)
    if candidate.exists():
        return candidate
    raise ValueError("Remote named '%s' didn't exist" % name)

400 

401 # { Submodules 

402 

@property
def submodules(self) -> "IterableList[Submodule]":
    """
    :return: git.IterableList(Submodule, ...) of direct submodules
        available from the current head"""
    direct_modules = Submodule.list_items(self)
    return direct_modules

409 

def submodule(self, name: str) -> "Submodule":
    """:return: Submodule with the given name
    :param name: name used to index ``self.submodules``
    :raise ValueError: If no such submodule exists"""
    try:
        found = self.submodules[name]
    except IndexError as e:
        # The lookup signals a miss with IndexError; expose it as ValueError.
        raise ValueError("Didn't find submodule named %r" % name) from e
    return found

418 

def create_submodule(self, *args: Any, **kwargs: Any) -> Submodule:
    """Create a new submodule

    :note: All arguments are forwarded to ``Submodule.add``; see its
        documentation for a description of the applicable parameters
    :return: created submodules"""
    created = Submodule.add(self, *args, **kwargs)
    return created

426 

def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
    """An iterator yielding Submodule instances, produced by traversing a
    ``RootModule`` of this repo. See the Traversable interface for a
    description of args and kwargs

    :return: Iterator"""
    root = RootModule(self)
    return root.traverse(*args, **kwargs)

432 

def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]:
    """Update the submodules, keeping the repository consistent as it will
    take the previous state into consideration. All arguments are forwarded;
    for more information, please see the documentation of RootModule.update"""
    root = RootModule(self)
    return root.update(*args, **kwargs)

438 

439 # }END submodules 

440 

@property
def tags(self) -> "IterableList[TagReference]":
    """The tags available in this repo, as ``TagReference`` objects

    :return: ``git.IterableList(TagReference, ...)``"""
    available = TagReference.list_items(self)
    return available

446 

def tag(self, path: PathLike) -> TagReference:
    """:return: TagReference Object, reference pointing to a Commit or Tag
    :param path: path to the tag reference, i.e. 0.1.5 or tags/0.1.5"""
    # Normalize short forms to the full reference path first.
    return TagReference(self, self._to_full_tag_path(path))

452 

@staticmethod
def _to_full_tag_path(path: PathLike) -> str:
    """Expand a tag path to its full reference path.

    Three input shapes are handled, in order:

    - already prefixed with ``TagReference._common_path_default`` + "/": returned unchanged
    - prefixed with ``TagReference._common_default`` + "/": gets
      ``Reference._common_path_default`` + "/" prepended
    - anything else: gets ``TagReference._common_path_default`` + "/" prepended
    """
    tag_path = str(path)
    tags_root = TagReference._common_path_default
    if tag_path.startswith(tags_root + "/"):
        return tag_path
    if tag_path.startswith(TagReference._common_default + "/"):
        return Reference._common_path_default + "/" + tag_path
    return tags_root + "/" + tag_path

462 

def create_head(
    self,
    path: PathLike,
    commit: Union["SymbolicReference", "str"] = "HEAD",
    force: bool = False,
    logmsg: Optional[str] = None,
) -> "Head":
    """Create a new head within the repository.
    The work is delegated to Head.create; please see its documentation.

    :return: newly created Head Reference"""
    # Head.create takes logmsg before force, unlike this signature.
    new_head = Head.create(self, path, commit, logmsg, force)
    return new_head

475 

def delete_head(self, *heads: "Union[str, Head]", **kwargs: Any) -> None:
    """Delete the given heads by delegating to ``Head.delete``

    :param heads: heads to delete, given as names or Head objects
    :param kwargs: Additional keyword arguments to be passed to git-branch"""
    outcome = Head.delete(self, *heads, **kwargs)
    return outcome

481 

def create_tag(
    self,
    path: PathLike,
    ref: str = "HEAD",
    message: Optional[str] = None,
    force: bool = False,
    **kwargs: Any,
) -> TagReference:
    """Create a new tag reference.
    The work is delegated to TagReference.create; please see its documentation.

    :return: TagReference object"""
    new_tag = TagReference.create(self, path, ref, message, force, **kwargs)
    return new_tag

495 

def delete_tag(self, *tags: TagReference) -> None:
    """Delete the given tag references by delegating to ``TagReference.delete``"""
    outcome = TagReference.delete(self, *tags)
    return outcome

499 

def create_remote(self, name: str, url: str, **kwargs: Any) -> Remote:
    """Create a new remote.

    The work is delegated to Remote.create; please see its documentation
    for more information.

    :return: Remote reference"""
    new_remote = Remote.create(self, name, url, **kwargs)
    return new_remote

508 

def delete_remote(self, remote: "Remote") -> str:
    """Delete the given remote by delegating to ``Remote.remove``."""
    removed = Remote.remove(self, remote)
    return removed

512 

def _get_config_path(self, config_level: Lit_config_levels) -> str:
    """Map a configuration level name to the path of its configuration file.

    :param config_level: one of "system", "user", "global" or "repository"
    :raise NotADirectoryError: for "repository" when neither a common dir
        nor a git dir is set
    """
    # we do not support an absolute path of the gitconfig on windows ,
    # use the global config instead
    if is_win and config_level == "system":
        config_level = "global"

    if config_level == "system":
        return "/etc/gitconfig"

    if config_level == "user":
        xdg_home = os.environ.get("XDG_CONFIG_HOME")
        if not xdg_home:
            xdg_home = osp.join(os.environ.get("HOME", "~"), ".config")
        return osp.normpath(osp.expanduser(osp.join(xdg_home, "git", "config")))

    if config_level == "global":
        return osp.normpath(osp.expanduser("~/.gitconfig"))

    if config_level == "repository":
        repo_dir = self._common_dir or self.git_dir
        if repo_dir:
            return osp.normpath(osp.join(repo_dir, "config"))
        raise NotADirectoryError

    # Any other value is outside the declared set of levels.
    assert_never(
        config_level,  # type:ignore[unreachable]
        ValueError(f"Invalid configuration level: {config_level!r}"),
    )

538 

def config_reader(
    self,
    config_level: Optional[Lit_config_levels] = None,
) -> GitConfigParser:
    """
    :return:
        GitConfigParser allowing to read the full git configuration, but not to write it

        The configuration will include values from the system, user and repository
        configuration files.

    :param config_level:
        For possible values, see config_writer method
        If None, all applicable levels will be used. Specify a level in case
        you know which file you wish to read to prevent reading multiple files.
    :note: On windows, system configuration cannot currently be read as the path is
        unknown, instead the global path will be used."""
    if config_level is not None:
        files = [self._get_config_path(config_level)]
    else:
        # One file per level listed in self.config_level, in that order.
        files = []
        for level in self.config_level:
            if cast(Lit_config_levels, level):
                files.append(self._get_config_path(cast(Lit_config_levels, level)))
    return GitConfigParser(files, read_only=True, repo=self)

566 

def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser:
    """
    :return:
        GitConfigParser allowing to write values of the specified configuration file level.
        Config writers should be retrieved, used to change the configuration, and written
        right away as they will lock the configuration file in question and prevent others
        from writing it.

    :param config_level:
        One of the following values
        system = system wide configuration file
        global = user level configuration file
        repository = configuration file for this repository only"""
    target_file = self._get_config_path(config_level)
    return GitConfigParser(target_file, read_only=False, repo=self)

581 

def commit(self, rev: Union[str, Commit_ish, None] = None) -> Commit:
    """The Commit object for the specified revision

    :param rev: revision specifier, see git-rev-parse for viable options.
        If None, the commit of the current head is returned.
    :return: ``git.Commit``
    """
    if rev is not None:
        # "^0" asks rev-parse to resolve the revision down to a commit.
        return self.rev_parse(str(rev) + "^0")
    return self.head.commit

591 

def iter_trees(self, *args: Any, **kwargs: Any) -> Iterator["Tree"]:
    """:return: Iterator yielding the ``tree`` of every commit produced by iter_commits
    :note: Takes all arguments known to iter_commits method"""
    commits = self.iter_commits(*args, **kwargs)
    return (each.tree for each in commits)

596 

def tree(self, rev: Union[Tree_ish, str, None] = None) -> "Tree":
    """The Tree object for the given treeish revision
    Examples::

          repo.tree(repo.heads[0])

    :param rev: is a revision pointing to a Treeish ( being a commit or tree ).
        If None, the tree of the current head commit is returned.
    :return: ``git.Tree``

    :note:
        If you need a non-root level tree, find it by iterating the root tree. Otherwise
        it cannot know about its path relative to the repository root and subsequent
        operations might have unexpected results."""
    if rev is not None:
        return self.rev_parse(str(rev) + "^{tree}")
    return self.head.commit.tree

613 

def iter_commits(
    self,
    rev: Union[str, Commit, "SymbolicReference", None] = None,
    paths: Union[PathLike, Sequence[PathLike]] = "",
    **kwargs: Any,
) -> Iterator[Commit]:
    """Commit objects representing the history of a given ref/commit

    :param rev:
        revision specifier, see git-rev-parse for viable options.
        If None, the commit of the current head will be used.

    :param paths:
        is an optional path or a list of paths; if set only commits that include the path
        or paths will be returned

    :param kwargs:
        Arguments to be passed to git-rev-list - common ones are
        max_count and skip

    :note: to receive only commits between two named revisions, use the
        "revA...revB" revision specifier

    :return: ``git.Commit[]``"""
    start = self.head.commit if rev is None else rev
    return Commit.iter_items(self, start, paths, **kwargs)

642 

def merge_base(self, *rev: TBD, **kwargs: Any) -> List[Union[Commit_ish, None]]:
    """Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc)

    :param rev: At least two revs to find the common ancestor for.
    :param kwargs: Additional arguments to be passed to the repo.git.merge_base() command which does all the work.
    :return: A list of Commit objects. If --all was not specified as kwarg, the list will have at max one Commit,
        or is empty if no common merge base exists.
    :raises ValueError: If not at least two revs are provided
    """
    if len(rev) < 2:
        raise ValueError("Please specify at least two revs, got only %i" % len(rev))

    try:
        shas = self.git.merge_base(*rev, **kwargs).splitlines()  # List[str]
    except GitCommandError as err:
        # Status 128 signals an invalid rev and is passed on to the caller.
        if err.status == 128:
            raise
        # Status code 1 is returned if there is no merge-base
        # (see https://github.com/git/git/blob/master/builtin/merge-base.c#L16)
        return []

    bases: List[Union[Commit_ish, None]] = [self.commit(sha) for sha in shas]
    return bases

673 

def is_ancestor(self, ancestor_rev: "Commit", rev: "Commit") -> bool:
    """Check if a commit is an ancestor of another

    :param ancestor_rev: Rev which should be an ancestor
    :param rev: Rev to test against ancestor_rev
    :return: ``True``, ancestor_rev is an ancestor to rev.
    """
    try:
        self.git.merge_base(ancestor_rev, rev, is_ancestor=True)
    except GitCommandError as err:
        # Exit status 1 is the "not an ancestor" answer; anything else is an error.
        if err.status != 1:
            raise
        return False
    else:
        return True

688 

def is_valid_object(self, sha: str, object_type: Union[str, None] = None) -> bool:
    """Check whether *sha* names an object in the object database.

    :param sha: full or partial hex sha, completed through the object database
    :param object_type: if given, the object's type must match it as well
    :return: True if the object exists (and has the requested type, if any);
        False on a type mismatch or when the lookup raises ``BadObject``
    """
    try:
        full_sha = self.odb.partial_to_complete_sha_hex(sha)
        info = self.odb.info(full_sha)
        if not object_type:
            return True
        # The database reports the type as bytes.
        if info.type == object_type.encode():
            return True
        log.debug(
            "Commit hash points to an object of type '%s'. Requested were objects of type '%s'",
            info.type.decode(),
            object_type,
        )
        return False
    except BadObject:
        log.debug("Commit hash is invalid.")
        return False

708 

def _get_daemon_export(self) -> bool:
    """Report whether the DAEMON_EXPORT_FILE marker exists inside git_dir.

    Falls through (returning None) when git_dir is not set.
    """
    if self.git_dir:
        marker = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
        return osp.exists(marker)

def _set_daemon_export(self, value: object) -> None:
    """Create or remove the marker file so that it matches the truthiness of *value*.

    Does nothing when git_dir is not set or the marker is already in the
    requested state.
    """
    if self.git_dir:
        marker = osp.join(self.git_dir, self.DAEMON_EXPORT_FILE)
        present = osp.exists(marker)
        if value:
            if not present:
                touch(marker)
        elif present:
            os.unlink(marker)

daemon_export = property(
    _get_daemon_export,
    _set_daemon_export,
    doc="If True, git-daemon may export this repository",
)
# The accessors are only reachable through the property.
del _get_daemon_export
del _set_daemon_export

730 

def _get_alternates(self) -> List[str]:
    """The list of alternates for this repo from which objects can be retrieved

    :return: list of strings being pathnames of alternates; empty when
        git_dir is not set or no alternates file exists"""
    if not self.git_dir:
        return []

    # NOTE(review): this reads below git_dir while the setter writes below
    # common_dir; confirm whether both are meant to use the same directory.
    alternates_file = osp.join(self.git_dir, "objects", "info", "alternates")
    if not osp.exists(alternates_file):
        return []

    with open(alternates_file, "rb") as stream:
        content = stream.read().decode(defenc)
    return content.strip().splitlines()

def _set_alternates(self, alts: List[str]) -> None:
    """Sets the alternates

    :param alts:
        is the array of string paths representing the alternates at which
        git should look for objects, i.e. /home/user/repo/.git/objects.
        An empty value removes the alternates file if it exists.

    :note:
        The method does not check for the existence of the paths in alts
        as the caller is responsible."""
    alternates_file = osp.join(self.common_dir, "objects", "info", "alternates")
    if alts:
        payload = "\n".join(alts).encode(defenc)
        with open(alternates_file, "wb") as stream:
            stream.write(payload)
    elif osp.isfile(alternates_file):
        os.remove(alternates_file)

alternates = property(
    _get_alternates,
    _set_alternates,
    doc="Retrieve a list of alternates paths or set a list paths to be used as alternates",
)

768 

def is_dirty(
    self,
    index: bool = True,
    working_tree: bool = True,
    untracked_files: bool = False,
    submodules: bool = True,
    path: Optional[PathLike] = None,
) -> bool:
    """
    :param index: include staged changes (``git diff --cached``) in the check
    :param working_tree: include unstaged working tree changes in the check
    :param untracked_files: include untracked files in the check
    :param submodules: if False, ``--ignore-submodules`` is passed along
    :param path: if given, restrict the checks to this path
    :return:
        ``True``, the repository is considered dirty. By default it will react
        like a git-status without untracked files, hence it is dirty if the
        index or the working copy have changes."""
    if self._bare:
        # Bare repositories with no associated working directory are
        # always considered to be clean.
        return False

    # start from the one which is fastest to evaluate
    diff_args = ["--abbrev=40", "--full-index", "--raw"]
    if not submodules:
        diff_args.append("--ignore-submodules")
    if path:
        diff_args.extend(["--", str(path)])

    # diff index against HEAD (only if an index file exists at all)
    if index and osp.isfile(self.index.path) and len(self.git.diff("--cached", *diff_args)):
        return True

    # diff index against working tree
    if working_tree and len(self.git.diff(*diff_args)):
        return True

    if untracked_files and len(self._get_untracked_files(path, ignore_submodules=not submodules)):
        return True

    return False

808 

@property
def untracked_files(self) -> List[str]:
    """
    :return:
        list(str,...)

        Files currently untracked as they have not been staged yet. Paths
        are relative to the current working directory of the git command.

    :note:
        ignored files will not appear here, i.e. files mentioned in .gitignore
    :note:
        This property is expensive, as no cache is involved. To process the result, please
        consider caching it yourself."""
    listing = self._get_untracked_files()
    return listing

824 

def _get_untracked_files(self, *args: Any, **kwargs: Any) -> List[str]:
    """Collect untracked file names from ``git status`` in porcelain mode.

    :param args: extra positional arguments forwarded to ``git status``
    :param kwargs: extra keyword arguments forwarded to ``git status``
    :return: the untracked paths, with surrounding quotes and escapes removed
    """
    # make sure we get all files, not only untracked directories
    proc = self.git.status(*args, porcelain=True, untracked_files=True, as_process=True, **kwargs)
    # Untracked files prefix in porcelain mode
    marker = "?? "
    found = []
    for raw_line in proc.stdout:
        text = raw_line.decode(defenc)
        if text.startswith(marker):
            name = text[len(marker) :].rstrip("\n")
            # Special characters are escaped
            if name[0] == name[-1] == '"':
                name = name[1:-1]
                # WHATEVER ... it's a mess, but works for me
                name = name.encode("ascii").decode("unicode_escape").encode("latin1").decode(defenc)
            found.append(name)
    finalize_process(proc)
    return found

844 

def ignored(self, *paths: PathLike) -> List[str]:
    """Checks if paths are ignored via .gitignore
    Doing so using the "git check-ignore" method.

    :param paths: List of paths to check whether they are ignored or not
    :return: subset of those paths which are ignored; an empty list when
        the git command fails with ``GitCommandError``
    """
    try:
        output: str = self.git.check_ignore(*paths)
    except GitCommandError:
        return []
    # Collapse doubled backslashes, then drop the quotes around escaped names.
    unescaped = output.replace("\\\\", "\\")
    unquoted = unescaped.replace('"', "")
    return unquoted.split("\n")

857 

@property
def active_branch(self) -> Head:
    """The currently active branch, i.e. the reference HEAD points to.

    :raises TypeError: If HEAD is detached
    :return: Head to the active branch"""
    # reveal_type(self.head.reference)  # => Reference
    current = self.head.reference
    return current

866 

def blame_incremental(self, rev: str | HEAD, file: str, **kwargs: Any) -> Iterator["BlameEntry"]:
    """Lazily yield blame records for the given file at the given revision.

    In contrast to .blame(), the file's contents are not returned; only a
    stream of BlameEntry tuples is produced.

    :param rev: revision specifier, see git-rev-parse for viable options.
    :param file: path of the file to blame, passed after ``--``
    :param kwargs: additional keyword arguments forwarded to ``self.git.blame``
    :return: lazy iterator of BlameEntry tuples, where the commit
        indicates the commit to blame for the line, and range
        indicates a span of line numbers in the resulting file.

    Combining all line number ranges yielded here should give a continuous
    range spanning all line numbers in the file.
    """
    raw: bytes = self.git.blame(rev, "--", file, p=True, incremental=True, stdout_as_string=False, **kwargs)
    known_commits: Dict[bytes, Commit] = {}

    # A single shared iterator over the non-empty lines: the outer loop reads
    # entry headers, the inner loops consume the detail lines that follow.
    entries = filter(None, raw.split(b"\n"))

    for header in entries:
        # Header layout: <hexsha> <original line> <final line> <number of lines>
        hexsha, orig_lineno_b, lineno_b, num_lines_b = header.split()
        lineno = int(lineno_b)
        num_lines = int(num_lines_b)
        orig_lineno = int(orig_lineno_b)

        if hexsha in known_commits:
            # Commit already built: skip ahead to "filename", which closes
            # the entry.
            for detail in entries:
                key, payload = detail.split(b" ", 1)
                if key == b"filename":
                    orig_filename = payload
                    break
            else:
                # Output ended in the middle of an entry.
                return
        else:
            # First sighting of this commit: collect its properties.
            props: Dict[bytes, bytes] = {}
            for detail in entries:
                if detail == b"boundary":
                    # "boundary" indicates a root commit and occurs
                    # instead of the "previous" tag
                    continue

                key, payload = detail.split(b" ", 1)
                props[key] = payload
                if key == b"filename":
                    # "filename" formally terminates the entry for --incremental
                    orig_filename = payload
                    break
            else:
                # Output ended in the middle of an entry.
                return

            known_commits[hexsha] = Commit(
                self,
                hex_to_bin(hexsha),
                author=Actor(
                    safe_decode(props[b"author"]),
                    safe_decode(props[b"author-mail"].lstrip(b"<").rstrip(b">")),
                ),
                authored_date=int(props[b"author-time"]),
                committer=Actor(
                    safe_decode(props[b"committer"]),
                    safe_decode(props[b"committer-mail"].lstrip(b"<").rstrip(b">")),
                ),
                committed_date=int(props[b"committer-time"]),
            )

        yield BlameEntry(
            known_commits[hexsha],
            range(lineno, lineno + num_lines),
            safe_decode(orig_filename),
            range(orig_lineno, orig_lineno + num_lines),
        )

951 

def blame(
    self,
    rev: Union[str, HEAD],
    file: str,
    incremental: bool = False,
    rev_opts: Optional[List[str]] = None,
    **kwargs: Any
) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
    """The blame information for the given file at the given revision.

    :param rev: revision specifier, see git-rev-parse for viable options.
    :param file: path of the file to blame, passed to the command after ``--``
    :param incremental: if True, delegate to ``blame_incremental`` and return
        its iterator instead of a list (``rev_opts`` is not used in that case)
    :param rev_opts: optional extra arguments placed between ``rev`` and ``--``
    :param kwargs: additional keyword arguments forwarded to ``self.git.blame``
    :return:
        list: [git.Commit, list: [<line>]]
        A list of lists associating a Commit object with a list of lines that
        changed within the given commit. The Commit objects will be given in order
        of appearance. Lines that could not be decoded are stored as raw bytes."""
    if incremental:
        return self.blame_incremental(rev, file, **kwargs)
    rev_opts = rev_opts or []
    data: bytes = self.git.blame(rev, *rev_opts, "--", file, p=True, stdout_as_string=False, **kwargs)
    # Commit objects already built, keyed by hexsha, so each is created once.
    commits: Dict[str, Commit] = {}
    # Result: one [commit, lines] pair per run of lines attributed to a commit.
    blames: List[List[Commit | List[str | bytes] | None]] = []

    # Per-entry scratch data gathered from the header lines of the output.
    class InfoTD(TypedDict, total=False):
        sha: str
        id: str
        filename: str
        summary: str
        author: str
        author_email: str
        author_date: int
        committer: str
        committer_email: str
        committer_date: int

    info: InfoTD = {}

    keepends = True
    for line_bytes in data.splitlines(keepends):
        try:
            line_str = line_bytes.rstrip().decode(defenc)
        except UnicodeDecodeError:
            # Undecodable line: treat it as binary content. An empty firstpart
            # routes it to the content branch at the bottom of the loop.
            firstpart = ""
            parts = []
            is_binary = True
        else:
            # We cannot know where binary data ends, since it may itself contain
            # newlines, so we rely on decodability to classify each line.
            # This can misfire even on text files, but treating such a line as
            # binary is still acceptable.
            parts = self.re_whitespace.split(line_str, 1)
            firstpart = parts[0]
            is_binary = False
        # end handle decode of line

        if self.re_hexsha_only.search(firstpart):
            # handles
            # 634396b2f541a9f2d58b00be1a07f0c358b999b3 1 1 7        - indicates blame-data start
            # 634396b2f541a9f2d58b00be1a07f0c358b999b3 2 2          - indicates
            # another line of blame with the same data
            digits = parts[-1].split(" ")
            if len(digits) == 3:
                # Three numbers: start of a new group; the commit slot is
                # filled in once the content line is reached.
                info = {"id": firstpart}
                blames.append([None, []])
            elif info["id"] != firstpart:
                # Two numbers and a different sha than the current entry:
                # open a new pair, reusing the cached commit if there is one.
                info = {"id": firstpart}
                blames.append([commits.get(firstpart), []])
            # END blame data initialization
        else:
            m = self.re_author_committer_start.search(firstpart)
            if m:
                # handles:
                # author Tom Preston-Werner
                # author-mail <tom@mojombo.com>
                # author-time 1192271832
                # author-tz -0700
                # committer Tom Preston-Werner
                # committer-mail <tom@mojombo.com>
                # committer-time 1192271832
                # committer-tz -0700  - IGNORED BY US
                role = m.group(0)
                if role == "author":
                    if firstpart.endswith("-mail"):
                        info["author_email"] = parts[-1]
                    elif firstpart.endswith("-time"):
                        info["author_date"] = int(parts[-1])
                    elif role == firstpart:
                        info["author"] = parts[-1]
                elif role == "committer":
                    if firstpart.endswith("-mail"):
                        info["committer_email"] = parts[-1]
                    elif firstpart.endswith("-time"):
                        info["committer_date"] = int(parts[-1])
                    elif role == firstpart:
                        info["committer"] = parts[-1]
                # END distinguish mail,time,name
            else:
                # handle
                # filename lib/grit.rb
                # summary add Blob
                # <and rest>
                if firstpart.startswith("filename"):
                    info["filename"] = parts[-1]
                elif firstpart.startswith("summary"):
                    info["summary"] = parts[-1]
                elif firstpart == "":
                    # Content line (tab-prefixed text, or undecodable bytes).
                    if info:
                        sha = info["id"]
                        c = commits.get(sha)
                        if c is None:
                            c = Commit(
                                self,
                                hex_to_bin(sha),
                                author=Actor._from_string(f"{info['author']} {info['author_email']}"),
                                authored_date=info["author_date"],
                                committer=Actor._from_string(f"{info['committer']} {info['committer_email']}"),
                                committed_date=info["committer_date"],
                            )
                            commits[sha] = c
                        # Always (re)assign: the pair may have been opened with None.
                        blames[-1][0] = c
                        # END if commit objects needs initial creation

                        if blames[-1][1] is not None:
                            line: str | bytes
                            if not is_binary:
                                # Drop the single leading tab that marks a content line.
                                if line_str and line_str[0] == "\t":
                                    line_str = line_str[1:]
                                line = line_str
                            else:
                                line = line_bytes
                                # NOTE: We are actually parsing lines out of binary data, which can lead to the
                                # binary being split up along the newline separator. We will append this to the
                                # blame we are currently looking at, even though it should be concatenated with
                                # the last line we have seen.
                            blames[-1][1].append(line)

                        # Reset the scratch data but remember the current sha, so
                        # following two-number headers can be compared against it.
                        info = {"id": sha}
                    # END if we collected commit info
                # END distinguish filename,summary,rest
            # END distinguish author|committer vs filename,summary,rest
        # END distinguish hexsha vs other information
    return blames

1094 

@classmethod
def init(
    cls,
    path: Union[PathLike, None] = None,
    mkdir: bool = True,
    odbt: Type[GitCmdObjectDB] = GitCmdObjectDB,
    expand_vars: bool = True,
    **kwargs: Any,
) -> "Repo":
    """Run git-init for the given path and return a Repo for it.

    :param path:
        the full path to the repo (traditionally ends with /<name>.git),
        or None, in which case no path is handed to the git command wrapper
        and the repository is created in the current working directory

    :param mkdir:
        if true and a path is given that does not exist yet, the directory
        is created (including parents) with mode 0755 before git-init runs.
        Has no effect without a path

    :param odbt:
        Object DataBase type - a type which is constructed by providing
        the directory containing the database objects, i.e. .git/objects.
        It is handed to the returned Repo

    :param expand_vars:
        forwarded to ``expand_path``. When enabled, environment variables
        in the path are not escaped, which can lead to information
        disclosure by exposing the contents of environment variables

    :param kwargs:
        keyword arguments serving as additional options to the git-init command

    :return: ``git.Repo`` (the newly created repo)"""
    target = expand_path(path, expand_vars) if path else path

    if mkdir and target and not osp.exists(target):
        os.makedirs(target, 0o755)

    # The git command wrapper changes into the given directory on its own.
    cls.GitCommandWrapperType(target).init(**kwargs)
    return cls(target, odbt=odbt)

1139 

@classmethod
def _clone(
    cls,
    git: "Git",
    url: PathLike,
    path: PathLike,
    odb_default_type: Type[GitCmdObjectDB],
    progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None,
    multi_options: Optional[List[str]] = None,
    **kwargs: Any,
) -> "Repo":
    """Shared implementation behind ``clone`` and ``clone_from``.

    :param git: command wrapper used to run ``git clone``
    :param url: source to clone from
    :param path: destination of the clone; any non-str value is converted with ``str()``
    :param odb_default_type: object database type used unless ``odbt`` is given in kwargs
    :param progress: optional progress handler; if given, process output is routed
        through ``to_progress_instance(progress)``
    :param multi_options: option strings that are joined, shell-split and passed
        to the command before the URL
    :param kwargs: ``odbt`` is consumed here; everything else is forwarded to git-clone
    :return: Repo instance for the cloned directory
    """
    odbt = kwargs.pop("odbt", odb_default_type)

    # when pathlib.Path or other classbased path is passed
    if not isinstance(path, str):
        path = str(path)

    ## A bug win cygwin's Git, when `--bare` or `--separate-git-dir`
    #  it prepends the cwd or(?) the `url` into the `path, so::
    #        git clone --bare  /cygwin/d/foo.git  C:\\Work
    #  becomes::
    #        git clone --bare  /cygwin/d/foo.git  /cygwin/d/C:\\Work
    #
    clone_path = Git.polish_url(path) if Git.is_cygwin() and "bare" in kwargs else path
    sep_dir = kwargs.get("separate_git_dir")
    if sep_dir:
        kwargs["separate_git_dir"] = Git.polish_url(sep_dir)
    multi = None
    if multi_options:
        multi = shlex.split(" ".join(multi_options))
    proc = git.clone(
        multi,
        # End-of-options marker: keeps a URL or path that starts with "-"
        # from being interpreted by git as a command-line option.
        "--",
        Git.polish_url(str(url)),
        clone_path,
        with_extended_output=True,
        as_process=True,
        v=True,
        universal_newlines=True,
        **add_progress(kwargs, git, progress),
    )
    if progress:
        handle_process_output(
            proc,
            None,
            to_progress_instance(progress).new_message_handler(),
            finalize_process,
            decode_streams=False,
        )
    else:
        (stdout, stderr) = proc.communicate()
        cmdline = getattr(proc, "args", "")
        # Strip credentials before the command line reaches the log.
        cmdline = remove_password_if_present(cmdline)

        log.debug("Cmd(%s)'s unused stdout: %s", cmdline, stdout)
        finalize_process(proc, stderr=stderr)

    # our git command could have a different working dir than our actual
    # environment, hence we prepend its working dir if required
    if not osp.isabs(path):
        path = osp.join(git._working_dir, path) if git._working_dir is not None else path

    repo = cls(path, odbt=odbt)

    # retain env values that were passed to _clone()
    repo.git.update_environment(**git.environment())

    # adjust remotes - there may be operating systems which use backslashes,
    # These might be given as initial paths, but when handling the config file
    # that contains the remote from which we were clones, git stops liking it
    # as it will escape the backslashes. Hence we undo the escaping just to be
    # sure
    if repo.remotes:
        with repo.remotes[0].config_writer as writer:
            writer.set_value("url", Git.polish_url(repo.remotes[0].url))
    # END handle remote repo
    return repo

1216 

def clone(
    self,
    path: PathLike,
    progress: Optional[Callable] = None,
    multi_options: Optional[List[str]] = None,
    **kwargs: Any,
) -> "Repo":
    """Clone this repository to a new location.

    The source of the clone is this repository's ``common_dir``, and the
    object database type of the new Repo defaults to the type of ``self.odb``.

    :param path: is the full path of the new repo (traditionally ends with ./<name>.git).
    :param progress: See 'git.remote.Remote.push'.
    :param multi_options: A list of Clone options that can be provided multiple times. One
        option per list item which is passed exactly as specified to clone.
        For example ['--config core.filemode=false', '--config core.ignorecase',
        '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path']
    :param kwargs:
        * odbt = ObjectDatabase Type, allowing to determine the object database
          implementation used by the returned Repo instance
        * All remaining keyword arguments are given to the git-clone command

    :return: ``git.Repo`` (the newly cloned repo)"""
    git_cmd = self.git
    source = self.common_dir
    odb_type = type(self.odb)
    return self._clone(git_cmd, source, path, odb_type, progress, multi_options, **kwargs)

1247 

@classmethod
def clone_from(
    cls,
    url: PathLike,
    to_path: PathLike,
    progress: Optional[Callable] = None,
    env: Optional[Mapping[str, str]] = None,
    multi_options: Optional[List[str]] = None,
    **kwargs: Any,
) -> "Repo":
    """Clone the repository found at the given URL.

    The git command is run from the current working directory.

    :param url: valid git url, see http://www.kernel.org/pub/software/scm/git/docs/git-clone.html#URLS
    :param to_path: Path to which the repository should be cloned to
    :param progress: See 'git.remote.Remote.push'.
    :param env: Optional dictionary containing the desired environment variables.
        Note: Provided variables will be used to update the execution
        environment for `git`. If some variable is not specified in `env`
        and is defined in `os.environ`, value from `os.environ` will be used.
        If you want to unset some variable, consider providing empty string
        as its value.
    :param multi_options: See ``clone`` method
    :param kwargs: see the ``clone`` method
    :return: Repo instance pointing to the cloned directory"""
    cwd = os.getcwd()
    git_cmd = cls.GitCommandWrapperType(cwd)
    if env is not None:
        # Layer the caller's variables over the command's environment.
        git_cmd.update_environment(**env)
    return cls._clone(git_cmd, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)

1276 

def archive(
    self,
    ostream: Union[TextIO, BinaryIO],
    treeish: Optional[str] = None,
    prefix: Optional[str] = None,
    **kwargs: Any,
) -> Repo:
    """Archive the tree at the given revision.

    :param ostream: file compatible stream object to which the archive will be written as bytes
    :param treeish: is the treeish name/id, defaults to the commit of the current head
    :param prefix: is the optional prefix to prepend to each filename in the archive;
        ignored if ``prefix`` is already present in kwargs
    :param kwargs: Additional arguments passed to git-archive

        * Use the 'format' argument to define the kind of format. Use
          specialized ostreams to write any format supported by python.
        * You may specify the special **path** keyword, which may either be a repository-relative
          path to a directory or file to place into the archive, or a list or tuple of multiple paths.

    :raise GitCommandError: in case something went wrong
    :return: self"""
    if treeish is None:
        treeish = self.head.commit
    if prefix and "prefix" not in kwargs:
        kwargs["prefix"] = prefix
    kwargs["output_stream"] = ostream

    # "path" is not a git-archive option: it is taken out of kwargs and
    # passed as positional arguments after the treeish.
    paths = kwargs.pop("path", [])
    if not isinstance(paths, (tuple, list)):
        paths = [paths]

    # "--" ends option parsing, so a treeish or path starting with "-"
    # cannot be mistaken for a command-line option.
    self.git.archive("--", treeish, *paths, **kwargs)
    return self

1310 

def has_separate_working_tree(self) -> bool:
    """
    :return: True if ``.git`` inside our working_tree_dir is a regular file rather than a
        directory, i.e. our git_dir does not live at the root of the working tree but is
        referenced by a .git file acting as a platform agnostic symbolic link
    :note: bare repositories will always return False here, as do repositories
        without a working_tree_dir
    """
    if self.bare or not self.working_tree_dir:
        return False
    dot_git = osp.join(self.working_tree_dir, ".git")
    return osp.isfile(dot_git)

1323 

# Bind the ``rev_parse`` function imported from ``.fun`` as an attribute of
# the class, making it available as a method on instances.
rev_parse = rev_parse

1325 

def __repr__(self) -> str:
    """Return ``<module.ClassName 'git_dir'>`` for this instance."""
    owner = self.__class__
    fields = (owner.__module__, owner.__name__, self.git_dir)
    return "<%s.%s %r>" % fields

1329 

def currently_rebasing_on(self) -> Commit | None:
    """
    :return: The commit which is currently being replayed while rebasing.

        None if we are not currently rebasing, i.e. there is no git_dir or
        it contains no ``REBASE_HEAD`` file.
    """
    if not self.git_dir:
        # Without a git directory there is nowhere to look for REBASE_HEAD.
        return None

    rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD")
    if not osp.isfile(rebase_head_file):
        return None

    # Only the first line is used. The context manager makes sure the file
    # handle is closed instead of being left to the garbage collector.
    with open(rebase_head_file, "rt") as fp:
        rebase_sha = fp.readline().strip()
    return self.commit(rebase_sha)