Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/fun.py: 12%
101 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""Module with functions which are supposed to be as fast as possible"""
2from stat import S_ISDIR
5from git.compat import safe_decode, defenc
7# typing ----------------------------------------------
9from typing import (
10 Callable,
11 List,
12 MutableSequence,
13 Sequence,
14 Tuple,
15 TYPE_CHECKING,
16 Union,
17 overload,
18)
20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21, because the condition on line 20 was never true
21 from _typeshed import ReadableBuffer
22 from git import GitCmdObjectDB
# A single tree entry, laid out as (binsha, mode, name).
EntryTup = Tuple[bytes, int, str]  # same as TreeCacheTup in tree.py
# A tree entry, or None where a slot holds no entry.
EntryTupOrNone = Union[EntryTup, None]
# ---------------------------------------------------
# Names exported by this module.
__all__ = (
    "tree_to_stream",
    "tree_entries_from_data",
    "traverse_trees_recursive",
    "traverse_tree_recursive",
)
def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None:
    """Serialize tree entries and hand each one to ``write``.

    Every entry results in exactly one ``write`` call whose argument is the
    octal mode, a space, the name, a NUL byte and the binary sha, in that order.

    :param entries: **sorted** sequence of tuples with (binsha, mode, name)
    :param write: callable which takes the bytes of one serialized entry"""
    zero = ord("0")
    # Bit offsets of the six octal digits, most significant digit first.
    shifts = (15, 12, 9, 6, 3, 0)

    for binsha, mode, name in entries:
        octal = bytes([((mode >> shift) & 7) + zero for shift in shifts])

        # git slices away the first octal digit if it is zero
        if octal[0] == zero:
            octal = octal[1:]

        # A unicode name has to be encoded before it can be joined with the
        # binary sha; names that are not str are passed through untouched.
        if not isinstance(name, str):
            name_bytes = name  # type: ignore[unreachable]  # check runtime types - is always str?
        else:
            name_bytes = name.encode(defenc)

        write(b"".join((octal, b" ", name_bytes, b"\0", binsha)))
def tree_entries_from_data(data: bytes) -> List[EntryTup]:
    """Parse the binary representation of a tree into entry tuples.

    :param data: data block with tree data (as bytes)
    :return: list(tuple(binsha, mode, tree_relative_path), ...)"""
    zero = ord("0")
    space = ord(" ")
    end = len(data)
    pos = 0
    entries: List[EntryTup] = []

    while pos < end:
        # Mode: octal digits up to the next space. Some git versions
        # truncate the leading 0, some don't; accumulating digit by digit
        # handles both.
        mode = 0
        byte = data[pos]
        while byte != space:
            # shift the digits read so far up by 3 bits, then add this one
            mode = (mode << 3) + (byte - zero)
            pos += 1
            byte = data[pos]
        pos += 1  # step over the space

        # Name: everything up to the NUL terminator.
        name_start = pos
        while data[pos] != 0:
            pos += 1
        name = safe_decode(data[name_start:pos])
        pos += 1  # step over the NUL

        # Sha: the next 20 bytes.
        sha_end = pos + 20
        entries.append((data[pos:sha_end], mode, name))
        pos = sha_end

    return entries
115def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int) -> EntryTupOrNone:
116 """return data entry matching the given name and tree mode
117 or None.
118 Before the item is returned, the respective data item is set
119 None in the tree_data list to mark it done"""
121 try:
122 item = tree_data[start_at]
123 if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
124 tree_data[start_at] = None
125 return item
126 except IndexError:
127 pass
128 # END exception handling
129 for index, item in enumerate(tree_data):
130 if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
131 tree_data[index] = None
132 return item
133 # END if item matches
134 # END for each item
135 return None
138@overload
139def _to_full_path(item: None, path_prefix: str) -> None:
140 ...
143@overload
144def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup:
145 ...
148def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone:
149 """Rebuild entry with given path prefix"""
150 if not item:
151 return item
152 return (item[0], item[1], path_prefix + item[2])
def traverse_trees_recursive(
    odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str
) -> List[Tuple[EntryTupOrNone, ...]]:
    """
    Walk several trees in parallel and line up entries that share a name.

    :return: list of tuples, one tuple per non-tree entry found. Each tuple
        holds n items (n == len(tree_shas)), one per input tree, where an item
        is either None or an entry with
        * [0] == 20 byte sha
        * [1] == mode as int
        * [2] == path relative to working tree root
        An item is None if the respective entry did not exist in that tree.
    :param odb: object database; ``odb.stream(sha).read()`` supplies tree data
    :param tree_shas: iterable of shas pointing to trees. All trees must
        be on the same level. A tree-sha may be None, in which case that tree
        is treated as having no entries.
    :param path_prefix: a prefix to be added to the returned paths on this level,
        set it '' for the first iteration
    :note: The ordering of the returned items will be partially lost"""
    tree_count = len(tree_shas)

    # Load one mutable entry list per input tree.
    per_tree: List[List[EntryTupOrNone]] = []
    for sha in tree_shas:
        # fill a fresh, explicitly typed list, as list is invariant for typing
        loaded: List[EntryTupOrNone] = []
        if sha is not None:
            loaded.extend(tree_entries_from_data(odb.stream(sha).read()))
        per_tree.append(loaded)

    result: List[Tuple[EntryTupOrNone, ...]] = []

    # For every entry not handled yet, collect its counterparts from all
    # other trees. Directories are descended into together, anything else is
    # emitted as one result row. Handled entries are overwritten with None.
    for tree_index, tree_entries in enumerate(per_tree):
        for entry_index, entry in enumerate(tree_entries):
            if not entry:
                continue  # already consumed as a counterpart of an earlier entry

            _binsha, mode, name = entry
            entry_is_dir = S_ISDIR(mode)  # type mode bits

            row: List[EntryTupOrNone] = [None] * tree_count
            row[tree_index] = entry

            # visit every other tree exactly once, wrapping around the end
            for step in range(1, tree_count):
                other = (tree_index + step) % tree_count
                row[other] = _find_by_name(per_tree[other], name, entry_is_dir, entry_index)

            if not entry_is_dir:
                result.append(tuple(_to_full_path(e, path_prefix) for e in row))
            else:
                # recurse with the sha of each matched subtree, None where absent
                result.extend(
                    traverse_trees_recursive(
                        odb,
                        [((e and e[0]) or None) for e in row],
                        path_prefix + name + "/",
                    )
                )

            # finally mark the entry itself done
            tree_entries[entry_index] = None

        # this tree is fully processed, drop all of its data
        del tree_entries[:]

    return result
def traverse_tree_recursive(odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
    """
    Flatten the tree behind ``tree_sha`` into a list of its non-tree entries.

    :return: list of entries of the tree pointed to by the binary tree_sha. An entry
        has the following format:
        * [0] 20 byte sha
        * [1] mode as int
        * [2] path relative to the repository
    :param odb: object database; ``odb.stream(sha).read()`` supplies tree data
    :param tree_sha: binary sha of the tree to read
    :param path_prefix: prefix to prepend to the front of all returned paths"""
    collected: List[EntryTup] = []

    # unpacking/packing is faster than accessing individual items
    for binsha, mode, name in tree_entries_from_data(odb.stream(tree_sha).read()):
        full_path = path_prefix + name
        if not S_ISDIR(mode):
            collected.append((binsha, mode, full_path))
            continue
        # a subtree: descend and splice in everything found below it
        collected.extend(traverse_tree_recursive(odb, binsha, full_path + "/"))

    return collected