Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/util.py: 29%

538 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# utils.py 

2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

3# 

4# This module is part of GitPython and is released under 

5# the BSD License: http://www.opensource.org/licenses/bsd-license.php 

6 

7from abc import abstractmethod 

8import os.path as osp 

9from .compat import is_win 

10import contextlib 

11from functools import wraps 

12import getpass 

13import logging 

14import os 

15import platform 

16import subprocess 

17import re 

18import shutil 

19import stat 

20from sys import maxsize 

21import time 

22from urllib.parse import urlsplit, urlunsplit 

23import warnings 

24 

25# from git.objects.util import Traversable 

26 

27# typing --------------------------------------------------------- 

28 

29from typing import ( 

30 Any, 

31 AnyStr, 

32 BinaryIO, 

33 Callable, 

34 Dict, 

35 Generator, 

36 IO, 

37 Iterator, 

38 List, 

39 Optional, 

40 Pattern, 

41 Sequence, 

42 Tuple, 

43 TypeVar, 

44 Union, 

45 cast, 

46 TYPE_CHECKING, 

47 overload, 

48) 

49 

50import pathlib 

51 

52if TYPE_CHECKING: 52 ↛ 53line 52 didn't jump to line 53, because the condition on line 52 was never true

53 from git.remote import Remote 

54 from git.repo.base import Repo 

55 from git.config import GitConfigParser, SectionConstraint 

56 from git import Git 

57 

58 # from git.objects.base import IndexObject 

59 

60 

61from .types import ( 

62 Literal, 

63 SupportsIndex, 

64 Protocol, 

65 runtime_checkable, # because behind py version guards 

66 PathLike, 

67 HSH_TD, 

68 Total_TD, 

69 Files_TD, # aliases 

70 Has_id_attribute, 

71) 

72 

73T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True) 

74# So IterableList[Head] is subtype of IterableList[IterableObj] 

75 

76# --------------------------------------------------------------------- 

77 

78 

79from gitdb.util import ( # NOQA @IgnorePep8 

80 make_sha, 

81 LockedFD, # @UnusedImport 

82 file_contents_ro, # @UnusedImport 

83 file_contents_ro_filepath, # @UnusedImport 

84 LazyMixin, # @UnusedImport 

85 to_hex_sha, # @UnusedImport 

86 to_bin_sha, # @UnusedImport 

87 bin_to_hex, # @UnusedImport 

88 hex_to_bin, # @UnusedImport 

89) 

90 

91 

92# NOTE: Some of the unused imports might be used/imported by others. 

93# Handle once test-cases are back up and running. 

94# Most of these are unused here, but are for use by git-python modules so these 

95# don't see gitdb all the time. Flake of course doesn't like it. 

96__all__ = [ 

97 "stream_copy", 

98 "join_path", 

99 "to_native_path_linux", 

100 "join_path_native", 

101 "Stats", 

102 "IndexFileSHA1Writer", 

103 "IterableObj", 

104 "IterableList", 

105 "BlockingLockFile", 

106 "LockFile", 

107 "Actor", 

108 "get_user_id", 

109 "assure_directory_exists", 

110 "RemoteProgress", 

111 "CallableRemoteProgress", 

112 "rmtree", 

113 "unbare_repo", 

114 "HIDE_WINDOWS_KNOWN_ERRORS", 

115] 

116 

117log = logging.getLogger(__name__) 

118 

119# types############################################################ 

120 

121 

122#: We need an easy way to see if Appveyor TCs start failing, 

123#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, 

124#: till then, we wish to hide them. 

125HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True) 

126HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True) 

127 

128# { Utility Methods 

129 

130T = TypeVar("T") 

131 

132 

133def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: 

134 """Methods with this decorator raise InvalidGitRepositoryError if they 

135 encounter a bare repository""" 

136 

137 from .exc import InvalidGitRepositoryError 

138 

139 @wraps(func) 

140 def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: 

141 if self.repo.bare: 

142 raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) 

143 # END bare method 

144 return func(self, *args, **kwargs) 

145 

146 # END wrapper 

147 

148 return wrapper 

149 

150 

151@contextlib.contextmanager 

152def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: 

153 old_dir = os.getcwd() 

154 os.chdir(new_dir) 

155 try: 

156 yield new_dir 

157 finally: 

158 os.chdir(old_dir) 

159 

160 

161def rmtree(path: PathLike) -> None: 

162 """Remove the given recursively. 

163 

164 :note: we use shutil rmtree but adjust its behaviour to see whether files that 

165 couldn't be deleted are read-only. Windows will not remove them in that case""" 

166 

167 def onerror(func: Callable, path: PathLike, exc_info: str) -> None: 

168 # Is the error an access error ? 

169 os.chmod(path, stat.S_IWUSR) 

170 

171 try: 

172 func(path) # Will scream if still not possible to delete. 

173 except Exception as ex: 

174 if HIDE_WINDOWS_KNOWN_ERRORS: 

