Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/index/fun.py: 13%


# Contains standalone functions to accompany the index implementation and make it
# more versatile
# NOTE: Autodoc hates it if this is a docstring

from io import BytesIO
from pathlib import Path
import os
from stat import (
    S_IFDIR,
    S_IFLNK,
    S_ISLNK,
    S_ISDIR,
    S_IFMT,
    S_IFREG,
    S_IXUSR,
)
import subprocess

from git.cmd import PROC_CREATIONFLAGS, handle_process_output
from git.compat import (
    defenc,
    force_text,
    force_bytes,
    is_posix,
    is_win,
    safe_decode,
)
from git.exc import UnmergedEntriesError, HookExecutionError
from git.objects.fun import (
    tree_to_stream,
    traverse_tree_recursive,
    traverse_trees_recursive,
)
from git.util import IndexFileSHA1Writer, finalize_process
from gitdb.base import IStream
from gitdb.typ import str_tree_type

import os.path as osp

from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
from .util import pack, unpack

# typing -----------------------------------------------------------------------------

from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast

from git.types import PathLike

if TYPE_CHECKING:
    from .base import IndexFile
    from git.db import GitCmdObjectDB
    from git.objects.tree import TreeCacheTup

    # from git.objects.fun import EntryTupOrNone

# ------------------------------------------------------------------------------------

S_IFGITLINK = S_IFLNK | S_IFDIR  # a submodule
CE_NAMEMASK_INV = ~CE_NAMEMASK

__all__ = (
    "write_cache",
    "read_cache",
    "write_tree_from_cache",
    "entry_key",
    "stat_mode_to_index_mode",
    "S_IFGITLINK",
    "run_commit_hook",
    "hook_path",
)

def hook_path(name: str, git_dir: PathLike) -> str:
    """:return: path to the given named hook in the given git repository directory"""
    return osp.join(git_dir, "hooks", name)
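
# Illustrative usage (not part of the original module); the repository path
# below is hypothetical:
#
#   >>> hook_path("pre-commit", "/path/to/repo/.git")
#   '/path/to/repo/.git/hooks/pre-commit'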

def _has_file_extension(path):
    return osp.splitext(path)[1]

def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
    """Run the commit hook of the given name. Silently ignores hooks that do not exist.

    :param name: name of hook, like 'pre-commit'
    :param index: IndexFile instance
    :param args: arguments passed to hook file
    :raises HookExecutionError:"""
    hp = hook_path(name, index.repo.git_dir)
    if not os.access(hp, os.X_OK):
        return None

    env = os.environ.copy()
    env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
    env["GIT_EDITOR"] = ":"
    cmd = [hp]
    try:
        if is_win and not _has_file_extension(hp):
            # Windows only uses extensions to determine how to open files
            # (doesn't understand shebangs). Try using bash to run the hook.
            relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
            cmd = ["bash.exe", relative_hp]

        process = subprocess.Popen(
            cmd + list(args),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=index.repo.working_dir,
            close_fds=is_posix,
            creationflags=PROC_CREATIONFLAGS,
        )
    except Exception as ex:
        raise HookExecutionError(hp, ex) from ex
    else:
        stdout_list: List[str] = []
        stderr_list: List[str] = []
        handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process)
        stdout = "".join(stdout_list)
        stderr = "".join(stderr_list)
        if process.returncode != 0:
            stdout = force_text(stdout, defenc)
            stderr = force_text(stderr, defenc)
            raise HookExecutionError(hp, process.returncode, stderr, stdout)
    # end handle return code
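
# Illustrative usage (not part of the original module), assuming a GitPython
# Repo object; the repository path is hypothetical:
#
#   >>> from git import Repo
#   >>> repo = Repo("/path/to/repo")
#   >>> run_commit_hook("pre-commit", repo.index)  # silently a no-op if the hook is absent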

def stat_mode_to_index_mode(mode: int) -> int:
    """Convert the given mode from a stat call to the corresponding index mode
    and return it"""
    if S_ISLNK(mode):  # symlinks
        return S_IFLNK
    if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK:  # submodules
        return S_IFGITLINK
    return S_IFREG | (0o755 if mode & S_IXUSR else 0o644)  # blobs with or without executable bit
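
# Mode mapping examples (not part of the original module):
#
#   >>> oct(stat_mode_to_index_mode(0o100644))  # plain file
#   '0o100644'
#   >>> oct(stat_mode_to_index_mode(0o100755))  # executable file
#   '0o100755'
#   >>> oct(stat_mode_to_index_mode(0o120777))  # symlink
#   '0o120000'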

def write_cache(
    entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
    stream: IO[bytes],
    extension_data: Union[None, bytes] = None,
    ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
    """Write the cache represented by entries to a stream

    :param entries: **sorted** list of entries
    :param stream: stream to wrap into the ShaStreamCls - it is used for
        final output.

    :param ShaStreamCls: Type to use when writing to the stream. It produces a sha
        while writing to it, before the data is passed on to the wrapped stream

    :param extension_data: any kind of data to write as a trailer, it must begin
        with a 4 byte identifier, followed by its size (4 bytes)"""
    # wrap the stream into a compatible writer
    stream_sha = ShaStreamCls(stream)

    tell = stream_sha.tell
    write = stream_sha.write

    # header
    version = 2
    write(b"DIRC")
    write(pack(">LL", version, len(entries)))

    # body
    for entry in entries:
        beginoffset = tell()
        write(entry.ctime_bytes)  # ctime
        write(entry.mtime_bytes)  # mtime
        path_str = str(entry.path)
        path: bytes = force_bytes(path_str, encoding=defenc)
        plen = len(path) & CE_NAMEMASK  # path length
        assert plen == len(path), "Path %s too long to fit into index" % entry.path
        flags = plen | (entry.flags & CE_NAMEMASK_INV)  # clear possible previous values
        write(
            pack(
                ">LLLLLL20sH",
                entry.dev,
                entry.inode,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.binsha,
                flags,
            )
        )
        write(path)
        # pad the entry to a multiple of 8 bytes, with at least one terminating NUL
        real_size = (tell() - beginoffset + 8) & ~7
        write(b"\0" * ((beginoffset + real_size) - tell()))
    # END for each entry

    # write previously cached extensions data
    if extension_data is not None:
        stream_sha.write(extension_data)

    # write the sha over the content
    stream_sha.write_sha()
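
# Worked alignment example (not part of the original module): the fixed part of
# an entry is 62 bytes (8 ctime + 8 mtime + 24 stat fields + 20 sha + 2 flags).
# For the path "a.txt" (5 bytes), 67 bytes are written, real_size becomes
# (67 + 8) & ~7 == 72, and 5 NUL bytes pad the entry to an 8-byte boundary
# while guaranteeing at least one terminating NUL.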

def read_header(stream: IO[bytes]) -> Tuple[int, int]:
    """Return tuple(version_long, num_entries) from the given stream"""
    type_id = stream.read(4)
    if type_id != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % type_id)
    unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
    version, num_entries = unpacked

    # TODO: handle version 3: extended data, see read-cache.c
    assert version in (1, 2)
    return version, num_entries
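
# Header sketch (not part of the original module), assuming pack behaves like
# struct.pack as used throughout this module: a version-2 index holding three
# entries begins with the same 12 bytes that write_cache emits.
#
#   >>> read_header(BytesIO(b"DIRC" + pack(">LL", 2, 3)))
#   (2, 3)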

def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
    """:return: Key suitable to be used for the index.entries dictionary
    :param entry: One instance of type BaseIndexEntry or the path and the stage"""

    # def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]:
    #     return isinstance(entry_key, tuple) and len(entry_key) == 2

    if len(entry) == 1:
        entry_first = entry[0]
        assert isinstance(entry_first, BaseIndexEntry)
        return (entry_first.path, entry_first.stage)
    else:
        # assert is_entry_key_tup(entry)
        entry = cast(Tuple[PathLike, int], entry)
        return entry
    # END handle entry
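
# Key construction example (not part of the original module):
#
#   >>> entry_key("README.md", 0)  # path and stage given separately
#   ('README.md', 0)
#
# Passing a single BaseIndexEntry instead returns (entry.path, entry.stage).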

def read_cache(
    stream: IO[bytes],
) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
    """Read a cache file from the given stream

    :return: tuple(version, entries_dict, extension_data, content_sha)

        * version is the integer version number
        * entries_dict maps a (path, stage) tuple to an IndexEntry instance
        * extension_data is '' or 4 bytes of type + 4 bytes of size + size bytes
        * content_sha is a 20 byte sha on all cache file contents"""
    version, num_entries = read_header(stream)
    count = 0
    entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}

    read = stream.read
    tell = stream.tell
    while count < num_entries:
        beginoffset = tell()
        ctime = unpack(">8s", read(8))[0]
        mtime = unpack(">8s", read(8))[0]
        (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
        path_size = flags & CE_NAMEMASK
        path = read(path_size).decode(defenc)

        # skip the NUL padding that aligns each entry to an 8-byte boundary
        real_size = (tell() - beginoffset + 8) & ~7
        read((beginoffset + real_size) - tell())
        entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
        # entry_key would be the method to use, but we save the effort
        entries[(path, entry.stage)] = entry
        count += 1
    # END for each entry

    # The footer contains extension data and a sha on the content so far.
    # Keep the extension footer, and verify we have a sha in the end.
    # Extension data format is:
    #   4 bytes ID
    #   4 bytes length of chunk
    #   repeated 0 - N times
    extension_data = stream.read(~0)  # ~0 == -1, i.e. read everything that is left
    assert (
        len(extension_data) > 19
    ), "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)

    content_sha = extension_data[-20:]

    # truncate the sha in the end as we will dynamically create it anyway
    extension_data = extension_data[:-20]

    return (version, entries, extension_data, content_sha)
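
# Round-trip sketch (not part of the original module): write_cache and
# read_cache are inverses over the entry data; `some_entries` is a hypothetical
# sorted list of IndexEntry objects.
#
#   >>> buf = BytesIO()
#   >>> write_cache(some_entries, buf)
#   >>> _ = buf.seek(0)
#   >>> version, entries, extension_data, content_sha = read_cache(buf)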

def write_tree_from_cache(
    entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
) -> Tuple[bytes, List["TreeCacheTup"]]:
    """Create a tree from the given sorted list of entries and put the respective
    trees into the given object database

    :param entries: **sorted** list of IndexEntries
    :param odb: object database to store the trees in
    :param si: start index at which we should start creating subtrees
    :param sl: slice indicating the range we should process on the entries list
    :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
        tree entries being a tuple of hexsha, mode, name"""
    tree_items: List["TreeCacheTup"] = []

    ci = sl.start
    end = sl.stop
    while ci < end:
        entry = entries[ci]
        if entry.stage != 0:
            raise UnmergedEntriesError(entry)
        # END abort on unmerged
        ci += 1
        rbound = entry.path.find("/", si)
        if rbound == -1:
            # it's not a tree
            tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
        else:
            # find common base range
            base = entry.path[si:rbound]
            xi = ci
            while xi < end:
                oentry = entries[xi]
                orbound = oentry.path.find("/", si)
                if orbound == -1 or oentry.path[si:orbound] != base:
                    break
                # END abort on base mismatch
                xi += 1
            # END find common base

            # enter recursion
            # ci - 1 as we want to count our current item as well
            sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
            tree_items.append((sha, S_IFDIR, base))

            # skip ahead
            ci = xi
        # END handle bounds
    # END for each entry

    # finally create the tree
    sio = BytesIO()
    tree_to_stream(tree_items, sio.write)  # writes to stream as bytes, but doesn't change tree_items
    sio.seek(0)

    istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
    return (istream.binsha, tree_items)
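
# Recursion sketch (not part of the original module): for the sorted entry
# paths ["a.txt", "lib/b.txt", "lib/c.txt"], the top-level call appends a blob
# item for "a.txt", detects the shared "lib" prefix of the next two entries,
# recurses over slice(1, 3) with si pointing past the slash, and appends the
# resulting subtree as (sha, S_IFDIR, "lib") alongside the blob.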

def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
    return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))

def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
    """
    :return: list of BaseIndexEntries representing the aggressive merge of the given
        trees. All valid entries are on stage 0, while conflicting ones are left on
        stage 1, 2 or 3: stage 1 corresponds to the common ancestor tree, 2 to our
        tree and 3 to 'their' tree.
    :param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas.
        If 1 or 2 are given, the entries will effectively correspond to the last given tree.
        If 3 are given, a 3-way merge is performed"""
    out: List[BaseIndexEntry] = []

    # One and two way merges are the same for us, as we don't have to handle an
    # existing index.
    if len(tree_shas) in (1, 2):
        for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
            out.append(_tree_entry_to_baseindexentry(entry, 0))
        # END for each entry
        return out
    # END handle single tree

    if len(tree_shas) > 3:
        raise ValueError("Cannot handle %i trees at once" % len(tree_shas))

    # three trees
    for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
        if base is not None:
            # base version exists
            if ours is not None:
                # ours exists
                if theirs is not None:
                    # It exists in all branches; if it was changed in both,
                    # it's a conflict, otherwise we take the changed version.
                    # This should be the most common branch, so it comes first.
                    if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
                        base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
                    ):
                        # changed by both
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    elif base[0] != ours[0] or base[1] != ours[1]:
                        # only we changed it
                        out.append(_tree_entry_to_baseindexentry(ours, 0))
                    else:
                        # either nobody changed it, or only they did; in either
                        # case, use theirs
                        out.append(_tree_entry_to_baseindexentry(theirs, 0))
                    # END handle modification
                else:
                    if ours[0] != base[0] or ours[1] != base[1]:
                        # they deleted it, we changed it, conflict
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                    # else:
                    #     we didn't change it, ignore
                    #     pass
                    # END handle our change
                # END handle theirs
            else:
                if theirs is None:
                    # deleted in both, it's fine - it's out
                    pass
                else:
                    if theirs[0] != base[0] or theirs[1] != base[1]:
                        # deleted in ours, changed in theirs, conflict
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    # END theirs changed
                    # else:
                    #     theirs didn't change
                    #     pass
                # END handle theirs
            # END handle ours
        else:
            # all three can't be None
            if ours is None:
                # added in their branch
                assert theirs is not None
                out.append(_tree_entry_to_baseindexentry(theirs, 0))
            elif theirs is None:
                # added in our branch
                out.append(_tree_entry_to_baseindexentry(ours, 0))
            else:
                # both have it, but not the base; see whether it changed alike
                if ours[0] != theirs[0] or ours[1] != theirs[1]:
                    out.append(_tree_entry_to_baseindexentry(ours, 2))
                    out.append(_tree_entry_to_baseindexentry(theirs, 3))
                else:
                    # it was added the same in both
                    out.append(_tree_entry_to_baseindexentry(ours, 0))
                # END handle two items
            # END handle heads
        # END handle base exists
    # END for each entries tuple

    return out
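
# Conflict-layout sketch (not part of the original module): merging three trees
# where both sides changed the same path yields stage-1/2/3 entries for it. The
# sha variables are hypothetical 20-byte binary shas and the output shows the
# expected shape only.
#
#   >>> merged = aggressive_tree_merge(repo.odb, [base_sha, ours_sha, theirs_sha])
#   >>> [(e.path, e.stage) for e in merged if e.stage != 0]
#   [('conflicted.txt', 1), ('conflicted.txt', 2), ('conflicted.txt', 3)]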