Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/diff.py: 19%

236 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# diff.py 

2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

3# 

4# This module is part of GitPython and is released under 

5# the BSD License: http://www.opensource.org/licenses/bsd-license.php 

6 

7import re 

8from git.cmd import handle_process_output 

9from git.compat import defenc 

10from git.util import finalize_process, hex_to_bin 

11 

12from .objects.blob import Blob 

13from .objects.util import mode_str_to_int 

14 

15 

16# typing ------------------------------------------------------------------ 

17 

18from typing import ( 

19 Any, 

20 Iterator, 

21 List, 

22 Match, 

23 Optional, 

24 Tuple, 

25 Type, 

26 TypeVar, 

27 Union, 

28 TYPE_CHECKING, 

29 cast, 

30) 

31from git.types import PathLike, Literal 

32 

33if TYPE_CHECKING: 33 ↛ 34line 33 didn't jump to line 34, because the condition on line 33 was never true

34 from .objects.tree import Tree 

35 from .objects import Commit 

36 from git.repo.base import Repo 

37 from git.objects.base import IndexObject 

38 from subprocess import Popen 

39 from git import Git 

40 

# Single-letter change-type codes used in this module's type annotations.
Lit_change_type = Literal["A", "D", "C", "M", "R", "T", "U"]

# NOTE: a ``TypeGuard``-based ``is_change_type(inp)`` helper that tested
# membership in the same seven letters was drafted here but is not enabled.

# ------------------------------------------------------------------------

# Public names of this module.
__all__ = ("Diffable", "DiffIndex", "Diff", "NULL_TREE")

# Sentinel that callers pass to request a diff against the empty tree.
NULL_TREE = object()

55 

# A backslash followed by three digits; applied to paths that are not quoted.
_octal_byte_re = re.compile(b"\\\\([0-9]{3})")

# One escape sequence inside a git C-style quoted path: a backslash followed by
# either a three-digit octal byte value or a single-character escape code.
_quoted_escape_re = re.compile(rb'\\([0-3][0-7]{2}|[abfnrtv"\\])')

# Byte denoted by each single-character escape code.
_SIMPLE_ESCAPES = {
    b"a": b"\a",
    b"b": b"\b",
    b"f": b"\f",
    b"n": b"\n",
    b"r": b"\r",
    b"t": b"\t",
    b"v": b"\v",
    b'"': b'"',
    b"\\": b"\\",
}


def _octal_repl(matchobj: Match) -> bytes:
    """Return the single byte whose octal value was captured in group 1."""
    return bytes((int(matchobj.group(1), 8),))


def _quoted_escape_repl(matchobj: Match) -> bytes:
    """Return the byte denoted by one escape sequence matched by ``_quoted_escape_re``."""
    code = matchobj.group(1)
    simple = _SIMPLE_ESCAPES.get(code)
    if simple is not None:
        return simple
    return bytes((int(code, 8),))


def decode_path(path: bytes, has_ab_prefix: bool = True) -> Optional[bytes]:
    """Turn a path as it appears in diff output into the raw path bytes.

    :param path:
        Path bytes taken from a diff header, optionally wrapped in double
        quotes with backslash escapes.

    :param has_ab_prefix:
        If True, the path must start with ``a/`` or ``b/`` and that prefix is
        removed from the result.

    :return:
        The decoded path, or None if ``path`` is ``/dev/null``."""
    if path == b"/dev/null":
        return None

    if path.startswith(b'"') and path.endswith(b'"'):
        # Unescape in ONE left-to-right pass. Chaining several replace() calls
        # would re-interpret the output of an earlier replacement: an escaped
        # backslash followed by "n" would wrongly turn into a newline.
        path = _quoted_escape_re.sub(_quoted_escape_repl, path[1:-1])
    else:
        path = _octal_byte_re.sub(_octal_repl, path)

    if has_ab_prefix:
        assert path.startswith(b"a/") or path.startswith(b"b/")
        path = path[2:]

    return path

80 

81 