175 from unittest import SkipTest 

176 

177 raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex 

178 raise 

179 

180 return shutil.rmtree(path, False, onerror) 

181 

182 

183def rmfile(path: PathLike) -> None: 

184 """Ensure file deleted also on *Windows* where read-only files need special treatment.""" 

185 if osp.isfile(path): 

186 if is_win: 

187 os.chmod(path, 0o777) 

188 os.remove(path) 

189 

190 

191def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: 

192 """Copy all data from the source stream into the destination stream in chunks 

193 of size chunk_size 

194 

195 :return: amount of bytes written""" 

196 br = 0 

197 while True: 

198 chunk = source.read(chunk_size) 

199 destination.write(chunk) 

200 br += len(chunk) 

201 if len(chunk) < chunk_size: 

202 break 

203 # END reading output stream 

204 return br 

205 

206 

207def join_path(a: PathLike, *p: PathLike) -> PathLike: 

208 """Join path tokens together similar to osp.join, but always use 

209 '/' instead of possibly '\' on windows.""" 

210 path = str(a) 

211 for b in p: 

212 b = str(b) 

213 if not b: 

214 continue 

215 if b.startswith("/"): 

216 path += b[1:] 

217 elif path == "" or path.endswith("/"): 

218 path += b 

219 else: 

220 path += "/" + b 

221 # END for each path token to add 

222 return path 

223 

224 

225if is_win: 225 ↛ 227line 225 didn't jump to line 227, because the condition on line 225 was never true

226 

227 def to_native_path_windows(path: PathLike) -> PathLike: 

228 path = str(path) 

229 return path.replace("/", "\\") 

230 

231 def to_native_path_linux(path: PathLike) -> str: 

232 path = str(path) 

233 return path.replace("\\", "/") 

234 

235 __all__.append("to_native_path_windows") 

236 to_native_path = to_native_path_windows 

237else: 

238 # no need for any work on linux 

239 def to_native_path_linux(path: PathLike) -> str: 

240 return str(path) 

241 

242 to_native_path = to_native_path_linux 

243 

244 

245def join_path_native(a: PathLike, *p: PathLike) -> PathLike: 

246 """ 

247 As join path, but makes sure an OS native path is returned. This is only 

248 needed to play it safe on my dear windows and to assure nice paths that only 

249 use '\'""" 

250 return to_native_path(join_path(a, *p)) 

251 

252 

253def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: 

254 """Assure that the directory pointed to by path exists. 

255 

256 :param is_file: If True, path is assumed to be a file and handled correctly. 

257 Otherwise it must be a directory 

258 :return: True if the directory was created, False if it already existed""" 

259 if is_file: 

260 path = osp.dirname(path) 

261 # END handle file 

262 if not osp.isdir(path): 

263 os.makedirs(path, exist_ok=True) 

264 return True 

265 return False 

266 

267 

268def _get_exe_extensions() -> Sequence[str]: 

269 PATHEXT = os.environ.get("PATHEXT", None) 

270 return ( 

271 tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT else (".BAT", "COM", ".EXE") if is_win else ("") 

272 ) 

273 

274 

275def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: 

276 # From: http://stackoverflow.com/a/377028/548792 

277 winprog_exts = _get_exe_extensions() 

278 

279 def is_exec(fpath: str) -> bool: 

280 return ( 

281 osp.isfile(fpath) 

282 and os.access(fpath, os.X_OK) 

283 and (os.name != "nt" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts)) 

284 ) 

285 

286 progs = [] 

287 if not path: 

288 path = os.environ["PATH"] 

289 for folder in str(path).split(os.pathsep): 

290 folder = folder.strip('"') 

291 if folder: 

292 exe_path = osp.join(folder, program) 

293 for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]: 

294 if is_exec(f): 

295 progs.append(f) 

296 return progs 

297 

298 

299def _cygexpath(drive: Optional[str], path: str) -> str: 

300 if osp.isabs(path) and not drive: 

301 # Invoked from `cygpath()` directly with `D:Apps\123`? 

302 # It's an error, leave it alone just slashes) 

303 p = path # convert to str if AnyPath given 

304 else: 

305 p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) 

306 if osp.isabs(p): 

307 if drive: 

308 # Confusing, maybe a remote system should expand vars. 

309 p = path 

310 else: 

311 p = cygpath(p) 

312 elif drive: 

313 p = "/proc/cygdrive/%s/%s" % (drive.lower(), p) 

314 p_str = str(p) # ensure it is a str and not AnyPath 

315 return p_str.replace("\\", "/") 

316 

317 

318_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( 318 ↛ exitline 318 didn't jump to the function exit

319 # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx 

320 # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths 

321 ( 

322 re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), 

323 (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))), 

324 False, 

325 ), 

326 (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), 

327 (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False), 

328 (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True), 

329 (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing 

330) 

331 

332 

333def cygpath(path: str) -> str: 

334 """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment.""" 

