Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/tree.py: 25%
181 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# tree.py
2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
3#
4# This module is part of GitPython and is released under
5# the BSD License: http://www.opensource.org/licenses/bsd-license.php
7from git.util import IterableList, join_path
8import git.diff as git_diff
9from git.util import to_bin_sha
11from . import util
12from .base import IndexObject, IndexObjUnion
13from .blob import Blob
14from .submodule.base import Submodule
16from .fun import tree_entries_from_data, tree_to_stream
19# typing -------------------------------------------------
21from typing import (
22 Any,
23 Callable,
24 Dict,
25 Iterable,
26 Iterator,
27 List,
28 Tuple,
29 Type,
30 Union,
31 cast,
32 TYPE_CHECKING,
33)
35from git.types import PathLike, Literal
37if TYPE_CHECKING: 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true
38 from git.repo import Repo
39 from io import BytesIO
# A single raw tree entry as held in a tree's cache: (binsha, mode, name).
TreeCacheTup = Tuple[bytes, int, str]

# NOTE(review): the outer Union wraps a single 3-tuple type, so it is redundant as
# written - confirm whether a union of two tuple shapes was intended.
TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]


# def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]:
#    return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance(inp[2], str)

# --------------------------------------------------------


def cmp(a: str, b: str) -> int:
    """Three-way comparison of two strings.

    :return: -1 if ``a < b``, 0 if both are equal, 1 if ``a > b``"""
    return (a > b) - (a < b)


__all__ = ("TreeModifier", "Tree")
def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int:
    """Compare two tree cache entries by their name (the third tuple field).

    Both names are first compared over the length of the shorter one. Only if that
    common prefix is identical does the length difference decide the order.

    :return: negative, zero or positive int if ``t1`` sorts before, equal to or
        after ``t2``"""
    name_a = t1[2]
    name_b = t2[2]
    shared_len = min(len(name_a), len(name_b))

    prefix_order = cmp(name_a[:shared_len], name_b[:shared_len])
    if prefix_order != 0:
        return prefix_order

    # identical up to the shorter name: the shorter one sorts first
    return len(name_a) - len(name_b)
def merge_sort(a: "List[TreeCacheTup]", cmp: "Callable[[TreeCacheTup, TreeCacheTup], int]") -> None:
    """Sort ``a`` in place according to the three-way comparison function ``cmp``.

    The sort is stable: items which compare equal keep their relative order.

    :param a: list of items to sort, it is modified in place
    :param cmp: callable taking two items and returning a negative int, zero or a
        positive int if the first item sorts before, equal to or after the second
    :return: None"""
    from functools import cmp_to_key

    # The built-in sort is stable, just like a merge taking the left item on ties.
    # Slice assignment keeps the caller's list object, so references to it stay valid.
    a[:] = sorted(a, key=cmp_to_key(cmp))
