Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/util.py: 30%

232 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# util.py 

2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

3# 

4# This module is part of GitPython and is released under 

5# the BSD License: http://www.opensource.org/licenses/bsd-license.php 

6"""Module for general utility functions""" 

7# flake8: noqa F401 

8 

9 

10from abc import ABC, abstractmethod 

11import warnings 

12from git.util import IterableList, IterableObj, Actor 

13 

14import re 

15from collections import deque 

16 

17from string import digits 

18import time 

19import calendar 

20from datetime import datetime, timedelta, tzinfo 

21 

22# typing ------------------------------------------------------------ 

23from typing import ( 

24 Any, 

25 Callable, 

26 Deque, 

27 Iterator, 

28 Generic, 

29 NamedTuple, 

30 overload, 

31 Sequence, # NOQA: F401 

32 TYPE_CHECKING, 

33 Tuple, 

34 Type, 

35 TypeVar, 

36 Union, 

37 cast, 

38) 

39 

40from git.types import Has_id_attribute, Literal, _T # NOQA: F401 

41 

42if TYPE_CHECKING: 42 ↛ 43line 42 didn't jump to line 43, because the condition on line 42 was never true

43 from io import BytesIO, StringIO 

44 from .commit import Commit 

45 from .blob import Blob 

46 from .tag import TagObject 

47 from .tree import Tree, TraversedTreeTup 

48 from subprocess import Popen 

49 from .submodule.base import Submodule 

50 from git.types import Protocol, runtime_checkable 

51else: 

52 # Protocol = Generic[_T] # Needed for typing bug #572? 

53 Protocol = ABC 

54 

55 def runtime_checkable(f): 

56 return f 

57 

58 

class TraverseNT(NamedTuple):
    """One pending entry on the traversal work stack."""

    # Distance of ``item`` from the traversal root (the root itself is 0).
    depth: int
    # The object to be visited.
    item: Union["Traversable", "Blob"]
    # The object ``item`` was reached from; None for the traversal root.
    src: Union["Traversable", None]

63 

64 

65T_TIobj = TypeVar("T_TIobj", bound="TraversableIterableObj") # for TraversableIterableObj.traverse() 

66 

67TraversedTup = Union[ 

68 Tuple[Union["Traversable", None], "Traversable"], # for commit, submodule 

69 "TraversedTreeTup", 

70] # for tree.traverse() 

71 

72# -------------------------------------------------------------------- 

73 

74__all__ = ( 

75 "get_object_type_by_name", 

76 "parse_date", 

77 "parse_actor_and_date", 

78 "ProcessStreamAdapter", 

79 "Traversable", 

80 "altz_to_utctz_str", 

81 "utctz_to_altz", 

82 "verify_utctz", 

83 "Actor", 

84 "tzoffset", 

85 "utc", 

86) 

87 

88ZERO = timedelta(0) 

89 

90# { Functions 

91 

92 

def mode_str_to_int(modestr: Union[bytes, str]) -> int:
    """Convert an octal mode string into its integer value.

    :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used.
        ``bytes`` input is decoded as ASCII first, so that each element is
        interpreted as a digit character rather than as its code point.
    :return:
        Integer identifying a mode compatible to the mode methods ids of the
        stat module regarding the rwx permissions for user, group and other,
        special flags and file system flags, i.e. whether it is a symlink
        for example.
    :raise ValueError: if one of the used characters is not a digit
    """
    if isinstance(modestr, (bytes, bytearray)):
        # Iterating bytes yields ints (e.g. 55 for b"7"); decode so that every
        # element is a one-character digit string.
        modestr = modestr.decode("ascii")

    mode = 0
    # The right-most character is the least significant octal digit; every
    # further digit to the left is worth three more bits.
    for iteration, char in enumerate(reversed(modestr[-6:])):
        mode += int(char) << iteration * 3
    return mode

107 

108 

def get_object_type_by_name(
    object_type_name: bytes,
) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]:
    """Look up the class that handles objects of the given type name.

    :param object_type_name: Member of TYPES, one of ``b"commit"``, ``b"tag"``,
        ``b"blob"`` or ``b"tree"``

    :return: type suitable to handle the given object type name.
        Use the type to create new instances.

    :raise ValueError: In case object_type_name is unknown
    """
    # The sibling modules are imported only when their branch is taken.
    if object_type_name == b"commit":
        from . import commit

        return commit.Commit

    if object_type_name == b"tag":
        from . import tag

        return tag.TagObject

    if object_type_name == b"blob":
        from . import blob

        return blob.Blob

    if object_type_name == b"tree":
        from . import tree

        return tree.Tree

    raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())