335 path = str(path) # ensure is str and not AnyPath. 

336 # Fix to use Paths when 3.5 dropped. or to be just str if only for urls? 

337 if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")): 

338 for regex, parser, recurse in _cygpath_parsers: 

339 match = regex.match(path) 

340 if match: 

341 path = parser(*match.groups()) 

342 if recurse: 

343 path = cygpath(path) 

344 break 

345 else: 

346 path = _cygexpath(None, path) 

347 

348 return path 

349 

350 

351_decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?") 

352 

353 

354def decygpath(path: PathLike) -> str: 

355 path = str(path) 

356 m = _decygpath_regex.match(path) 

357 if m: 

358 drive, rest_path = m.groups() 

359 path = "%s:%s" % (drive.upper(), rest_path or "") 

360 

361 return path.replace("/", "\\") 

362 

363 

364#: Store boolean flags denoting if a specific Git executable 

365#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). 

366_is_cygwin_cache: Dict[str, Optional[bool]] = {} 

367 

368 

369@overload 

370def is_cygwin_git(git_executable: None) -> Literal[False]: 

371 ... 

372 

373 

374@overload 

375def is_cygwin_git(git_executable: PathLike) -> bool: 

376 ... 

377 

378 

379def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: 

380 if is_win: 

381 # is_win seems to be true only for Windows-native pythons 

382 # cygwin has os.name = posix, I think 

383 return False 

384 

385 if git_executable is None: 

386 return False 

387 

388 git_executable = str(git_executable) 

389 is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool] 

390 if is_cygwin is None: 

391 is_cygwin = False 

392 try: 

393 git_dir = osp.dirname(git_executable) 

394 if not git_dir: 

395 res = py_where(git_executable) 

396 git_dir = osp.dirname(res[0]) if res else "" 

397 

398 # Just a name given, not a real path. 

399 uname_cmd = osp.join(git_dir, "uname") 

400 process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) 

401 uname_out, _ = process.communicate() 

402 # retcode = process.poll() 

403 is_cygwin = "CYGWIN" in uname_out 

404 except Exception as ex: 

405 log.debug("Failed checking if running in CYGWIN due to: %r", ex) 

406 _is_cygwin_cache[git_executable] = is_cygwin 

407 

408 return is_cygwin 

409 

410 

411def get_user_id() -> str: 

412 """:return: string identifying the currently active system user as name@node""" 

413 return "%s@%s" % (getpass.getuser(), platform.node()) 

414 

415 

416def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None: 

417 """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" 

418 # TODO: No close proc-streams?? 

419 proc.wait(**kwargs) 

420 

421 

422@overload 

423def expand_path(p: None, expand_vars: bool = ...) -> None: 

424 ... 

425 

426 

427@overload 

428def expand_path(p: PathLike, expand_vars: bool = ...) -> str: 

429 # improve these overloads when 3.5 dropped 

430 ... 

431 

432 

433def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: 

434 if isinstance(p, pathlib.Path): 434 ↛ 435line 434 didn't jump to line 435, because the condition on line 434 was never true

435 return p.resolve() 

436 try: 

437 p = osp.expanduser(p) # type: ignore 

438 if expand_vars: 

439 p = osp.expandvars(p) # type: ignore 

440 return osp.normpath(osp.abspath(p)) # type: ignore 

441 except Exception: 

442 return None 

443 

444 

445def remove_password_if_present(cmdline: Sequence[str]) -> List[str]: 

446 """ 

447 Parse any command line argument and if on of the element is an URL with a 

448 username and/or password, replace them by stars (in-place). 

449 

450 If nothing found just returns the command line as-is. 

451 

452 This should be used for every log line that print a command line, as well as 

453 exception messages. 

454 """ 

455 new_cmdline = [] 

456 for index, to_parse in enumerate(cmdline): 

457 new_cmdline.append(to_parse) 

458 try: 

459 url = urlsplit(to_parse) 

460 # Remove password from the URL if present 

461 if url.password is None and url.username is None: 461 ↛ 464line 461 didn't jump to line 464, because the condition on line 461 was never false

462 continue 

463 

464 if url.password is not None: 

465 url = url._replace(netloc=url.netloc.replace(url.password, "*****")) 

466 if url.username is not None: 

467 url = url._replace(netloc=url.netloc.replace(url.username, "*****")) 

468 new_cmdline[index] = urlunsplit(url) 

469 except ValueError: 

470 # This is not a valid URL 

471 continue 

472 return new_cmdline 

473 

474 

475# } END utilities 

476 

477# { Classes 

478 

479 

480class RemoteProgress(object): 

481 """ 

482 Handler providing an interface to parse progress information emitted by git-push 

483 and git-fetch and to dispatch callbacks allowing subclasses to react to the progress. 