class TreeModifier(object):

    """Helper which alters a tree's entry cache through a list-like interface.

    The ``_cache`` stored here is a reference to the cache list it was created with,
    so every change is applied to that list directly. Call ``set_done`` once all
    changes are made: it sorts the cache, leaving it in a serializable state."""

    __slots__ = "_cache"

    def __init__(self, cache: List[TreeCacheTup]) -> None:
        self._cache = cache

    def _index_by_name(self, name: str) -> int:
        """:return: index of the first item called ``name``, or -1 if there is none"""
        matches = (pos for pos, entry in enumerate(self._cache) if entry[2] == name)
        return next(matches, -1)

    # { Interface
    def set_done(self) -> "TreeModifier":
        """Finish a series of modifications by sorting the cache.

        Calling it repeatedly is allowed, but every call sorts the cache again.

        :return: self"""
        merge_sort(self._cache, git_cmp)
        return self

    # } END interface

    # { Mutators
    def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
        """Add an item to the tree.

        If an item called ``name`` already exists with the same sha and mode,
        nothing is changed. If it exists with a different sha or mode, a ValueError
        is raised unless ``force`` is True.

        :param sha: The 20 or 40 byte sha of the item to add
        :param mode: int representing the stat compatible mode of the item
        :param name: name of the item, it must not contain '/' characters
        :param force: If True, the given item replaces any existing item of the
            same name, no matter which sha and mode the existing one has
        :raise ValueError: if ``name`` contains a '/', if ``mode`` does not map to a
            known object type, or on a conflicting existing item without ``force``
        :return: self"""
        if "/" in name:
            raise ValueError("Name must not contain '/' characters")
        if (mode >> 12) not in Tree._map_id_to_type:
            raise ValueError("Invalid object type according to mode %o" % mode)

        binsha = to_bin_sha(sha)
        position = self._index_by_name(name)
        entry = (binsha, mode, name)

        if position == -1:
            # unknown name: plain append, ordering is restored by set_done
            self._cache.append(entry)
            return self

        if force:
            self._cache[position] = entry
            return self

        existing = self._cache[position]
        if not (existing[0] == binsha and existing[1] == mode):
            raise ValueError("Item %r existed with different properties" % name)
        return self

    def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
        """Append an item to the tree without checking for an existing item of that name.

        Only the argument types are asserted, the caller is responsible for
        passing correct values. See ``add`` for details on the parameters.

        :param binsha: 20 byte binary sha"""
        assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
        self._cache.append((binsha, mode, name))

    def __delitem__(self, name: str) -> None:
        """Remove the item called ``name``, do nothing if there is no such item"""
        position = self._index_by_name(name)
        if position >= 0:
            del self._cache[position]

    # } END mutators