137 

138 

def utctz_to_altz(utctz: str) -> int:
    """Convert a git UTC offset string into seconds west of UTC.

    The result uses the convention of ``time.altzone``: git stores the offset
    as ``+HHMM`` east of UTC, which has the opposite sign, hence the negation.

    The two trailing digits are minutes, not a decimal fraction of an hour,
    so ``+0530`` is 5 hours and 30 minutes (19800 seconds).

    :param utctz: git utc timezone string, i.e. +0200
    :return: offset in seconds; negative for zones east of UTC
    :raise ValueError: if utctz cannot be interpreted as a number
    """
    # float() first keeps accepting every numeric spelling that was accepted
    # before; the value is then handled as the integer HHMM.
    int_utctz = int(float(utctz))
    magnitude = abs(int_utctz)
    seconds = (magnitude // 100) * 3600 + (magnitude % 100) * 60
    return seconds if int_utctz < 0 else -seconds

145 

146 

def altz_to_utctz_str(altz: float) -> str:
    """Convert an offset in seconds west of UTC into a git UTC offset string.

    This is the inverse of converting ``+HHMM`` into ``time.altzone`` style
    seconds, returning a string that can be used in commit objects.

    Minutes are emitted as minutes (``-19800`` becomes ``+0530``); seconds
    below a full minute are dropped.

    :param altz: offset in seconds west of UTC, as ``time.altzone`` provides it
    :return: string of the form ``+HHMM`` or ``-HHMM``
    """
    seconds = int(float(altz))
    total_minutes = abs(seconds) // 60
    hours, minutes = divmod(total_minutes, 60)
    # West of UTC is a negative git offset. An offset that rounds down to
    # zero minutes is always written as "+0000", never "-0000".
    sign = "-" if seconds > 0 and total_minutes else "+"
    return f"{sign}{hours:02d}{minutes:02d}"

155 

156 

def verify_utctz(offset: str) -> str:
    """Check that offset has the form of a sign followed by four digits.

    :param offset: timezone offset string such as ``+0200``
    :raise ValueError: if offset is incorrect
    :return: offset
    """
    # The message is formatted up front, before any of the checks run.
    message = "Invalid timezone offset format: %s" % offset

    well_formed = (
        len(offset) == 5
        and offset[0] in "+-"
        and all(char in digits for char in offset[1:])
    )
    if not well_formed:
        raise ValueError(message)
    return offset

169 

170 

171class tzoffset(tzinfo): 

172 def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: 

173 self._offset = timedelta(seconds=-secs_west_of_utc) 

174 self._name = name or "fixed" 

175 

176 def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]: 

177 return tzoffset, (-self._offset.total_seconds(), self._name) 

178 

179 def utcoffset(self, dt: Union[datetime, None]) -> timedelta: 

180 return self._offset 

181 

182 def tzname(self, dt: Union[datetime, None]) -> str: 

183 return self._name 

184 

185 def dst(self, dt: Union[datetime, None]) -> timedelta: 

186 return ZERO 

187 

188 

189utc = tzoffset(0, "UTC") 

190 

191 

def from_timestamp(timestamp: float, tz_offset: float) -> datetime:
    """Build an aware datetime from a timestamp and a timezone offset.

    :param timestamp: seconds since the epoch
    :param tz_offset: offset in seconds west of UTC, as taken by ``tzoffset``
    :return: the moment expressed in the given timezone, or in UTC if the
        conversion to that timezone raises ``ValueError``
    """
    as_utc = datetime.fromtimestamp(timestamp, utc)
    try:
        return as_utc.astimezone(tzoffset(tz_offset))
    except ValueError:
        # Fall back to the UTC representation of the same moment.
        return as_utc

200 

201 