484 """ 

485 

486 _num_op_codes: int = 9 

487 ( 

488 BEGIN, 

489 END, 

490 COUNTING, 

491 COMPRESSING, 

492 WRITING, 

493 RECEIVING, 

494 RESOLVING, 

495 FINDING_SOURCES, 

496 CHECKING_OUT, 

497 ) = [1 << x for x in range(_num_op_codes)] 

498 STAGE_MASK = BEGIN | END 

499 OP_MASK = ~STAGE_MASK 

500 

501 DONE_TOKEN = "done." 

502 TOKEN_SEPARATOR = ", " 

503 

504 __slots__ = ( 

505 "_cur_line", 

506 "_seen_ops", 

507 "error_lines", # Lines that started with 'error:' or 'fatal:'. 

508 "other_lines", 

509 ) # Lines not denoting progress (i.e.g. push-infos). 

510 re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") 

511 re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") 

512 

513 def __init__(self) -> None: 

514 self._seen_ops: List[int] = [] 

515 self._cur_line: Optional[str] = None 

516 self.error_lines: List[str] = [] 

517 self.other_lines: List[str] = [] 

518 

519 def _parse_progress_line(self, line: AnyStr) -> None: 

520 """Parse progress information from the given line as retrieved by git-push 

521 or git-fetch. 

522 

523 - Lines that do not contain progress info are stored in :attr:`other_lines`. 

524 - Lines that seem to contain an error (i.e. start with error: or fatal:) are stored 

525 in :attr:`error_lines`.""" 

526 # handle 

527 # Counting objects: 4, done. 

528 # Compressing objects: 50% (1/2) 

529 # Compressing objects: 100% (2/2) 

530 # Compressing objects: 100% (2/2), done. 

531 if isinstance(line, bytes): # mypy argues about ternary assignment 

532 line_str = line.decode("utf-8") 

533 else: 

534 line_str = line 

535 self._cur_line = line_str 

536 

537 if self._cur_line.startswith(("error:", "fatal:")): 

538 self.error_lines.append(self._cur_line) 

539 return 

540 

541 # find escape characters and cut them away - regex will not work with 

542 # them as they are non-ascii. As git might expect a tty, it will send them 

543 last_valid_index = None 

544 for i, c in enumerate(reversed(line_str)): 

545 if ord(c) < 32: 

546 # its a slice index 

547 last_valid_index = -i - 1 

548 # END character was non-ascii 

549 # END for each character in line 

550 if last_valid_index is not None: 

551 line_str = line_str[:last_valid_index] 

552 # END cut away invalid part 

553 line_str = line_str.rstrip() 

554 

555 cur_count, max_count = None, None 

556 match = self.re_op_relative.match(line_str) 

557 if match is None: 

558 match = self.re_op_absolute.match(line_str) 

559 

560 if not match: 

561 self.line_dropped(line_str) 

562 self.other_lines.append(line_str) 

563 return 

564 # END could not get match 

565 

566 op_code = 0 

567 _remote, op_name, _percent, cur_count, max_count, message = match.groups() 

568 

569 # get operation id 

570 if op_name == "Counting objects": 

571 op_code |= self.COUNTING 

572 elif op_name == "Compressing objects": 

573 op_code |= self.COMPRESSING 

574 elif op_name == "Writing objects": 

575 op_code |= self.WRITING 

576 elif op_name == "Receiving objects": 

577 op_code |= self.RECEIVING 

578 elif op_name == "Resolving deltas": 

579 op_code |= self.RESOLVING 

580 elif op_name == "Finding sources": 

581 op_code |= self.FINDING_SOURCES 

582 elif op_name == "Checking out files": 

583 op_code |= self.CHECKING_OUT 

584 else: 

585 # Note: On windows it can happen that partial lines are sent 

586 # Hence we get something like "CompreReceiving objects", which is 

587 # a blend of "Compressing objects" and "Receiving objects". 

588 # This can't really be prevented, so we drop the line verbosely 

589 # to make sure we get informed in case the process spits out new 

590 # commands at some point. 

591 self.line_dropped(line_str) 

592 # Note: Don't add this line to the other lines, as we have to silently 

593 # drop it 

594 return None 

595 # END handle op code 

596 

597 # figure out stage 

598 if op_code not in self._seen_ops: 

599 self._seen_ops.append(op_code) 

600 op_code |= self.BEGIN 

601 # END begin opcode 

602 

603 if message is None: 

604 message = "" 

605 # END message handling 

606 

607 message = message.strip() 

608 if message.endswith(self.DONE_TOKEN): 

609 op_code |= self.END 

610 message = message[: -len(self.DONE_TOKEN)] 

611 # END end message handling 

612 message = message.strip(self.TOKEN_SEPARATOR) 

613 

614 self.update( 

615 op_code, 

616 cur_count and float(cur_count), 

617 max_count and float(max_count), 

618 message, 

619 ) 

620 

621 def new_message_handler(self) -> Callable[[str], None]: 

622 """ 

623 :return: 

624 a progress handler suitable for handle_process_output(), passing lines on to this Progress 