class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):

    """Tree objects represent an ordered list of Blobs and other Trees.

    ``Tree as a list``::

        Access a specific blob using the
        tree['filename'] notation.

        You may as well access by index
        blob = tree[0]
    """

    type: Literal["tree"] = "tree"
    __slots__ = "_cache"

    # actual integer ids for comparison
    commit_id = 0o16  # equals stat.S_IFDIR | stat.S_IFLNK - a directory link
    blob_id = 0o10
    symlink_id = 0o12
    tree_id = 0o04

    # Maps the object type bits of an entry's mode (mode >> 12) to the class
    # instantiated for that entry.
    _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
        commit_id: Submodule,
        blob_id: Blob,
        symlink_id: Blob
        # tree id added once Tree is defined
    }

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: int = tree_id << 12,
        path: Union[PathLike, None] = None,
    ):
        super(Tree, self).__init__(repo, binsha, mode, path)

    @classmethod
    def _get_intermediate_items(
        cls,
        index_object: IndexObjUnion,
    ) -> Union[Tuple["Tree", ...], Tuple[()]]:
        """:return: tuple of all objects directly contained in ``index_object`` if it
        is a tree, an empty tuple for any other object type"""
        if index_object.type == "tree":
            return tuple(index_object._iter_convert_to_object(index_object._cache))
        return ()

    def _set_cache_(self, attr: str) -> None:
        """Fill the ``_cache`` attribute on first access by reading this tree's data
        from the object database. Any other attribute is handled by the base class."""
        if attr == "_cache":
            # Set the data when we need it
            ostream = self.repo.odb.stream(self.binsha)
            self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
        else:
            super(Tree, self)._set_cache_(attr)
        # END handle attribute

    def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
        """Iterable yields tuples of (binsha, mode, name), which will be converted
        to the respective object representation

        :raise TypeError: if the type bits of a mode do not map to a known object type"""
        for binsha, mode, name in iterable:
            path = join_path(self.path, name)
            try:
                yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
            except KeyError as e:
                raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
        # END for each item

    def join(self, file: str) -> IndexObjUnion:
        """Find the named object in this tree's contents

        :param file: name of a direct entry, or a '/' separated path which is
            resolved one token at a time through the sub-trees
        :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule``

        :raise KeyError: if given file or tree does not exist in tree"""
        msg = "Blob or Tree named %r not found"
        if "/" in file:
            tree = self
            item = self
            tokens = file.split("/")
            for i, token in enumerate(tokens):
                item = tree[token]
                if item.type == "tree":
                    tree = item
                else:
                    # safety assertion - blobs are at the end of the path
                    if i != len(tokens) - 1:
                        raise KeyError(msg % file)
                    return item
                # END handle item type
            # END for each token of split path
            if item == self:
                raise KeyError(msg % file)
            return item
        else:
            for info in self._cache:
                if info[2] == file:  # [2] == name
                    return self._map_id_to_type[info[1] >> 12](
                        self.repo, info[0], info[1], join_path(self.path, info[2])
                    )
            # END for each obj
            raise KeyError(msg % file)
        # END handle long paths

    def __truediv__(self, file: str) -> IndexObjUnion:
        """For PY3 only. Makes ``tree / 'name'`` an alias for ``tree.join('name')``"""
        return self.join(file)

    @property
    def trees(self) -> List["Tree"]:
        """:return: list(Tree, ...) list of trees directly below this tree"""
        return [i for i in self if i.type == "tree"]

    @property
    def blobs(self) -> List[Blob]:
        """:return: list(Blob, ...) list of blobs directly below this tree"""
        return [i for i in self if i.type == "blob"]

    @property
    def cache(self) -> TreeModifier:
        """
        :return: An object allowing to modify the internal cache. This can be used
            to change the tree's contents. When done, make sure you call ``set_done``
            on the tree modifier, or serialization behaviour will be incorrect.
            See the ``TreeModifier`` for more information on how to alter the cache"""
        return TreeModifier(self._cache)

    def traverse(
        self,  # type: ignore[override]
        predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = False,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
        """For documentation, see util.Traversable._traverse()
        Trees are set to visit_once = False to gain more performance in the traversal"""
        return cast(
            Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
            super(Tree, self)._traverse(
                predicate,
                prune,
                depth,  # type: ignore
                branch_first,
                visit_once,
                ignore_self,
                # forwarded so that the argument is not silently ignored
                as_edge,
            ),
        )

    def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
        """
        :return: IterableList with the results of the traversal as produced by
            traverse()
            Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']]
        """
        return super(Tree, self)._list_traverse(*args, **kwargs)

    # List protocol

    def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
        # Python 2 slicing hook. Python 3 does not call it for ``tree[i:j]``, it is
        # only reachable through an explicit call.
        return list(self._iter_convert_to_object(self._cache[i:j]))

    def __iter__(self) -> Iterator[IndexObjUnion]:
        return self._iter_convert_to_object(self._cache)

    def __len__(self) -> int:
        return len(self._cache)

    def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
        """Access an entry by position (int) or by name / path (str).

        :raise TypeError: for any other index type, which includes slices"""
        if isinstance(item, int):
            info = self._cache[item]
            return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))

        if isinstance(item, str):
            # compatibility
            return self.join(item)
        # END index is basestring

        raise TypeError("Invalid index type: %r" % item)

    def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
        """:return: True if ``item`` is a direct entry of this tree. Index objects are
        matched by their binsha, anything else is treated as a repo-relative path"""
        if isinstance(item, IndexObject):
            return any(item.binsha == info[0] for info in self._cache)
        # END handle item is index object

        # compatibility
        # treat item as repo-relative path
        path = self.path
        return any(item == join_path(path, info[2]) for info in self._cache)

    def __reversed__(self) -> Iterator[IndexObjUnion]:
        # reversed() rejects generators, so the cache list is reversed first and
        # the entries are converted to objects afterwards.
        return self._iter_convert_to_object(reversed(self._cache))

    def _serialize(self, stream: "BytesIO") -> "Tree":
        """Serialize this tree into the stream. Please note that we will assume
        our tree data to be in a sorted state. If this is not the case, serialization
        will not generate a correct tree representation as these are assumed to be sorted
        by algorithms"""
        tree_to_stream(self._cache, stream.write)
        return self

    def _deserialize(self, stream: "BytesIO") -> "Tree":
        """Replace the entry cache with the entries parsed from the stream's data"""
        self._cache = tree_entries_from_data(stream.read())
        return self


# END tree

# finalize map definition
Tree._map_id_to_type[Tree.tree_id] = Tree
422#