Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/tree.py: 25%

181 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# tree.py 

2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

3# 

4# This module is part of GitPython and is released under 

5# the BSD License: http://www.opensource.org/licenses/bsd-license.php 

6 

7from git.util import IterableList, join_path 

8import git.diff as git_diff 

9from git.util import to_bin_sha 

10 

11from . import util 

12from .base import IndexObject, IndexObjUnion 

13from .blob import Blob 

14from .submodule.base import Submodule 

15 

16from .fun import tree_entries_from_data, tree_to_stream 

17 

18 

19# typing ------------------------------------------------- 

20 

21from typing import ( 

22 Any, 

23 Callable, 

24 Dict, 

25 Iterable, 

26 Iterator, 

27 List, 

28 Tuple, 

29 Type, 

30 Union, 

31 cast, 

32 TYPE_CHECKING, 

33) 

34 

35from git.types import PathLike, Literal 

36 

37if TYPE_CHECKING: 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true

38 from git.repo import Repo 

39 from io import BytesIO 

40 

41TreeCacheTup = Tuple[bytes, int, str] 

42 

43TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]] 

44 

45 

46# def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]: 

47# return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str) 

48 

49# -------------------------------------------------------- 

50 

51 

52cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b) 52 ↛ exitline 52 didn't run the lambda on line 52

53 

54__all__ = ("TreeModifier", "Tree") 

55 

56 

57def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int: 

58 a, b = t1[2], t2[2] 

59 # assert isinstance(a, str) and isinstance(b, str) 

60 len_a, len_b = len(a), len(b) 

61 min_len = min(len_a, len_b) 

62 min_cmp = cmp(a[:min_len], b[:min_len]) 

63 

64 if min_cmp: 

65 return min_cmp 

66 

67 return len_a - len_b 

68 

69 

70def merge_sort(a: List[TreeCacheTup], cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None: 

71 if len(a) < 2: 

72 return None 

73 

74 mid = len(a) // 2 

75 lefthalf = a[:mid] 

76 righthalf = a[mid:] 

77 

78 merge_sort(lefthalf, cmp) 

79 merge_sort(righthalf, cmp) 

80 

81 i = 0 

82 j = 0 

83 k = 0 

84 

85 while i < len(lefthalf) and j < len(righthalf): 

86 if cmp(lefthalf[i], righthalf[j]) <= 0: 

87 a[k] = lefthalf[i] 

88 i = i + 1 

89 else: 

90 a[k] = righthalf[j] 

91 j = j + 1 

92 k = k + 1 

93 

94 while i < len(lefthalf): 

95 a[k] = lefthalf[i] 

96 i = i + 1 

97 k = k + 1 

98 

99 while j < len(righthalf): 

100 a[k] = righthalf[j] 

101 j = j + 1 

102 k = k + 1 

103 

104 

105class TreeModifier(object): 

106 

107 """A utility class providing methods to alter the underlying cache in a list-like fashion. 

108 

109 Once all adjustments are complete, the _cache, which really is a reference to 

110 the cache of a tree, will be sorted. Assuring it will be in a serializable state""" 

111 

112 __slots__ = "_cache" 

113 

114 def __init__(self, cache: List[TreeCacheTup]) -> None: 

115 self._cache = cache 

116 

117 def _index_by_name(self, name: str) -> int: 

118 """:return: index of an item with name, or -1 if not found""" 

119 for i, t in enumerate(self._cache): 

120 if t[2] == name: 

121 return i 

122 # END found item 

123 # END for each item in cache 

124 return -1 

125 

126 # { Interface 

127 def set_done(self) -> "TreeModifier": 

128 """Call this method once you are done modifying the tree information. 

129 It may be called several times, but be aware that each call will cause 

130 a sort operation 

131 :return self:""" 

132 merge_sort(self._cache, git_cmp) 

133 return self 

134 

135 # } END interface 

136 

137 # { Mutators 

138 def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier": 

139 """Add the given item to the tree. If an item with the given name already 

140 exists, nothing will be done, but a ValueError will be raised if the 

141 sha and mode of the existing item do not match the one you add, unless 

142 force is True 

143 

144 :param sha: The 20 or 40 byte sha of the item to add 

145 :param mode: int representing the stat compatible mode of the item 

146 :param force: If True, an item with your name and information will overwrite 

147 any existing item with the same name, no matter which information it has 

148 :return: self""" 

149 if "/" in name: 

150 raise ValueError("Name must not contain '/' characters") 

151 if (mode >> 12) not in Tree._map_id_to_type: 

152 raise ValueError("Invalid object type according to mode %o" % mode) 

153 

154 sha = to_bin_sha(sha) 

155 index = self._index_by_name(name) 

156 

157 item = (sha, mode, name) 

158 # assert is_tree_cache(item) 

159 

160 if index == -1: 

161 self._cache.append(item) 

162 else: 

163 if force: 

164 self._cache[index] = item 

165 else: 

166 ex_item = self._cache[index] 

167 if ex_item[0] != sha or ex_item[1] != mode: 

168 raise ValueError("Item %r existed with different properties" % name) 

169 # END handle mismatch 

170 # END handle force 

171 # END handle name exists 

172 return self 

173 

174 def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None: 

175 """Add the given item to the tree, its correctness is assumed, which 

176 puts the caller into responsibility to assure the input is correct. 

177 For more information on the parameters, see ``add`` 

178 :param binsha: 20 byte binary sha""" 

179 assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) 