625 handler in a suitable format""" 

626 

627 def handler(line: AnyStr) -> None: 

628 return self._parse_progress_line(line.rstrip()) 

629 

630 # end 

631 return handler 

632 

633 def line_dropped(self, line: str) -> None: 

634 """Called whenever a line could not be understood and was therefore dropped.""" 

635 pass 

636 

637 def update( 

638 self, 

639 op_code: int, 

640 cur_count: Union[str, float], 

641 max_count: Union[str, float, None] = None, 

642 message: str = "", 

643 ) -> None: 

644 """Called whenever the progress changes 

645 

646 :param op_code: 

647 Integer allowing to be compared against Operation IDs and stage IDs. 

648 

649 Stage IDs are BEGIN and END. BEGIN will only be set once for each Operation 

650 ID as well as END. It may be that BEGIN and END are set at once in case only 

651 one progress message was emitted due to the speed of the operation. 

652 Between BEGIN and END, none of these flags will be set 

653 

654 Operation IDs are all held within the OP_MASK. Only one Operation ID will 

655 be active per call. 

656 :param cur_count: Current absolute count of items 

657 

658 :param max_count: 

659 The maximum count of items we expect. It may be None in case there is 

660 no maximum number of items or if it is (yet) unknown. 

661 

662 :param message: 

663 In case of the 'WRITING' operation, it contains the amount of bytes 

664 transferred. It may possibly be used for other purposes as well. 

665 

666 You may read the contents of the current line in self._cur_line""" 

667 pass 

668 

669 

670class CallableRemoteProgress(RemoteProgress): 

671 """An implementation forwarding updates to any callable""" 

672 

673 __slots__ = "_callable" 

674 

675 def __init__(self, fn: Callable) -> None: 

676 self._callable = fn 

677 super(CallableRemoteProgress, self).__init__() 

678 

679 def update(self, *args: Any, **kwargs: Any) -> None: 

680 self._callable(*args, **kwargs) 

681 

682 

683class Actor(object): 

684 """Actors hold information about a person acting on the repository. They 

685 can be committers and authors or anything with a name and an email as 

686 mentioned in the git log entries.""" 

687 

688 # PRECOMPILED REGEX 

689 name_only_regex = re.compile(r"<(.*)>") 

690 name_email_regex = re.compile(r"(.*) <(.*?)>") 

691 

692 # ENVIRONMENT VARIABLES 

693 # read when creating new commits 

694 env_author_name = "GIT_AUTHOR_NAME" 

695 env_author_email = "GIT_AUTHOR_EMAIL" 

696 env_committer_name = "GIT_COMMITTER_NAME" 

697 env_committer_email = "GIT_COMMITTER_EMAIL" 

698 

699 # CONFIGURATION KEYS 

700 conf_name = "name" 

701 conf_email = "email" 

702 

703 __slots__ = ("name", "email") 

704 

705 def __init__(self, name: Optional[str], email: Optional[str]) -> None: 

706 self.name = name 

707 self.email = email 

708 

709 def __eq__(self, other: Any) -> bool: 

710 return self.name == other.name and self.email == other.email 

711 

712 def __ne__(self, other: Any) -> bool: 

713 return not (self == other) 

714 

715 def __hash__(self) -> int: 

716 return hash((self.name, self.email)) 

717 

718 def __str__(self) -> str: 

719 return self.name if self.name else "" 

720 

721 def __repr__(self) -> str: 

722 return '<git.Actor "%s <%s>">' % (self.name, self.email) 

723 

724 @classmethod 

725 def _from_string(cls, string: str) -> "Actor": 

726 """Create an Actor from a string. 

727 :param string: is the string, which is expected to be in regular git format 

728 

729 John Doe <jdoe@example.com> 

730 

731 :return: Actor""" 

732 m = cls.name_email_regex.search(string) 

733 if m: 

734 name, email = m.groups() 

735 return Actor(name, email) 

736 else: 

737 m = cls.name_only_regex.search(string) 

738 if m: 

739 return Actor(m.group(1), None) 

740 # assume best and use the whole string as name 

741 return Actor(string, None) 

742 # END special case name 

743 # END handle name/email matching 

744 

745 @classmethod 

746 def _main_actor( 

747 cls, 

748 env_name: str, 

749 env_email: str, 

750 config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None, 

751 ) -> "Actor": 

752 actor = Actor("", "") 

753 user_id = None # We use this to avoid multiple calls to getpass.getuser() 

754 

755 def default_email() -> str: 

756 nonlocal user_id 

757 if not user_id: 

758 user_id = get_user_id() 

759 return user_id 

760 

761 def default_name() -> str: 

762 return default_email().split("@")[0] 

763 

764 for attr, evar, cvar, default in ( 

765 ("name", env_name, cls.conf_name, default_name), 

766 ("email", env_email, cls.conf_email, default_email), 

767 ): 

768 try: 

769 val = os.environ[evar] 

770 setattr(actor, attr, val) 

771 except KeyError: 

772 if config_reader is not None: 

773 try: 

774 val = config_reader.get("user", cvar) 

775 except Exception: 

776 val = default() 

777 setattr(actor, attr, val) 

778 # END config-reader handling 

779 if not getattr(actor, attr): 

780 setattr(actor, attr, default()) 

781 # END handle name 

782 # END for each item to retrieve 

783 return actor 

784 

785 @classmethod 

786 def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": 

787 """ 