def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
    """
    Parse the given date as one of the following

        * aware datetime instance
        * Git internal format: timestamp offset
        * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200.
        * ISO 8601 2005-04-07T22:13:13
            The T can be a space as well

    :param string_date: the date to parse, either as string or as aware datetime
    :return: Tuple(int(timestamp), int(offset)). The timestamp is in seconds
        since epoch, the offset is in seconds west of UTC (the sign is the
        opposite of the ``+HHMM`` notation).
    :raise ValueError: If the format could not be understood, or if a datetime
        without tzinfo was given
    :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY.
    :note: For the textual formats the timestamp is computed from the date and
        time fields as written; the offset is returned alongside and is not
        applied to the timestamp.
    """
    if isinstance(string_date, datetime):
        if not string_date.tzinfo:
            raise ValueError(f"string_date datetime object without tzinfo, {string_date}")
        utcoffset = cast(timedelta, string_date.utcoffset())  # typeguard, tzinfo is not None
        offset = -int(utcoffset.total_seconds())
        return int(string_date.astimezone(utc).timestamp()), offset

    # string_date gets shortened while parsing; keep the caller's value so the
    # error message reports exactly what was passed in.
    original_input = string_date

    try:
        # git time: "<timestamp> <offset>", optionally prefixed with "@"
        if string_date.count(" ") == 1 and string_date.rfind(":") == -1:
            timestamp, offset_str = string_date.split()
            if timestamp.startswith("@"):
                timestamp = timestamp[1:]
            timestamp_int = int(timestamp)
            return timestamp_int, utctz_to_altz(verify_utctz(offset_str))

        offset_str = "+0000"  # no explicit offset means UTC
        if string_date[-5] in "-+":
            offset_str = verify_utctz(string_date[-5:])
            string_date = string_date[:-6]  # skip space as well
        offset = utctz_to_altz(offset_str)

        # now figure out the date and time portion - split time
        if "," in string_date:
            # RFC 2822 style
            date_formats = ["%a, %d %b %Y"]
            splitter = string_date.rfind(" ")
        else:
            # iso plus additional
            date_formats = ["%Y-%m-%d", "%Y.%m.%d", "%m/%d/%Y", "%d.%m.%Y"]
            splitter = string_date.rfind("T")
            if splitter == -1:
                splitter = string_date.rfind(" ")

        # Explicit check instead of an assert, which would vanish under -O.
        if splitter == -1:
            raise ValueError("no separator between date and time found")

        # split date and time
        time_part = string_date[splitter + 1 :]  # skip the separator
        date_part = string_date[:splitter]

        # parse time
        tstruct = time.strptime(time_part, "%H:%M:%S")

        for fmt in date_formats:
            try:
                dtstruct = time.strptime(date_part, fmt)
            except ValueError:
                continue
            utctime = calendar.timegm(
                (
                    dtstruct.tm_year,
                    dtstruct.tm_mon,
                    dtstruct.tm_mday,
                    tstruct.tm_hour,
                    tstruct.tm_min,
                    tstruct.tm_sec,
                    dtstruct.tm_wday,
                    dtstruct.tm_yday,
                    tstruct.tm_isdst,
                )
            )
            return int(utctime), offset

        # still here ? fail
        raise ValueError("no format matched")
    except Exception as e:
        # Every failure, including those of wrong input types, is reported
        # uniformly as ValueError with the cause chained.
        raise ValueError(
            f"Unsupported date format or type: {original_input}, type={type(original_input)}"
        ) from e

296 

297 

298# precompiled regex 

299_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$") 

300_re_only_actor = re.compile(r"^.+? (.*)$") 

301 

302 

def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
    """Parse out the actor (author or committer) info from a line like::

        author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700

    If the line carries no epoch and offset, both are taken to be "0" and
    everything after the first word (or the whole line, if there is no such
    split) is treated as the actor.

    :return: [Actor, int_seconds_since_epoch, int_timezone_offset]
    """
    full_match = _re_actor_epoch.search(line)
    if full_match:
        actor, epoch, offset = full_match.groups()
    else:
        epoch, offset = "0", "0"
        actor_match = _re_only_actor.search(line)
        actor = actor_match.group(1) if actor_match else line or ""
    return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset))

317 

318 

319# } END functions 

320 

321 

322# { Classes 

323 

324 

