Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/util.py: 30%
232 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# util.py
2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
3#
4# This module is part of GitPython and is released under
5# the BSD License: http://www.opensource.org/licenses/bsd-license.php
6"""Module for general utility functions"""
7# flake8: noqa F401
10from abc import ABC, abstractmethod
11import warnings
12from git.util import IterableList, IterableObj, Actor
14import re
15from collections import deque
17from string import digits
18import time
19import calendar
20from datetime import datetime, timedelta, tzinfo
22# typing ------------------------------------------------------------
23from typing import (
24 Any,
25 Callable,
26 Deque,
27 Iterator,
28 Generic,
29 NamedTuple,
30 overload,
31 Sequence, # NOQA: F401
32 TYPE_CHECKING,
33 Tuple,
34 Type,
35 TypeVar,
36 Union,
37 cast,
38)
40from git.types import Has_id_attribute, Literal, _T # NOQA: F401
42if TYPE_CHECKING: 42 ↛ 43line 42 didn't jump to line 43, because the condition on line 42 was never true
43 from io import BytesIO, StringIO
44 from .commit import Commit
45 from .blob import Blob
46 from .tag import TagObject
47 from .tree import Tree, TraversedTreeTup
48 from subprocess import Popen
49 from .submodule.base import Submodule
50 from git.types import Protocol, runtime_checkable
51else:
52 # Protocol = Generic[_T] # Needed for typing bug #572?
53 Protocol = ABC
55 def runtime_checkable(f):
56 return f
59class TraverseNT(NamedTuple):
60 depth: int
61 item: Union["Traversable", "Blob"]
62 src: Union["Traversable", None]
65T_TIobj = TypeVar("T_TIobj", bound="TraversableIterableObj") # for TraversableIterableObj.traverse()
67TraversedTup = Union[
68 Tuple[Union["Traversable", None], "Traversable"], # for commit, submodule
69 "TraversedTreeTup",
70] # for tree.traverse()
72# --------------------------------------------------------------------
74__all__ = (
75 "get_object_type_by_name",
76 "parse_date",
77 "parse_actor_and_date",
78 "ProcessStreamAdapter",
79 "Traversable",
80 "altz_to_utctz_str",
81 "utctz_to_altz",
82 "verify_utctz",
83 "Actor",
84 "tzoffset",
85 "utc",
86)
88ZERO = timedelta(0)
90# { Functions
93def mode_str_to_int(modestr: Union[bytes, str]) -> int:
94 """
95 :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used
96 :return:
97 String identifying a mode compatible to the mode methods ids of the
98 stat module regarding the rwx permissions for user, group and other,
99 special flags and file system flags, i.e. whether it is a symlink
100 for example."""
101 mode = 0
102 for iteration, char in enumerate(reversed(modestr[-6:])):
103 char = cast(Union[str, int], char)
104 mode += int(char) << iteration * 3
105 # END for each char
106 return mode
109def get_object_type_by_name(
110 object_type_name: bytes,
111) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]:
112 """
113 :return: type suitable to handle the given object type name.
114 Use the type to create new instances.
116 :param object_type_name: Member of TYPES
118 :raise ValueError: In case object_type_name is unknown"""
119 if object_type_name == b"commit":
120 from . import commit
122 return commit.Commit
123 elif object_type_name == b"tag":
124 from . import tag
126 return tag.TagObject
127 elif object_type_name == b"blob":
128 from . import blob
130 return blob.Blob
131 elif object_type_name == b"tree":
132 from . import tree
134 return tree.Tree
135 else:
136 raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())
139def utctz_to_altz(utctz: str) -> int:
140 """we convert utctz to the timezone in seconds, it is the format time.altzone
141 returns. Git stores it as UTC timezone which has the opposite sign as well,
142 which explains the -1 * ( that was made explicit here )
143 :param utctz: git utc timezone string, i.e. +0200"""
144 return -1 * int(float(utctz) / 100 * 3600)
147def altz_to_utctz_str(altz: float) -> str:
148 """As above, but inverses the operation, returning a string that can be used
149 in commit objects"""
150 utci = -1 * int((float(altz) / 3600) * 100)
151 utcs = str(abs(utci))
152 utcs = "0" * (4 - len(utcs)) + utcs
153 prefix = (utci < 0 and "-") or "+"
154 return prefix + utcs
157def verify_utctz(offset: str) -> str:
158 """:raise ValueError: if offset is incorrect
159 :return: offset"""
160 fmt_exc = ValueError("Invalid timezone offset format: %s" % offset)
161 if len(offset) != 5:
162 raise fmt_exc
163 if offset[0] not in "+-":
164 raise fmt_exc
165 if offset[1] not in digits or offset[2] not in digits or offset[3] not in digits or offset[4] not in digits:
166 raise fmt_exc
167 # END for each char
168 return offset
171class tzoffset(tzinfo):
172 def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None:
173 self._offset = timedelta(seconds=-secs_west_of_utc)
174 self._name = name or "fixed"
176 def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]:
177 return tzoffset, (-self._offset.total_seconds(), self._name)
179 def utcoffset(self, dt: Union[datetime, None]) -> timedelta:
180 return self._offset
182 def tzname(self, dt: Union[datetime, None]) -> str:
183 return self._name
185 def dst(self, dt: Union[datetime, None]) -> timedelta:
186 return ZERO
189utc = tzoffset(0, "UTC")
192def from_timestamp(timestamp: float, tz_offset: float) -> datetime:
193 """Converts a timestamp + tz_offset into an aware datetime instance."""
194 utc_dt = datetime.fromtimestamp(timestamp, utc)
195 try:
196 local_dt = utc_dt.astimezone(tzoffset(tz_offset))
197 return local_dt
198 except ValueError:
199 return utc_dt
202def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
203 """
204 Parse the given date as one of the following
206 * aware datetime instance
207 * Git internal format: timestamp offset
208 * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200.
209 * ISO 8601 2005-04-07T22:13:13
210 The T can be a space as well
212 :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch
213 :raise ValueError: If the format could not be understood
214 :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY.
215 """
216 if isinstance(string_date, datetime):
217 if string_date.tzinfo:
218 utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None
219 offset = -int(utcoffset.total_seconds())
220 return int(string_date.astimezone(utc).timestamp()), offset
221 else:
222 raise ValueError(f"string_date datetime object without tzinfo, {string_date}")
224 # git time
225 try:
226 if string_date.count(" ") == 1 and string_date.rfind(":") == -1:
227 timestamp, offset_str = string_date.split()
228 if timestamp.startswith("@"):
229 timestamp = timestamp[1:]
230 timestamp_int = int(timestamp)
231 return timestamp_int, utctz_to_altz(verify_utctz(offset_str))
232 else:
233 offset_str = "+0000" # local time by default
234 if string_date[-5] in "-+":
235 offset_str = verify_utctz(string_date[-5:])
236 string_date = string_date[:-6] # skip space as well
237 # END split timezone info
238 offset = utctz_to_altz(offset_str)
240 # now figure out the date and time portion - split time
241 date_formats = []
242 splitter = -1
243 if "," in string_date:
244 date_formats.append("%a, %d %b %Y")
245 splitter = string_date.rfind(" ")
246 else:
247 # iso plus additional
248 date_formats.append("%Y-%m-%d")
249 date_formats.append("%Y.%m.%d")
250 date_formats.append("%m/%d/%Y")
251 date_formats.append("%d.%m.%Y")
253 splitter = string_date.rfind("T")
254 if splitter == -1:
255 splitter = string_date.rfind(" ")
256 # END handle 'T' and ' '
257 # END handle rfc or iso
259 assert splitter > -1
261 # split date and time
262 time_part = string_date[splitter + 1 :] # skip space
263 date_part = string_date[:splitter]
265 # parse time
266 tstruct = time.strptime(time_part, "%H:%M:%S")
268 for fmt in date_formats:
269 try:
270 dtstruct = time.strptime(date_part, fmt)
271 utctime = calendar.timegm(
272 (
273 dtstruct.tm_year,
274 dtstruct.tm_mon,
275 dtstruct.tm_mday,
276 tstruct.tm_hour,
277 tstruct.tm_min,
278 tstruct.tm_sec,
279 dtstruct.tm_wday,
280 dtstruct.tm_yday,
281 tstruct.tm_isdst,
282 )
283 )
284 return int(utctime), offset
285 except ValueError:
286 continue
287 # END exception handling
288 # END for each fmt
290 # still here ? fail
291 raise ValueError("no format matched")
292 # END handle format
293 except Exception as e:
294 raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e
295 # END handle exceptions
298# precompiled regex
299_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$")
300_re_only_actor = re.compile(r"^.+? (.*)$")
303def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
304 """Parse out the actor (author or committer) info from a line like::
306 author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700
308 :return: [Actor, int_seconds_since_epoch, int_timezone_offset]"""
309 actor, epoch, offset = "", "0", "0"
310 m = _re_actor_epoch.search(line)
311 if m:
312 actor, epoch, offset = m.groups()
313 else:
314 m = _re_only_actor.search(line)
315 actor = m.group(1) if m else line or ""
316 return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset))
319# } END functions
322# { Classes
325class ProcessStreamAdapter(object):
327 """Class wireing all calls to the contained Process instance.
329 Use this type to hide the underlying process to provide access only to a specified
330 stream. The process is usually wrapped into an AutoInterrupt class to kill
331 it if the instance goes out of scope."""
333 __slots__ = ("_proc", "_stream")
335 def __init__(self, process: "Popen", stream_name: str) -> None:
336 self._proc = process
337 self._stream: StringIO = getattr(process, stream_name) # guessed type
339 def __getattr__(self, attr: str) -> Any:
340 return getattr(self._stream, attr)
343@runtime_checkable
344class Traversable(Protocol):
346 """Simple interface to perform depth-first or breadth-first traversals
347 into one direction.
348 Subclasses only need to implement one function.
349 Instances of the Subclass must be hashable
351 Defined subclasses = [Commit, Tree, SubModule]
352 """
354 __slots__ = ()
356 @classmethod
357 @abstractmethod
358 def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]:
359 """
360 Returns:
361 Tuple of items connected to the given item.
362 Must be implemented in subclass
364 class Commit:: (cls, Commit) -> Tuple[Commit, ...]
365 class Submodule:: (cls, Submodule) -> Iterablelist[Submodule]
366 class Tree:: (cls, Tree) -> Tuple[Tree, ...]
367 """
368 raise NotImplementedError("To be implemented in subclass")
370 @abstractmethod
371 def list_traverse(self, *args: Any, **kwargs: Any) -> Any:
372 """ """
373 warnings.warn(
374 "list_traverse() method should only be called from subclasses."
375 "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20"
376 "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit",
377 DeprecationWarning,
378 stacklevel=2,
379 )
380 return self._list_traverse(*args, **kwargs)
382 def _list_traverse(
383 self, as_edge: bool = False, *args: Any, **kwargs: Any
384 ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]:
385 """
386 :return: IterableList with the results of the traversal as produced by
387 traverse()
388 Commit -> IterableList['Commit']
389 Submodule -> IterableList['Submodule']
390 Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']]
391 """
392 # Commit and Submodule have id.__attribute__ as IterableObj
393 # Tree has id.__attribute__ inherited from IndexObject
394 if isinstance(self, Has_id_attribute):
395 id = self._id_attribute_
396 else:
397 id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_
398 # could add _id_attribute_ to Traversable, or make all Traversable also Iterable?
400 if not as_edge:
401 out: IterableList[Union["Commit", "Submodule", "Tree", "Blob"]] = IterableList(id)
402 out.extend(self.traverse(as_edge=as_edge, *args, **kwargs))
403 return out
404 # overloads in subclasses (mypy doesn't allow typing self: subclass)
405 # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]]
406 else:
407 # Raise deprecationwarning, doesn't make sense to use this
408 out_list: IterableList = IterableList(self.traverse(*args, **kwargs))
409 return out_list
411 @abstractmethod
412 def traverse(self, *args: Any, **kwargs: Any) -> Any:
413 """ """
414 warnings.warn(
415 "traverse() method should only be called from subclasses."
416 "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20"
417 "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit",
418 DeprecationWarning,
419 stacklevel=2,
420 )
421 return self._traverse(*args, **kwargs)
423 def _traverse( 423 ↛ exitline 423 didn't jump to the function exit
424 self,
425 predicate: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: True,
426 prune: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: False,
427 depth: int = -1,
428 branch_first: bool = True,
429 visit_once: bool = True,
430 ignore_self: int = 1,
431 as_edge: bool = False,
432 ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]:
433 """:return: iterator yielding of items found when traversing self
434 :param predicate: f(i,d) returns False if item i at depth d should not be included in the result
436 :param prune:
437 f(i,d) return True if the search should stop at item i at depth d.
438 Item i will not be returned.
440 :param depth:
441 define at which level the iteration should not go deeper
442 if -1, there is no limit
443 if 0, you would effectively only get self, the root of the iteration
444 i.e. if 1, you would only get the first level of predecessors/successors
446 :param branch_first:
447 if True, items will be returned branch first, otherwise depth first
449 :param visit_once:
450 if True, items will only be returned once, although they might be encountered
451 several times. Loops are prevented that way.
453 :param ignore_self:
454 if True, self will be ignored and automatically pruned from
455 the result. Otherwise it will be the first item to be returned.
456 If as_edge is True, the source of the first edge is None
458 :param as_edge:
459 if True, return a pair of items, first being the source, second the
460 destination, i.e. tuple(src, dest) with the edge spanning from
461 source to destination"""
463 """
464 Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]]
465 Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]]
466 Tree -> Iterator[Union[Blob, Tree, Submodule,
467 Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]]
469 ignore_self=True is_edge=True -> Iterator[item]
470 ignore_self=True is_edge=False --> Iterator[item]
471 ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]]
472 ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]"""
474 visited = set()
475 stack: Deque[TraverseNT] = deque()
476 stack.append(TraverseNT(0, self, None)) # self is always depth level 0
478 def addToStack(
479 stack: Deque[TraverseNT],
480 src_item: "Traversable",
481 branch_first: bool,
482 depth: int,
483 ) -> None:
484 lst = self._get_intermediate_items(item)
485 if not lst: # empty list
486 return None
487 if branch_first:
488 stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
489 else:
490 reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1))
491 stack.extend(reviter)
493 # END addToStack local method
495 while stack:
496 d, item, src = stack.pop() # depth of item, item, item_source
498 if visit_once and item in visited:
499 continue
501 if visit_once:
502 visited.add(item)
504 rval: Union[TraversedTup, "Traversable", "Blob"]
505 if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item)
506 rval = (src, item)
507 else:
508 rval = item
510 if prune(rval, d):
511 continue
513 skipStartItem = ignore_self and (item is self)
514 if not skipStartItem and predicate(rval, d):
515 yield rval
517 # only continue to next level if this is appropriate !
518 nd = d + 1
519 if depth > -1 and nd > depth:
520 continue
522 addToStack(stack, item, branch_first, nd)
523 # END for each item on work stack
526@runtime_checkable
527class Serializable(Protocol):
529 """Defines methods to serialize and deserialize objects from and into a data stream"""
531 __slots__ = ()
533 # @abstractmethod
534 def _serialize(self, stream: "BytesIO") -> "Serializable":
535 """Serialize the data of this object into the given data stream
536 :note: a serialized object would ``_deserialize`` into the same object
537 :param stream: a file-like object
538 :return: self"""
539 raise NotImplementedError("To be implemented in subclass")
541 # @abstractmethod
542 def _deserialize(self, stream: "BytesIO") -> "Serializable":
543 """Deserialize all information regarding this object from the stream
544 :param stream: a file-like object
545 :return: self"""
546 raise NotImplementedError("To be implemented in subclass")
549class TraversableIterableObj(IterableObj, Traversable):
550 __slots__ = ()
552 TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]
554 def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]:
555 return super(TraversableIterableObj, self)._list_traverse(*args, **kwargs)
557 @overload # type: ignore
558 def traverse(self: T_TIobj) -> Iterator[T_TIobj]:
559 ...
561 @overload
562 def traverse(
563 self: T_TIobj,
564 predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
565 prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
566 depth: int,
567 branch_first: bool,
568 visit_once: bool,
569 ignore_self: Literal[True],
570 as_edge: Literal[False],
571 ) -> Iterator[T_TIobj]:
572 ...
574 @overload
575 def traverse(
576 self: T_TIobj,
577 predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
578 prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
579 depth: int,
580 branch_first: bool,
581 visit_once: bool,
582 ignore_self: Literal[False],
583 as_edge: Literal[True],
584 ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]:
585 ...
587 @overload
588 def traverse(
589 self: T_TIobj,
590 predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
591 prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
592 depth: int,
593 branch_first: bool,
594 visit_once: bool,
595 ignore_self: Literal[True],
596 as_edge: Literal[True],
597 ) -> Iterator[Tuple[T_TIobj, T_TIobj]]:
598 ...
600 def traverse( 600 ↛ exitline 600 didn't jump to the function exit
601 self: T_TIobj,
602 predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: True,
603 prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False,
604 depth: int = -1,
605 branch_first: bool = True,
606 visit_once: bool = True,
607 ignore_self: int = 1,
608 as_edge: bool = False,
609 ) -> Union[Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]]:
610 """For documentation, see util.Traversable._traverse()"""
612 """
613 # To typecheck instead of using cast.
614 import itertools
615 from git.types import TypeGuard
616 def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]:
617 for x in inp[1]:
618 if not isinstance(x, tuple) and len(x) != 2:
619 if all(isinstance(inner, Commit) for inner in x):
620 continue
621 return True
623 ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge)
624 ret_tup = itertools.tee(ret, 2)
625 assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}"
626 return ret_tup[0]
627 """
628 return cast(
629 Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
630 super(TraversableIterableObj, self)._traverse(
631 predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore
632 ),
633 )