788 :return: Actor instance corresponding to the configured committer. It behaves 

789 similar to the git implementation, such that the environment will override 

790 configuration values of config_reader. If no value is set at all, it will be 

791 generated 

792 :param config_reader: ConfigReader to use to retrieve the values from in case 

793 they are not set in the environment""" 

794 return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) 

795 

796 @classmethod 

797 def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": 

798 """Same as committer(), but defines the main author. It may be specified in the environment, 

799 but defaults to the committer""" 

800 return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) 

801 

802 

803class Stats(object): 

804 

805 """ 

806 Represents stat information as presented by git at the end of a merge. It is 

807 created from the output of a diff operation. 

808 

809 ``Example``:: 

810 

811 c = Commit( sha1 ) 

812 s = c.stats 

813 s.total # full-stat-dict 

814 s.files # dict( filepath : stat-dict ) 

815 

816 ``stat-dict`` 

817 

818 A dictionary with the following keys and values:: 

819 

820 deletions = number of deleted lines as int 

821 insertions = number of inserted lines as int 

822 lines = total number of lines changed as int, or deletions + insertions 

823 

824 ``full-stat-dict`` 

825 

826 In addition to the items in the stat-dict, it features additional information:: 

827 

828 files = number of changed files as int""" 

829 

830 __slots__ = ("total", "files") 

831 

832 def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]): 

833 self.total = total 

834 self.files = files 

835 

836 @classmethod 

837 def _list_from_string(cls, repo: "Repo", text: str) -> "Stats": 

838 """Create a Stat object from output retrieved by git-diff. 

839 

840 :return: git.Stat""" 

841 

842 hsh: HSH_TD = { 

843 "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0}, 

844 "files": {}, 

845 } 

846 for line in text.splitlines(): 

847 (raw_insertions, raw_deletions, filename) = line.split("\t") 

848 insertions = raw_insertions != "-" and int(raw_insertions) or 0 

849 deletions = raw_deletions != "-" and int(raw_deletions) or 0 

850 hsh["total"]["insertions"] += insertions 

851 hsh["total"]["deletions"] += deletions 

852 hsh["total"]["lines"] += insertions + deletions 

853 hsh["total"]["files"] += 1 

854 files_dict: Files_TD = { 

855 "insertions": insertions, 

856 "deletions": deletions, 

857 "lines": insertions + deletions, 

858 } 

859 hsh["files"][filename.strip()] = files_dict 

860 return Stats(hsh["total"], hsh["files"]) 

861 

862 

863class IndexFileSHA1Writer(object): 

864 

865 """Wrapper around a file-like object that remembers the SHA1 of 

866 the data written to it. It will write a sha when the stream is closed 

867 or if the asked for explicitly using write_sha. 

868 

869 Only useful to the indexfile 

870 

871 :note: Based on the dulwich project""" 

872 

873 __slots__ = ("f", "sha1") 

874 

875 def __init__(self, f: IO) -> None: 

876 self.f = f 

877 self.sha1 = make_sha(b"") 

878 

879 def write(self, data: AnyStr) -> int: 

880 self.sha1.update(data) 

881 return self.f.write(data) 

882 

883 def write_sha(self) -> bytes: 

884 sha = self.sha1.digest() 

885 self.f.write(sha) 

886 return sha 

887 

888 def close(self) -> bytes: 

889 sha = self.write_sha() 

890 self.f.close() 

891 return sha 

892 

893 def tell(self) -> int: 

894 return self.f.tell() 

895 

896 

897class LockFile(object): 

898 

899 """Provides methods to obtain, check for, and release a file based lock which 

900 should be used to handle concurrent access to the same file. 

901 

902 As we are a utility class to be derived from, we only use protected methods. 

903 

904 Locks will automatically be released on destruction""" 

905 

906 __slots__ = ("_file_path", "_owns_lock") 

907 

908 def __init__(self, file_path: PathLike) -> None: 

909 self._file_path = file_path 

910 self._owns_lock = False 

911 

912 def __del__(self) -> None: 

913 self._release_lock() 

914 

915 def _lock_file_path(self) -> str: 

916 """:return: Path to lockfile""" 

917 return "%s.lock" % (self._file_path) 

918 

919 def _has_lock(self) -> bool: 

920 """:return: True if we have a lock and if the lockfile still exists 

921 :raise AssertionError: if our lock-file does not exist""" 

922 return self._owns_lock 

923 

924 def _obtain_lock_or_raise(self) -> None: 

925 """Create a lock file as flag for other instances, mark our instance as lock-holder 