180 tree_cache = (binsha, mode, name) 

181 

182 self._cache.append(tree_cache) 

183 

184 def __delitem__(self, name: str) -> None: 

185 """Deletes an item with the given name if it exists""" 

186 index = self._index_by_name(name) 

187 if index > -1: 

188 del self._cache[index] 

189 

190 # } END mutators 

191 

192 

193class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): 

194 

195 """Tree objects represent an ordered list of Blobs and other Trees. 

196 

197 ``Tree as a list``:: 

198 

199 Access a specific blob using the 

200 tree['filename'] notation. 

201 

202 You may as well access by index 

203 blob = tree[0] 

204 """ 

205 

206 type: Literal["tree"] = "tree" 

207 __slots__ = "_cache" 

208 

209 # actual integer ids for comparison 

210 commit_id = 0o16 # equals stat.S_IFDIR | stat.S_IFLNK - a directory link 

211 blob_id = 0o10 

212 symlink_id = 0o12 

213 tree_id = 0o04 

214 

215 _map_id_to_type: Dict[int, Type[IndexObjUnion]] = { 

216 commit_id: Submodule, 

217 blob_id: Blob, 

218 symlink_id: Blob 

219 # tree id added once Tree is defined 

220 } 

221 

222 def __init__( 

223 self, 

224 repo: "Repo", 

225 binsha: bytes, 

226 mode: int = tree_id << 12, 

227 path: Union[PathLike, None] = None, 

228 ): 

229 super(Tree, self).__init__(repo, binsha, mode, path) 

230 

231 @classmethod 

232 def _get_intermediate_items( 

233 cls, 

234 index_object: IndexObjUnion, 

235 ) -> Union[Tuple["Tree", ...], Tuple[()]]: 

236 if index_object.type == "tree": 

237 return tuple(index_object._iter_convert_to_object(index_object._cache)) 

238 return () 

239 

240 def _set_cache_(self, attr: str) -> None: 

241 if attr == "_cache": 

242 # Set the data when we need it 

243 ostream = self.repo.odb.stream(self.binsha) 

244 self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read()) 

245 else: 

246 super(Tree, self)._set_cache_(attr) 

247 # END handle attribute 

248 

249 def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]: 

250 """Iterable yields tuples of (binsha, mode, name), which will be converted 

251 to the respective object representation""" 

252 for binsha, mode, name in iterable: 

253 path = join_path(self.path, name) 

254 try: 

255 yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path) 

256 except KeyError as e: 

257 raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e 

258 # END for each item 

259 

260 def join(self, file: str) -> IndexObjUnion: 

261 """Find the named object in this tree's contents 

262 :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule`` 

263 

264 :raise KeyError: if given file or tree does not exist in tree""" 

265 msg = "Blob or Tree named %r not found" 

266 if "/" in file: 

267 tree = self 

268 item = self 

269 tokens = file.split("/") 

270 for i, token in enumerate(tokens): 

271 item = tree[token] 

272 if item.type == "tree": 

273 tree = item 

274 else: 

275 # safety assertion - blobs are at the end of the path 

276 if i != len(tokens) - 1: 

277 raise KeyError(msg % file) 

278 return item 

279 # END handle item type 

280 # END for each token of split path 

281 if item == self: 

282 raise KeyError(msg % file) 

283 return item 

284 else: 

285 for info in self._cache: 

286 if info[2] == file: # [2] == name 