class Diffable(object):

    """Shared interface for objects that can be diffed against a compatible object.

    :note:
        ``diff`` reads ``self.repo``, so subclasses have to provide a repo
        member like Object instances do; for practical reasons this class does
        not derive from Object."""

    __slots__ = ()

    # Stand-in type: pass this class as ``other`` to diff against the index.
    class Index(object):
        pass

    def _process_diff_args(
        self, args: List[Union[str, "Diffable", Type["Diffable.Index"], object]]
    ) -> List[Union[str, "Diffable", Type["Diffable.Index"], object]]:
        """Hook invoked right before the git command is executed.

        :param args: the argument list assembled by ``diff``.

        :return:
            the argument list to pass to git. This implementation returns
            ``args`` unchanged; subclasses can override it to alter the
            behaviour of the superclass."""
        return args

    def diff(
        self,
        other: Union[Type["Index"], "Tree", "Commit", None, str, object] = Index,
        paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
        create_patch: bool = False,
        **kwargs: Any,
    ) -> "DiffIndex":
        """Create diffs between two trees, a tree and the index, or the index
        and the working tree. Rename detection (``-M``) is always enabled.

        :param other:
            The item to compare us with.
            If None, we are compared to the working tree.
            If Treeish, we are compared against the respective tree.
            If Index ( type ), we are compared against the index.
            If git.NULL_TREE, we are compared against the empty tree.
            Defaults to Index so the method does not fail by default on bare
            repositories.

        :param paths:
            A single path or a list/tuple of paths to limit the diff to.

        :param create_patch:
            If True, the returned Diffs carry a detailed patch that, if
            applied, turns self into other. Patches are somewhat costly as
            blobs have to be read and diffed.

        :param kwargs:
            Additional arguments passed on to the git command, such as
            R=True to swap both sides of the diff.

        :return: git.DiffIndex

        :note:
            On a bare repository, 'other' needs to be provided as Index or as
            Tree/Commit, or a git command error will occur"""
        if hasattr(self, "Has_Repo"):
            self.repo: "Repo" = self.repo

        # A lone path becomes a one-element list; lists and tuples pass through.
        if not (paths is None or isinstance(paths, (tuple, list))):
            paths = [paths]

        # Pick the git sub-command and the arguments that follow ``self``.
        diff_cmd = self.repo.git.diff
        leading: List[Union[PathLike, Diffable, Type["Diffable.Index"], object]] = []
        if other is self.Index:
            leading = ["--cached"]
        elif other is NULL_TREE:
            leading = ["--root", "-r"]  # recursive diff-tree against the empty tree
            diff_cmd = self.repo.git.diff_tree
        elif other is not None:
            leading = [other, "-r"]  # recursive diff-tree
            diff_cmd = self.repo.git.diff_tree

        output_format = ["-p"] if create_patch else ["--raw", "-z"]

        args: List[Union[PathLike, Diffable, Type["Diffable.Index"], object]] = [
            self,
            *leading,
            "--abbrev=40",  # we need full shas
            "--full-index",  # get full index paths, not only filenames
            "-M",  # check for renames, in both formats
            *output_format,
            # never emit coloured output,
            # fixes https://github.com/gitpython-developers/GitPython/issues/172
            "--no-color",
        ]

        # paths is a list/tuple here, or None
        if paths:
            args += ["--", *paths]

        kwargs["as_process"] = True
        proc = diff_cmd(*self._process_diff_args(args), **kwargs)

        if create_patch:
            parse = Diff._index_from_patch_format
        else:
            parse = Diff._index_from_raw_format
        index = parse(self.repo, proc)

        proc.wait()
        return index

192 

193 

T_Diff = TypeVar("T_Diff", bound="Diff")


class DiffIndex(List[T_Diff]):

    """A list of Diffs that can additionally be queried by change type."""

    # Change types accepted by iter_change_type:
    # A = Added
    # C = Copied
    # D = Deleted
    # R = Renamed
    # M = Modified
    # T = Changed in the type
    change_type = ("A", "C", "D", "R", "M", "T")

    def iter_change_type(self, change_type: Lit_change_type) -> Iterator[T_Diff]:
        """
        :return:
            iterator yielding the Diff instances that match the given change_type

        :param change_type:
            Member of DiffIndex.change_type, namely:

            * 'A' for added paths
            * 'C' for copied paths
            * 'D' for deleted paths
            * 'R' for renamed paths
            * 'M' for paths with modified data
            * 'T' for changed in the type paths

        :raise ValueError:
            if change_type is not a member of DiffIndex.change_type; raised
            once iteration starts.
        """
        if change_type not in self.change_type:
            raise ValueError("Invalid change type: %s" % change_type)

        for entry in self:
            if entry.change_type == change_type:
                # The entry carries an explicit, matching change type.
                is_match = True
            elif change_type == "A":
                is_match = entry.new_file
            elif change_type == "D":
                is_match = entry.deleted_file
            elif change_type == "C":
                is_match = entry.copied_file
            elif change_type == "R":
                is_match = entry.renamed
            elif change_type == "M":
                # Both sides exist and their blobs differ.
                is_match = entry.a_blob and entry.b_blob and entry.a_blob != entry.b_blob
            else:
                is_match = False

            if is_match:
                yield entry