926 

927 :raise IOError: if a lock was already present or a lock file could not be written""" 

928 if self._has_lock(): 

929 return 

930 lock_file = self._lock_file_path() 

931 if osp.isfile(lock_file): 

932 raise IOError( 

933 "Lock for file %r did already exist, delete %r in case the lock is illegal" 

934 % (self._file_path, lock_file) 

935 ) 

936 

937 try: 

938 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL 

939 if is_win: 

940 flags |= os.O_SHORT_LIVED 

941 fd = os.open(lock_file, flags, 0) 

942 os.close(fd) 

943 except OSError as e: 

944 raise IOError(str(e)) from e 

945 

946 self._owns_lock = True 

947 

948 def _obtain_lock(self) -> None: 

949 """The default implementation will raise if a lock cannot be obtained. 

950 Subclasses may override this method to provide a different implementation""" 

951 return self._obtain_lock_or_raise() 

952 

953 def _release_lock(self) -> None: 

954 """Release our lock if we have one""" 

955 if not self._has_lock(): 

956 return 

957 

958 # if someone removed our file beforhand, lets just flag this issue 

959 # instead of failing, to make it more usable. 

960 lfp = self._lock_file_path() 

961 try: 

962 rmfile(lfp) 

963 except OSError: 

964 pass 

965 self._owns_lock = False 

966 

967 

968class BlockingLockFile(LockFile): 

969 

970 """The lock file will block until a lock could be obtained, or fail after 

971 a specified timeout. 

972 

973 :note: If the directory containing the lock was removed, an exception will 

974 be raised during the blocking period, preventing hangs as the lock 

975 can never be obtained.""" 

976 

977 __slots__ = ("_check_interval", "_max_block_time") 

978 

979 def __init__( 

980 self, 

981 file_path: PathLike, 

982 check_interval_s: float = 0.3, 

983 max_block_time_s: int = maxsize, 

984 ) -> None: 

985 """Configure the instance 

986 

987 :param check_interval_s: 

988 Period of time to sleep until the lock is checked the next time. 

989 By default, it waits a nearly unlimited time 

990 

991 :param max_block_time_s: Maximum amount of seconds we may lock""" 

992 super(BlockingLockFile, self).__init__(file_path) 

993 self._check_interval = check_interval_s 

994 self._max_block_time = max_block_time_s 

995 

996 def _obtain_lock(self) -> None: 

997 """This method blocks until it obtained the lock, or raises IOError if 

998 it ran out of time or if the parent directory was not available anymore. 

999 If this method returns, you are guaranteed to own the lock""" 

1000 starttime = time.time() 

1001 maxtime = starttime + float(self._max_block_time) 

1002 while True: 

1003 try: 

1004 super(BlockingLockFile, self)._obtain_lock() 

1005 except IOError as e: 

1006 # synity check: if the directory leading to the lockfile is not 

1007 # readable anymore, raise an exception 

1008 curtime = time.time() 

1009 if not osp.isdir(osp.dirname(self._lock_file_path())): 

1010 msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( 

1011 self._lock_file_path(), 

1012 curtime - starttime, 

1013 ) 

1014 raise IOError(msg) from e 

1015 # END handle missing directory 

1016 

1017 if curtime >= maxtime: 

1018 msg = "Waited %g seconds for lock at %r" % ( 

1019 maxtime - starttime, 

1020 self._lock_file_path(), 

1021 ) 

1022 raise IOError(msg) from e 

1023 # END abort if we wait too long 

1024 time.sleep(self._check_interval) 

1025 else: 

1026 break 

1027 # END endless loop 

1028 

1029 

1030class IterableList(List[T_IterableObj]): 

1031 

1032 """ 

1033 List of iterable objects allowing to query an object by id or by named index:: 

1034 

1035 heads = repo.heads 

1036 heads.master 

1037 heads['master'] 

1038 heads[0] 

1039 

1040 Iterable parent objects = [Commit, SubModule, Reference, FetchInfo, PushInfo] 

1041 Iterable via inheritance = [Head, TagReference, RemoteReference] 

1042 ] 

1043 It requires an id_attribute name to be set which will be queried from its 

1044 contained items to have a means for comparison. 

1045 

1046 A prefix can be specified which is to be used in case the id returned by the 

1047 items always contains a prefix that does not matter to the user, so it 

