Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/fun.py: 12%

101 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Module with functions which are supposed to be as fast as possible""" 

2from stat import S_ISDIR 

3 

4 

5from git.compat import safe_decode, defenc 

6 

7# typing ---------------------------------------------- 

8 

9from typing import ( 

10 Callable, 

11 List, 

12 MutableSequence, 

13 Sequence, 

14 Tuple, 

15 TYPE_CHECKING, 

16 Union, 

17 overload, 

18) 

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21, because the condition on line 20 was never true

21 from _typeshed import ReadableBuffer 

22 from git import GitCmdObjectDB 

23 

24EntryTup = Tuple[bytes, int, str] # same as TreeCacheTup in tree.py 

25EntryTupOrNone = Union[EntryTup, None] 

26 

27# --------------------------------------------------- 

28 

29 

30__all__ = ( 

31 "tree_to_stream", 

32 "tree_entries_from_data", 

33 "traverse_trees_recursive", 

34 "traverse_tree_recursive", 

35) 

36 

37 

def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None:
    """Write the given list of entries into a stream using its write method.

    Every entry is serialized as ``<octal mode> <name> NUL <binsha>`` and handed
    to ``write`` in a single call.

    :param entries: **sorted** list of tuples with (binsha, mode, name)
    :param write: write method which takes a data string"""
    for binsha, mode, name in entries:
        # Render the low 18 bits of the mode as exactly six octal digits in one
        # formatting step instead of assembling the digits one by one.
        mode_str = b"%06o" % (mode & 0o777777)

        # git slices away the first octal digit if it is zero
        if mode_str.startswith(b"0"):
            mode_str = mode_str[1:]
        # END save a byte

        # The name has to be bytes before it can be joined with the binary sha,
        # so str names are encoded with the default encoding. Anything that is
        # not a str is written through unchanged.
        if isinstance(name, str):
            name_bytes = name.encode(defenc)
        else:
            name_bytes = name  # type: ignore[unreachable]  # check runtime types - is always str?
        write(b"".join((mode_str, b" ", name_bytes, b"\0", binsha)))
    # END for each item

67 

68 

def tree_entries_from_data(data: bytes) -> List[EntryTup]:
    """Parse the binary representation of a tree into a list of entry tuples.

    Each record in ``data`` is laid out as octal mode digits, a space, the
    NUL-terminated name and a 20 byte sha.

    :param data: data block with tree data (as bytes)
    :return: list(tuple(binsha, mode, tree_relative_path), ...)"""
    zero_digit = ord("0")
    space = ord(" ")
    total = len(data)
    entries = []
    pos = 0
    while pos < total:
        # Accumulate the octal mode digit by digit, three bits per digit.
        # Some git versions truncate the leading 0, some don't, so the number
        # of digits is not fixed and the space is the only terminator.
        mode = 0
        byte = data[pos]
        while byte != space:
            mode = (mode << 3) + (byte - zero_digit)
            pos += 1
            byte = data[pos]
        # END while reading mode

        # step over the space; the name runs up to the next NUL byte
        pos += 1
        name_start = pos
        while data[pos] != 0:
            pos += 1
        # END while not reached NULL

        # default encoding for strings in git is utf8
        name = safe_decode(data[name_start:pos])

        # step over the NUL; the following 20 bytes are the binary sha
        sha_start = pos + 1
        pos = sha_start + 20
        entries.append((data[sha_start:pos], mode, name))
    # END for each record in data stream
    return entries

113 

114 

def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int) -> EntryTupOrNone:
    """Find the entry with the given name and directory flag and claim it.

    The slot at ``start_at`` is probed first. If it does not hold a match, or
    the index is out of range, the sequence is scanned from the beginning.
    A matching entry is replaced by None in ``tree_data`` before it is
    returned, which marks it as done.

    :return: the matching entry, or None if no entry matches"""

    # Probe the hinted position before falling back to a full scan.
    try:
        hinted = tree_data[start_at]
        if hinted and hinted[2] == name and S_ISDIR(hinted[1]) == is_dir:
            tree_data[start_at] = None
            return hinted
    except IndexError:
        pass
    # END exception handling

    # Linear scan for the first live entry with matching name and type.
    hit = next(
        (
            idx
            for idx, entry in enumerate(tree_data)
            if entry and entry[2] == name and S_ISDIR(entry[1]) == is_dir
        ),
        None,
    )
    if hit is None:
        return None

    found = tree_data[hit]
    tree_data[hit] = None
    return found

136 

137 

@overload
def _to_full_path(item: None, path_prefix: str) -> None:
    ...


@overload
def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup:
    ...


def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone:
    """Return a copy of the entry whose path is prepended with ``path_prefix``.

    A falsy ``item`` (such as None) is handed back unchanged."""
    if item:
        binsha, mode, name = item
        return (binsha, mode, path_prefix + name)
    return item

153 

154 

def traverse_trees_recursive(
    odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str
) -> List[Tuple[EntryTupOrNone, ...]]:
    """
    Walk several trees side by side and line up the entries sharing a path.

    :return: list of tuples, one per blob/commit found in any of the trees.
        Every tuple has n slots (n == len(tree_shas)); a slot is None if the
        respective blob/commit did not exist in that tree, otherwise it is an
        entry where
            * [0] == 20 byte sha
            * [1] == mode as int
            * [2] == path relative to working tree root
    :param odb: object database; ``odb.stream(sha).read()`` is used to obtain
        the raw data of each tree
    :param tree_shas: iterable of shas pointing to trees. All trees must
        be on the same level. A tree-sha may be None, in which case that tree
        contributes no entries.
    :param path_prefix: a prefix to be added to the returned paths on this level,
        set it '' for the first iteration
    :note: The ordering of the returned items will be partially lost"""
    tree_count = len(tree_shas)

    # One mutable entry list per tree. A new list is made for each tree
    # because entries are overwritten with None once they are processed.
    trees_data: List[List[EntryTupOrNone]] = [
        [] if tree_sha is None else list(tree_entries_from_data(odb.stream(tree_sha).read()))
        for tree_sha in tree_shas
    ]

    out: List[Tuple[EntryTupOrNone, ...]] = []

    # Take every still-unprocessed entry of every tree, collect its
    # counterparts from all other trees, then either recurse (trees) or
    # emit the collected row (non-tree items).
    for ti, tree_data in enumerate(trees_data):
        for ii, item in enumerate(tree_data):
            if not item:
                # already claimed as the counterpart of an earlier entry
                continue

            _sha, mode, name = item
            is_dir = S_ISDIR(mode)  # type mode bits

            row: List[EntryTupOrNone] = [None] * tree_count
            row[ti] = item

            # Visit each other tree exactly once, starting right behind the
            # current one and wrapping around to stop just before it.
            for offset in range(1, tree_count):
                other = (ti + offset) % tree_count
                row[other] = _find_by_name(trees_data[other], name, is_dir, ii)
            # END for each other tree

            if is_dir:
                # descend into the matching sub-trees together
                sub_shas = [((match and match[0]) or None) for match in row]
                out.extend(traverse_trees_recursive(odb, sub_shas, path_prefix + name + "/"))
            else:
                out.append(tuple(_to_full_path(match, path_prefix) for match in row))
            # END handle recursion

            # finally mark the entry itself done
            tree_data[ii] = None
        # END for each item

        # this tree is fully processed, drop all of its data
        tree_data.clear()
    # END for each tree_data chunk
    return out

231 

232 

def traverse_tree_recursive(odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
    """
    Collect the non-tree entries of a tree, descending into its sub-trees.

    :return: list of entries of the tree pointed to by the binary tree_sha. An entry
        has the following format:
        * [0] 20 byte sha
        * [1] mode as int
        * [2] path relative to the repository
    :param odb: object database; ``odb.stream(sha).read()`` is used to obtain
        the raw data of each tree
    :param tree_sha: binary sha of the tree to traverse
    :param path_prefix: prefix to prepend to the front of all returned paths"""
    collected: List[EntryTup] = []

    # unpacking/packing is faster than accessing individual items
    for sha, mode, name in tree_entries_from_data(odb.stream(tree_sha).read()):
        full_path = path_prefix + name
        if not S_ISDIR(mode):
            collected.append((sha, mode, full_path))
            continue
        # sub-tree: its entries are listed below "<full_path>/"
        collected.extend(traverse_tree_recursive(odb, sha, full_path + "/"))
    # END for each item

    return collected