243 

244 

class Diff(object):

    """A Diff contains diff information between two Trees.

    It contains two sides a and b of the diff, members are prefixed with
    "a" and "b" respectively to indicate that.

    Diffs keep information about the changed blob objects, the file mode, renames,
    deletions and new files.

    There are a few cases where None has to be expected as member variable value:

    ``New File``::

        a_mode is None
        a_blob is None
        a_path is None

    ``Deleted File``::

        b_mode is None
        b_blob is None
        b_path is None

    ``Working Tree Blobs``

        When comparing to working trees, the working tree blob will have a null hexsha
        as a corresponding object does not yet exist. The mode will be null as well.
        But the path will be available though.
        If it is listed in a diff the working tree version of the file must
        be different to the version in the index or tree, and hence has been modified."""

    # Precompiled regex matching one file header of patch-format output. The
    # ORDER of the named groups matters: _index_from_patch_format unpacks
    # ``match.groups()`` positionally.
    re_header = re.compile(
        rb"""
            ^diff[ ]--git
                [ ](?P<a_path_fallback>"?[ab]/.+?"?)[ ](?P<b_path_fallback>"?[ab]/.+?"?)\n
            (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
               ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
            (?:^similarity[ ]index[ ]\d+%\n
               ^rename[ ]from[ ](?P<rename_from>.*)\n
               ^rename[ ]to[ ](?P<rename_to>.*)(?:\n|$))?
            (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
            (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
            (?:^similarity[ ]index[ ]\d+%\n
               ^copy[ ]from[ ].*\n
               ^copy[ ]to[ ](?P<copied_file_name>.*)(?:\n|$))?
            (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
                \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
            (?:^---[ ](?P<a_path>[^\t\n\r\f\v]*)[\t\r\f\v]*(?:\n|$))?
            (?:^\+\+\+[ ](?P<b_path>[^\t\n\r\f\v]*)[\t\r\f\v]*(?:\n|$))?
        """,
        re.VERBOSE | re.MULTILINE,
    )
    # can be used for comparisons
    NULL_HEX_SHA = "0" * 40
    NULL_BIN_SHA = b"\0" * 20

    __slots__ = (
        "a_blob",
        "b_blob",
        "a_mode",
        "b_mode",
        "a_rawpath",
        "b_rawpath",
        "new_file",
        "deleted_file",
        "copied_file",
        "raw_rename_from",
        "raw_rename_to",
        "diff",
        "change_type",
        "score",
    )

    def __init__(
        self,
        repo: "Repo",
        a_rawpath: Optional[bytes],
        b_rawpath: Optional[bytes],
        a_blob_id: Union[str, bytes, None],
        b_blob_id: Union[str, bytes, None],
        a_mode: Union[bytes, str, None],
        b_mode: Union[bytes, str, None],
        new_file: bool,
        deleted_file: bool,
        copied_file: bool,
        raw_rename_from: Optional[bytes],
        raw_rename_to: Optional[bytes],
        diff: Union[str, bytes, None],
        change_type: Optional[Lit_change_type],
        score: Optional[int],
    ) -> None:
        """Initialize one diff entry.

        :param repo: repository the blobs are looked up in; replaced by the
            submodule's repository if ``a_rawpath`` names a submodule whose
            module exists.
        :param a_rawpath: undecoded path of the a side, or None
        :param b_rawpath: undecoded path of the b side, or None
        :param a_blob_id: hex sha of the a side; None or the all-zero sha
            results in ``a_blob`` being None
        :param b_blob_id: same as ``a_blob_id``, for the b side
        :param a_mode: mode string of the a side; a falsy value is stored as None
        :param b_mode: mode string of the b side; a falsy value is stored as None
        :param new_file: stored as-is
        :param deleted_file: stored as-is
        :param copied_file: stored as-is
        :param raw_rename_from: undecoded rename source; empty is stored as None
        :param raw_rename_to: undecoded rename target; empty is stored as None
        :param diff: patch text of this entry, stored as-is
        :param change_type: change-type letter, stored as-is
        :param score: rename/copy score, stored as-is"""

        assert a_rawpath is None or isinstance(a_rawpath, bytes)
        assert b_rawpath is None or isinstance(b_rawpath, bytes)
        self.a_rawpath = a_rawpath
        self.b_rawpath = b_rawpath

        self.a_mode = mode_str_to_int(a_mode) if a_mode else None
        self.b_mode = mode_str_to_int(b_mode) if b_mode else None

        # Determine whether this diff references a submodule, if it does then
        # we need to overwrite "repo" to the corresponding submodule's repo instead
        if repo and a_rawpath:
            a_path_str = a_rawpath.decode(defenc, "replace")  # invariant, decode once
            for submodule in repo.submodules:
                if submodule.path == a_path_str:
                    if submodule.module_exists():
                        repo = submodule.module()
                    break

        self.a_blob: Union["IndexObject", None]
        if a_blob_id is None or a_blob_id == self.NULL_HEX_SHA:
            self.a_blob = None
        else:
            self.a_blob = Blob(repo, hex_to_bin(a_blob_id), mode=self.a_mode, path=self.a_path)

        self.b_blob: Union["IndexObject", None]
        if b_blob_id is None or b_blob_id == self.NULL_HEX_SHA:
            self.b_blob = None
        else:
            self.b_blob = Blob(repo, hex_to_bin(b_blob_id), mode=self.b_mode, path=self.b_path)

        self.new_file: bool = new_file
        self.deleted_file: bool = deleted_file
        self.copied_file: bool = copied_file

        # be clear and use None instead of empty strings
        assert raw_rename_from is None or isinstance(raw_rename_from, bytes)
        assert raw_rename_to is None or isinstance(raw_rename_to, bytes)
        self.raw_rename_from = raw_rename_from or None
        self.raw_rename_to = raw_rename_to or None

        self.diff = diff
        self.change_type: Union[Lit_change_type, None] = change_type
        self.score = score

    def __eq__(self, other: object) -> bool:
        """Two diffs are equal if every attribute named in __slots__ is equal.

        Returns NotImplemented when ``other`` lacks one of these attributes
        (for instance when comparing with None), so such a comparison
        evaluates to False instead of raising AttributeError."""
        for name in self.__slots__:
            try:
                theirs = getattr(other, name)
            except AttributeError:
                return NotImplemented
            if getattr(self, name) != theirs:
                return False
        # END for each name
        return True

    def __ne__(self, other: object) -> bool:
        """Inverse of ``__eq__``."""
        return not (self == other)

    def __hash__(self) -> int:
        """Hash over the same attributes that ``__eq__`` compares."""
        return hash(tuple(getattr(self, n) for n in self.__slots__))

    def __str__(self) -> str:
        """Return a multi-line, human-readable summary of this diff."""
        # Headline: the blob path if a blob exists. Without any blob fall back
        # to the diff's own path so no raw "%s" placeholder is ever printed.
        if self.a_blob:
            h: str = "%s" % self.a_blob.path
        elif self.b_blob:
            h = "%s" % self.b_blob.path
        else:
            h = self.a_path or self.b_path or ""

        msg: str = ""
        line_length = 0  # length of the longest blob line, used for the underline
        for blob, side in zip((self.a_blob, self.b_blob), ("lhs", "rhs")):
            if blob:
                line = "\n%s: %o | %s" % (side, blob.mode, blob.hexsha)
            else:
                line = "\n%s: None" % side
            # END if blob is not None
            line_length = max(len(line), line_length)
            msg += line
        # END for each blob

        # underline the headline
        h += "\n" + "=" * line_length

        if self.deleted_file:
            msg += "\nfile deleted in rhs"
        if self.new_file:
            msg += "\nfile added in rhs"
        if self.copied_file:
            msg += "\nfile %r copied from %r" % (self.b_path, self.a_path)
        if self.rename_from:
            msg += "\nfile renamed from %r" % self.rename_from
        if self.rename_to:
            msg += "\nfile renamed to %r" % self.rename_to
        if self.diff:
            msg += "\n---"
            try:
                msg += self.diff.decode(defenc) if isinstance(self.diff, bytes) else self.diff
            except UnicodeDecodeError:
                msg += "OMITTED BINARY DATA"
            # end handle encoding
            msg += "\n---"
        # END diff info

        return h + msg

    @property
    def a_path(self) -> Optional[str]:
        """:returns: ``a_rawpath`` decoded with replacement of undecodable bytes, or None"""
        return self.a_rawpath.decode(defenc, "replace") if self.a_rawpath else None

    @property
    def b_path(self) -> Optional[str]:
        """:returns: ``b_rawpath`` decoded with replacement of undecodable bytes, or None"""
        return self.b_rawpath.decode(defenc, "replace") if self.b_rawpath else None

    @property
    def rename_from(self) -> Optional[str]:
        """:returns: ``raw_rename_from`` decoded with replacement of undecodable bytes, or None"""
        return self.raw_rename_from.decode(defenc, "replace") if self.raw_rename_from else None

    @property
    def rename_to(self) -> Optional[str]:
        """:returns: ``raw_rename_to`` decoded with replacement of undecodable bytes, or None"""
        return self.raw_rename_to.decode(defenc, "replace") if self.raw_rename_to else None

    @property
    def renamed(self) -> bool:
        """:returns: True if the blob of our diff has been renamed
        :note: This property is deprecated, please use ``renamed_file`` instead.
        """
        return self.renamed_file

    @property
    def renamed_file(self) -> bool:
        """:returns: True if the blob of our diff has been renamed"""
        return self.rename_from != self.rename_to

    @classmethod
    def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes) -> Optional[bytes]:
        """Decode the first non-empty candidate, in order of preference.

        :param path_match: path from a ``---`` / ``+++`` line (carries an a/ or b/ prefix)
        :param rename_match: path from a ``rename from`` / ``rename to`` line (no prefix)
        :param path_fallback_match: path from the ``diff --git`` line (carries a prefix)
        :return: the decoded path, or None if no candidate is set or the chosen
            candidate is ``/dev/null``"""
        if path_match:
            return decode_path(path_match)

        if rename_match:
            return decode_path(rename_match, has_ab_prefix=False)

        if path_fallback_match:
            return decode_path(path_fallback_match)

        return None

    @classmethod
    def _index_from_patch_format(cls, repo: "Repo", proc: Union["Popen", "Git.AutoInterrupt"]) -> DiffIndex:
        """Create a new DiffIndex from the output of a process which must be in patch format

        :param repo: is the repository we are operating on - it is required
        :param proc: process whose output is the result of 'git diff' in patch format
        :return: git.DiffIndex"""

        ## FIXME: Here SLURPING raw, need to re-phrase header-regexes linewise.
        text_list: List[bytes] = []
        handle_process_output(proc, text_list.append, None, finalize_process, decode_streams=False)

        # for now, we have to bake the stream
        text = b"".join(text_list)
        index: "DiffIndex" = DiffIndex()
        previous_header: Union[Match[bytes], None] = None
        a_path, b_path = None, None  # for mypy
        a_mode, b_mode = None, None  # for mypy
        for _header in cls.re_header.finditer(text):
            (
                a_path_fallback,
                b_path_fallback,
                old_mode,
                new_mode,
                rename_from,
                rename_to,
                new_file_mode,
                deleted_file_mode,
                copied_file_name,
                a_blob_id,
                b_blob_id,
                b_mode,
                a_path,
                b_path,
            ) = _header.groups()

            new_file, deleted_file, copied_file = (
                bool(new_file_mode),
                bool(deleted_file_mode),
                bool(copied_file_name),
            )

            a_path = cls._pick_best_path(a_path, rename_from, a_path_fallback)
            b_path = cls._pick_best_path(b_path, rename_to, b_path_fallback)

            # Our only means to find the actual text is to see what has not been matched by our regex,
            # and then retro-actively assign it to our index
            if previous_header is not None:
                index[-1].diff = text[previous_header.end() : _header.start()]
            # end assign actual diff

            # Make sure the mode is set if the path is set. Otherwise the resulting blob is invalid
            # We just use the one mode we should have parsed
            a_mode = old_mode or deleted_file_mode or (a_path and (b_mode or new_mode or new_file_mode))
            b_mode = b_mode or new_mode or new_file_mode or (b_path and a_mode)
            index.append(
                Diff(
                    repo,
                    a_path,
                    b_path,
                    a_blob_id and a_blob_id.decode(defenc),
                    b_blob_id and b_blob_id.decode(defenc),
                    a_mode and a_mode.decode(defenc),
                    b_mode and b_mode.decode(defenc),
                    new_file,
                    deleted_file,
                    copied_file,
                    rename_from,
                    rename_to,
                    None,
                    None,
                    None,
                )
            )

            previous_header = _header
        # end for each header we parse

        # Everything after the last header belongs to the last entry.
        if index and previous_header is not None:
            index[-1].diff = text[previous_header.end() :]
        # end assign last diff

        return index

    @staticmethod
    def _handle_diff_line(lines_bytes: bytes, repo: "Repo", index: DiffIndex) -> None:
        """Parse NUL-separated raw-format records and append one Diff per record to index.

        :param lines_bytes: undecoded raw-format output; each record starts with a
            colon, carries five whitespace-separated metadata fields and is followed,
            after a NUL, by its path (two NUL-separated paths for copies and renames)
        :param repo: repository handed to each created Diff
        :param index: DiffIndex that receives the created Diffs"""
        lines = lines_bytes.decode(defenc)

        # Discard everything before the first colon, and the colon itself.
        _, _, lines = lines.partition(":")

        for line in lines.split("\x00:"):
            if not line:
                # The line data is empty, skip
                continue
            meta, _, path = line.partition("\x00")
            path = path.rstrip("\x00")
            a_blob_id: Optional[str]
            b_blob_id: Optional[str]
            old_mode, new_mode, a_blob_id, b_blob_id, _change_type = meta.split(None, 4)
            # Change type can be R100
            # R: status letter
            # 100: score (in case of copy and rename)
            change_type: Lit_change_type = cast(Lit_change_type, _change_type[0])
            score_str = _change_type[1:]
            score = int(score_str) if score_str.isdigit() else None
            path = path.strip()
            a_path = path.encode(defenc)
            b_path = path.encode(defenc)
            deleted_file = False
            new_file = False
            copied_file = False
            rename_from = None
            rename_to = None

            # NOTE: We cannot conclude from the existence of a blob to change type
            # as diffs with the working do not have blobs yet
            if change_type == "D":
                b_blob_id = None  # Optional[str]
                deleted_file = True
            elif change_type == "A":
                a_blob_id = None
                new_file = True
            elif change_type == "C":
                copied_file = True
                a_path_str, b_path_str = path.split("\x00", 1)
                a_path = a_path_str.encode(defenc)
                b_path = b_path_str.encode(defenc)
            elif change_type == "R":
                a_path_str, b_path_str = path.split("\x00", 1)
                a_path = a_path_str.encode(defenc)
                b_path = b_path_str.encode(defenc)
                rename_from, rename_to = a_path, b_path
            elif change_type == "T":
                # Nothing to do
                pass
            # END add/remove handling

            diff = Diff(
                repo,
                a_path,
                b_path,
                a_blob_id,
                b_blob_id,
                old_mode,
                new_mode,
                new_file,
                deleted_file,
                copied_file,
                rename_from,
                rename_to,
                "",
                change_type,
                score,
            )
            index.append(diff)

    @classmethod
    def _index_from_raw_format(cls, repo: "Repo", proc: "Popen") -> "DiffIndex":
        """Create a new DiffIndex from the output of a process which must be in raw format.

        :param repo: repository handed to each created Diff
        :param proc: process whose output is parsed
        :return: git.DiffIndex"""
        # handles
        # :100644 100644 687099101... 37c5e30c8... M    .gitignore

        index: "DiffIndex" = DiffIndex()
        handle_process_output(
            proc,
            lambda byt: cls._handle_diff_line(byt, repo, index),
            None,
            finalize_process,
            decode_streams=False,
        )

        return index