1048 can be left out.""" 

1049 

1050 __slots__ = ("_id_attr", "_prefix") 

1051 

1052 def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[IterableObj]": 

1053 return super(IterableList, cls).__new__(cls) 

1054 

1055 def __init__(self, id_attr: str, prefix: str = "") -> None: 

1056 self._id_attr = id_attr 

1057 self._prefix = prefix 

1058 

1059 def __contains__(self, attr: object) -> bool: 

1060 # first try identity match for performance 

1061 try: 

1062 rval = list.__contains__(self, attr) 

1063 if rval: 

1064 return rval 

1065 except (AttributeError, TypeError): 

1066 pass 

1067 # END handle match 

1068 

1069 # otherwise make a full name search 

1070 try: 

1071 getattr(self, cast(str, attr)) # use cast to silence mypy 

1072 return True 

1073 except (AttributeError, TypeError): 

1074 return False 

1075 # END handle membership 

1076 

1077 def __getattr__(self, attr: str) -> T_IterableObj: 

1078 attr = self._prefix + attr 

1079 for item in self: 

1080 if getattr(item, self._id_attr) == attr: 

1081 return item 

1082 # END for each item 

1083 return list.__getattribute__(self, attr) 

1084 

1085 def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore 

1086 

1087 assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" 

1088 

1089 if isinstance(index, int): 

1090 return list.__getitem__(self, index) 

1091 elif isinstance(index, slice): 

1092 raise ValueError("Index should be an int or str") 

1093 else: 

1094 try: 

1095 return getattr(self, index) 

1096 except AttributeError as e: 

1097 raise IndexError("No item found with id %r" % (self._prefix + index)) from e 

1098 # END handle getattr 

1099 

1100 def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: 

1101 

1102 assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" 

1103 

1104 delindex = cast(int, index) 

1105 if not isinstance(index, int): 

1106 delindex = -1 

1107 name = self._prefix + index 

1108 for i, item in enumerate(self): 

1109 if getattr(item, self._id_attr) == name: 

1110 delindex = i 

1111 break 

1112 # END search index 

1113 # END for each item 

1114 if delindex == -1: 

1115 raise IndexError("Item with name %s not found" % name) 

1116 # END handle error 

1117 # END get index to delete 

1118 list.__delitem__(self, delindex) 

1119 

1120 

1121class IterableClassWatcher(type): 

1122 """Metaclass that watches""" 

1123 

1124 def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: 

1125 for base in bases: 1125 ↛ 1126line 1125 didn't jump to line 1126, because the loop on line 1125 never started

1126 if type(base) == IterableClassWatcher: 

1127 warnings.warn( 

1128 f"GitPython Iterable subclassed by {name}. " 

1129 "Iterable is deprecated due to naming clash since v3.1.18" 

1130 " and will be removed in 3.1.20, " 

1131 "Use IterableObj instead \n", 

1132 DeprecationWarning, 

1133 stacklevel=2, 

1134 ) 

1135 

1136 

1137class Iterable(metaclass=IterableClassWatcher): 

1138 

1139 """Defines an interface for iterable items which is to assure a uniform 

1140 way to retrieve and iterate items within the git repository""" 

1141 

1142 __slots__ = () 

1143 _id_attribute_ = "attribute that most suitably identifies your instance" 

1144 

1145 @classmethod 

1146 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: 

1147 """ 

1148 Deprecated, use IterableObj instead. 

1149 Find all items of this type - subclasses can specify args and kwargs differently. 

1150 If no args are given, subclasses are obliged to return all items if no additional 

1151 arguments arg given. 

1152 

1153 :note: Favor the iter_items method as it will 

1154 

1155 :return:list(Item,...) list of item instances""" 

1156 out_list: Any = IterableList(cls._id_attribute_) 

1157 out_list.extend(cls.iter_items(repo, *args, **kwargs)) 

1158 return out_list 

1159 

1160 @classmethod 

1161 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: 

1162 # return typed to be compatible with subtypes e.g. Remote 

1163 """For more information about the arguments, see list_items 

1164 :return: iterator yielding Items""" 

1165 raise NotImplementedError("To be implemented by Subclass") 

1166 

1167 

1168@runtime_checkable 

1169class IterableObj(Protocol): 

1170 """Defines an interface for iterable items which is to assure a uniform 

1171 way to retrieve and iterate items within the git repository 

1172 

1173 Subclasses = [Submodule, Commit, Reference, PushInfo, FetchInfo, Remote]""" 

1174 

1175 __slots__ = () 

1176 _id_attribute_: str 

1177 

1178 @classmethod 

1179 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: 

1180 """ 

1181 Find all items of this type - subclasses can specify args and kwargs differently. 

1182 If no args are given, subclasses are obliged to return all items if no additional 

1183 arguments arg given. 

1184 

1185 :note: Favor the iter_items method as it will 

1186 

1187 :return:list(Item,...) list of item instances""" 

1188 out_list: IterableList = IterableList(cls._id_attribute_) 

1189 out_list.extend(cls.iter_items(repo, *args, **kwargs)) 

1190 return out_list 

1191 

1192 @classmethod 

1193 @abstractmethod 

1194 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]: 

1195 # return typed to be compatible with subtypes e.g. Remote 

1196 """For more information about the arguments, see list_items 

1197 :return: iterator yielding Items""" 

1198 raise NotImplementedError("To be implemented by Subclass") 

1199 

1200 

1201# } END classes 

1202 

1203 

1204class NullHandler(logging.Handler): 

1205 def emit(self, record: object) -> None: 

1206 pass