class ProcessStreamAdapter(object):
    """Class wiring all calls to one stream of the contained Process instance.

    Use this type to hide the underlying process to provide access only to a specified
    stream. The process is usually wrapped into an AutoInterrupt class to kill
    it if the instance goes out of scope.
    """

    __slots__ = ("_proc", "_stream")

    def __init__(self, process: "Popen", stream_name: str) -> None:
        """
        :param process: the process whose stream is exposed; a reference to it is kept
        :param stream_name: name of the attribute of ``process`` holding the stream
        """
        self._proc = process
        self._stream: "StringIO" = getattr(process, stream_name)  # guessed type

    def __getattr__(self, attr: str) -> Any:
        """Forward every attribute not found on the adapter to the stream."""
        stream = self._stream
        return getattr(stream, attr)

341 

342 

@runtime_checkable
class Traversable(Protocol):

    """Simple interface to perform depth-first or breadth-first traversals
    into one direction.
    Subclasses only need to implement one function.
    Instances of the Subclass must be hashable

    Defined subclasses = [Commit, Tree, SubModule]
    """

    __slots__ = ()

    @classmethod
    @abstractmethod
    def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]:
        """
        Returns:
            Tuple of items connected to the given item.
            Must be implemented in subclass

        class Commit::     (cls, Commit) -> Tuple[Commit, ...]
        class Submodule::  (cls, Submodule) -> Iterablelist[Submodule]
        class Tree::       (cls, Tree) -> Tuple[Tree, ...]
        """
        raise NotImplementedError("To be implemented in subclass")

    @abstractmethod
    def list_traverse(self, *args: Any, **kwargs: Any) -> Any:
        """Deprecated entry point: warn, then delegate to :meth:`_list_traverse`.

        Subclasses are expected to provide their own ``list_traverse``.
        """
        warnings.warn(
            "list_traverse() method should only be called from subclasses. "
            "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20. "
            "Builtin subclasses are 'Submodule', 'Tree' and 'Commit'.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._list_traverse(*args, **kwargs)

    def _list_traverse(
        self, as_edge: bool = False, *args: Any, **kwargs: Any
    ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]:
        """
        :param as_edge: see :meth:`_traverse`; remaining arguments are handed
            to ``traverse()`` unchanged
        :return: IterableList with the results of the traversal as produced by
            traverse()
            Commit -> IterableList['Commit']
            Submodule ->  IterableList['Submodule']
            Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']]
        """
        # Commit and Submodule have id.__attribute__ as IterableObj
        # Tree has id.__attribute__ inherited from IndexObject
        if isinstance(self, Has_id_attribute):
            id_attribute = self._id_attribute_
        else:
            # shouldn't reach here, unless Traversable subclass created with no _id_attribute_
            # could add _id_attribute_ to Traversable, or make all Traversable also Iterable?
            id_attribute = ""

        if not as_edge:
            out: IterableList[Union["Commit", "Submodule", "Tree", "Blob"]] = IterableList(id_attribute)
            out.extend(self.traverse(*args, as_edge=as_edge, **kwargs))
            return out
            # overloads in subclasses (mypy doesn't allow typing self: subclass)
            # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]]

        # Raise deprecationwarning, doesn't make sense to use this
        # NOTE(review): here the traversal iterator itself is the constructor
        # argument of IterableList (the branch above passes the id attribute
        # name there) and as_edge is not forwarded - confirm this is intended.
        out_list: IterableList = IterableList(self.traverse(*args, **kwargs))
        return out_list

    @abstractmethod
    def traverse(self, *args: Any, **kwargs: Any) -> Any:
        """Deprecated entry point: warn, then delegate to :meth:`_traverse`.

        Subclasses are expected to provide their own ``traverse``.
        """
        warnings.warn(
            "traverse() method should only be called from subclasses. "
            "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20. "
            "Builtin subclasses are 'Submodule', 'Tree' and 'Commit'.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._traverse(*args, **kwargs)

    def _traverse(
        self,
        predicate: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = True,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]:
        """:return: iterator yielding of items found when traversing self
        :param predicate: f(i,d) returns False if item i at depth d should not be included in the result

        :param prune:
            f(i,d) return True if the search should stop at item i at depth d.
            Item i will not be returned.

        :param depth:
            define at which level the iteration should not go deeper
            if -1, there is no limit
            if 0, you would effectively only get self, the root of the iteration
            i.e. if 1, you would only get the first level of predecessors/successors

        :param branch_first:
            if True, items will be returned branch first, otherwise depth first

        :param visit_once:
            if True, items will only be returned once, although they might be encountered
            several times. Loops are prevented that way.

        :param ignore_self:
            if True, self will be ignored and automatically pruned from
            the result. Otherwise it will be the first item to be returned.
            If as_edge is True, the source of the first edge is None

        :param as_edge:
            if True, return a pair of items, first being the source, second the
            destination, i.e. tuple(src, dest) with the edge spanning from
            source to destination"""

        # Yielded types per subclass, as noted by the original author:
        #   Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]]
        #   Submodule ->  Iterator[Submodule, Tuple[Submodule, Submodule]]
        #   Tree -> Iterator[Union[Blob, Tree, Submodule,
        #                          Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]]
        #
        #   ignore_self=True is_edge=True -> Iterator[item]
        #   ignore_self=True is_edge=False --> Iterator[item]
        #   ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]]
        #   ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]

        visited = set()
        stack: Deque[TraverseNT] = deque()
        stack.append(TraverseNT(0, self, None))  # self is always depth level 0

        def add_to_stack(
            stack: Deque[TraverseNT],
            src_item: "Traversable",
            branch_first: bool,
            depth: int,
        ) -> None:
            """Queue the items connected to src_item, tagged with the given depth."""
            lst = self._get_intermediate_items(src_item)
            if not lst:  # empty list
                return None
            if branch_first:
                # Items are popped from the right end, so entries added on the
                # left are visited only after everything already queued.
                stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
            else:
                # Push in reverse so that the first connected item ends up on
                # the right end and is popped next.
                stack.extend(TraverseNT(depth, i, src_item) for i in reversed(lst))

        while stack:
            d, item, src = stack.pop()  # depth of item, item, item_source

            if visit_once and item in visited:
                continue

            if visit_once:
                visited.add(item)

            rval: Union[TraversedTup, "Traversable", "Blob"]
            if as_edge:  # if as_edge return (src, item); src is None for the first item
                rval = (src, item)
            else:
                rval = item

            if prune(rval, d):
                continue

            skip_start_item = ignore_self and (item is self)
            if not skip_start_item and predicate(rval, d):
                yield rval

            # only continue to next level if this is appropriate !
            nd = d + 1
            if depth > -1 and nd > depth:
                continue

            add_to_stack(stack, item, branch_first, nd)
        # END for each item on work stack

524 

525 

@runtime_checkable
class Serializable(Protocol):

    """Interface for objects that can write themselves to, and read themselves
    from, a data stream. Both methods raise NotImplementedError here."""

    __slots__ = ()

    # @abstractmethod
    def _serialize(self, stream: "BytesIO") -> "Serializable":
        """Write the data of this object into the given data stream.

        :note: a serialized object would ``_deserialize`` into the same object
        :param stream: a file-like object
        :return: self
        :raise NotImplementedError: always, unless overridden in a subclass
        """
        raise NotImplementedError("To be implemented in subclass")

    # @abstractmethod
    def _deserialize(self, stream: "BytesIO") -> "Serializable":
        """Read all information regarding this object from the stream.

        :param stream: a file-like object
        :return: self
        :raise NotImplementedError: always, unless overridden in a subclass
        """
        raise NotImplementedError("To be implemented in subclass")

547 

548 

class TraversableIterableObj(IterableObj, Traversable):
    """Combines ``IterableObj`` and ``Traversable``, providing public
    ``list_traverse()`` and ``traverse()`` methods typed in terms of the
    concrete subclass."""

    __slots__ = ()

    # An edge: (source, destination); the source may be None.
    TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]

    def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]:
        """Return the traversal results as a list; all arguments are passed on
        to ``Traversable._list_traverse()``."""
        parent = super(TraversableIterableObj, self)
        return parent._list_traverse(*args, **kwargs)

    @overload  # type: ignore
    def traverse(self: T_TIobj) -> Iterator[T_TIobj]:
        ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[True],
        as_edge: Literal[False],
    ) -> Iterator[T_TIobj]:
        ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[False],
        as_edge: Literal[True],
    ) -> Iterator[TIobj_tuple]:
        ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[True],
        as_edge: Literal[True],
    ) -> Iterator[Tuple[T_TIobj, T_TIobj]]:
        ...

    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: True,
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = True,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]]:
        """For documentation, see util.Traversable._traverse()"""
        # All parameters are handed on positionally, in declaration order.
        traversal = super(TraversableIterableObj, self)._traverse(
            predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge  # type: ignore
        )
        # The cast only narrows the static type; nothing changes at runtime.
        return cast(
            Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
            traversal,
        )

633 )