287 return self._map_id_to_type[info[1] >> 12]( 

288 self.repo, info[0], info[1], join_path(self.path, info[2]) 

289 ) 

290 # END for each obj 

291 raise KeyError(msg % file) 

292 # END handle long paths 

293 

294 def __truediv__(self, file: str) -> IndexObjUnion: 

295 """For PY3 only""" 

296 return self.join(file) 

297 

298 @property 

299 def trees(self) -> List["Tree"]: 

300 """:return: list(Tree, ...) list of trees directly below this tree""" 

301 return [i for i in self if i.type == "tree"] 

302 

303 @property 

304 def blobs(self) -> List[Blob]: 

305 """:return: list(Blob, ...) list of blobs directly below this tree""" 

306 return [i for i in self if i.type == "blob"] 

307 

308 @property 

309 def cache(self) -> TreeModifier: 

310 """ 

311 :return: An object allowing to modify the internal cache. This can be used 

312 to change the tree's contents. When done, make sure you call ``set_done`` 

313 on the tree modifier, or serialization behaviour will be incorrect. 

314 See the ``TreeModifier`` for more information on how to alter the cache""" 

315 return TreeModifier(self._cache) 

316 

317 def traverse( 317 ↛ exitline 317 didn't jump to the function exit

318 self, # type: ignore[override] 

319 predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True, 

320 prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False, 

321 depth: int = -1, 

322 branch_first: bool = True, 

323 visit_once: bool = False, 

324 ignore_self: int = 1, 

325 as_edge: bool = False, 

326 ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]: 

327 """For documentation, see util.Traversable._traverse() 

328 Trees are set to visit_once = False to gain more performance in the traversal""" 

329 

330 # """ 

331 # # To typecheck instead of using cast. 

332 # import itertools 

333 # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]: 

334 # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1]) 

335 

336 # ret = super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self) 

337 # ret_tup = itertools.tee(ret, 2) 

338 # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}" 

339 # return ret_tup[0]""" 

340 return cast( 

341 Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]], 

342 super(Tree, self)._traverse( 

343 predicate, 

344 prune, 

345 depth, # type: ignore 

346 branch_first, 

347 visit_once, 

348 ignore_self, 

349 ), 

350 ) 

351 

352 def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]: 

353 """ 

354 :return: IterableList with the results of the traversal as produced by 

355 traverse() 

356 Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] 

357 """ 

358 return super(Tree, self)._list_traverse(*args, **kwargs) 

359 

360 # List protocol 

361 

362 def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]: 

363 return list(self._iter_convert_to_object(self._cache[i:j])) 

364 

365 def __iter__(self) -> Iterator[IndexObjUnion]: 

366 return self._iter_convert_to_object(self._cache) 

367 

368 def __len__(self) -> int: 

369 return len(self._cache) 

370 

371 def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion: 

372 if isinstance(item, int): 

373 info = self._cache[item] 

374 return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2])) 

375 

376 if isinstance(item, str): 

377 # compatibility 

378 return self.join(item) 

379 # END index is basestring 

380 

381 raise TypeError("Invalid index type: %r" % item) 

382 

383 def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool: 

384 if isinstance(item, IndexObject): 

385 for info in self._cache: 

386 if item.binsha == info[0]: 

387 return True 

388 # END compare sha 

389 # END for each entry 

390 # END handle item is index object 

391 # compatibility 

392 

393 # treat item as repo-relative path 

394 else: 

395 path = self.path 

396 for info in self._cache: 

397 if item == join_path(path, info[2]): 

398 return True 

399 # END for each item 

400 return False 

401 

402 def __reversed__(self) -> Iterator[IndexObjUnion]: 

403 return reversed(self._iter_convert_to_object(self._cache)) # type: ignore 

404 

405 def _serialize(self, stream: "BytesIO") -> "Tree": 

406 """Serialize this tree into the stream. Please note that we will assume 

407 our tree data to be in a sorted state. If this is not the case, serialization 

408 will not generate a correct tree representation as these are assumed to be sorted 

409 by algorithms""" 

410 tree_to_stream(self._cache, stream.write) 

411 return self 

412 

413 def _deserialize(self, stream: "BytesIO") -> "Tree": 

414 self._cache = tree_entries_from_data(stream.read()) 

415 return self 

416 

417 

418# END tree 

419 

420# finalize map definition 

421Tree._map_id_to_type[Tree.tree_id] = Tree 

422#