# Contains standalone functions to accompany the index implementation and make it
# more versatile
# NOTE: Autodoc hates it if this is a docstring

from io import BytesIO
from pathlib import Path
import os
from stat import (
    S_IFDIR,
    S_IFLNK,
    S_ISLNK,
    S_ISDIR,
    S_IFMT,
    S_IFREG,
    S_IXUSR,
)
import subprocess

from git.cmd import PROC_CREATIONFLAGS, handle_process_output
from git.compat import (
    defenc,
    force_text,
    force_bytes,
    is_posix,
    is_win,
    safe_decode,
)
from git.exc import UnmergedEntriesError, HookExecutionError
from git.objects.fun import (
    tree_to_stream,
    traverse_tree_recursive,
    traverse_trees_recursive,
)
from git.util import IndexFileSHA1Writer, finalize_process
from gitdb.base import IStream
from gitdb.typ import str_tree_type

import os.path as osp

from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
from .util import pack, unpack

# typing -----------------------------------------------------------------------------

from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast

from git.types import PathLike

if TYPE_CHECKING:
    from .base import IndexFile
    from git.db import GitCmdObjectDB
    from git.objects.tree import TreeCacheTup

    # from git.objects.fun import EntryTupOrNone

# ------------------------------------------------------------------------------------

S_IFGITLINK = S_IFLNK | S_IFDIR  # a submodule
CE_NAMEMASK_INV = ~CE_NAMEMASK

__all__ = (
    "write_cache",
    "read_cache",
    "write_tree_from_cache",
    "entry_key",
    "stat_mode_to_index_mode",
    "S_IFGITLINK",
    "run_commit_hook",
    "hook_path",
)


def hook_path(name: str, git_dir: PathLike) -> str:
    """:return: path to the given named hook in the given git repository directory"""
    return osp.join(git_dir, "hooks", name)


def _has_file_extension(path):
    return osp.splitext(path)[1]


def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
    """Run the commit hook of the given name. Silently ignores hooks that do not exist.

    :param name: name of hook, like 'pre-commit'
    :param index: IndexFile instance
    :param args: arguments passed to the hook file
    :raises HookExecutionError:"""
    hp = hook_path(name, index.repo.git_dir)
    if not os.access(hp, os.X_OK):
        return None

    env = os.environ.copy()
    env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
    env["GIT_EDITOR"] = ":"
    cmd = [hp]
    try:
        if is_win and not _has_file_extension(hp):
            # Windows only uses extensions to determine how to open files
            # (doesn't understand shebangs). Try using bash to run the hook.
            relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
            cmd = ["bash.exe", relative_hp]

        process = subprocess.Popen(
            cmd + list(args),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=index.repo.working_dir,
            close_fds=is_posix,
            creationflags=PROC_CREATIONFLAGS,
        )
    except Exception as ex:
        raise HookExecutionError(hp, ex) from ex
    else:
        stdout_list: List[str] = []
        stderr_list: List[str] = []
        handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process)
        stdout = "".join(stdout_list)
        stderr = "".join(stderr_list)
        if process.returncode != 0:
            stdout = force_text(stdout, defenc)
            stderr = force_text(stderr, defenc)
            raise HookExecutionError(hp, process.returncode, stderr, stdout)
    # end handle return code
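

# Illustrative sketch (hypothetical usage, not part of this module's API): invoking
# the repository's 'pre-commit' hook against its index. Assumes a repository in the
# current directory; a missing or non-executable hook makes the call a silent no-op.
def _example_run_commit_hook() -> None:
    from git import Repo  # imported here to avoid a circular import at module load

    repo = Repo(".")
    # Raises HookExecutionError if the hook exits with a non-zero status.
    run_commit_hook("pre-commit", repo.index)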


def stat_mode_to_index_mode(mode: int) -> int:
    """Convert the given mode from a stat call to the corresponding index mode
    and return it"""
    if S_ISLNK(mode):  # symlinks
        return S_IFLNK
    if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK:  # submodules
        return S_IFGITLINK
    return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644)  # blobs with or without executable bit
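

# Illustrative sketch: the three mode classes the index distinguishes. The inputs
# are ordinary os.stat()-style mode values; group/other permission bits are
# normalized away, so only the owner's executable bit matters for blobs.
def _example_stat_mode_to_index_mode() -> None:
    assert stat_mode_to_index_mode(0o100644) == 0o100644  # regular blob
    assert stat_mode_to_index_mode(0o100755) == 0o100755  # executable blob
    assert stat_mode_to_index_mode(0o100600) == 0o100644  # permissions normalized
    assert stat_mode_to_index_mode(0o120777) == S_IFLNK  # symlink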


def write_cache(
    entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
    stream: IO[bytes],
    extension_data: Union[None, bytes] = None,
    ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
    """Write the cache represented by entries to a stream

    :param entries: **sorted** list of entries
    :param stream: stream to wrap into the ShaStreamCls - it is used for
        final output.

    :param ShaStreamCls: Type to use when writing to the stream. It produces a sha
        while writing to it, before the data is passed on to the wrapped stream

    :param extension_data: any kind of data to write as a trailer, it must begin
        with a 4 byte identifier, followed by its size (4 bytes)"""
    # wrap the stream into a compatible writer
    stream_sha = ShaStreamCls(stream)

    tell = stream_sha.tell
    write = stream_sha.write

    # header
    version = 2
    write(b"DIRC")
    write(pack(">LL", version, len(entries)))

    # body
    for entry in entries:
        beginoffset = tell()
        write(entry.ctime_bytes)  # ctime
        write(entry.mtime_bytes)  # mtime
        path_str = str(entry.path)
        path: bytes = force_bytes(path_str, encoding=defenc)
        plen = len(path) & CE_NAMEMASK  # path length
        assert plen == len(path), "Path %s too long to fit into index" % entry.path
        flags = plen | (entry.flags & CE_NAMEMASK_INV)  # clear possible previous values
        write(
            pack(
                ">LLLLLL20sH",
                entry.dev,
                entry.inode,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.binsha,
                flags,
            )
        )
        write(path)
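        # Pad the record with NUL bytes to the next 8-byte boundary; the +8
        # guarantees at least one NUL terminator after the path, even when the
        # unpadded record already ends on an 8-byte boundary.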
        real_size = (tell() - beginoffset + 8) & ~7
        write(b"\0" * ((beginoffset + real_size) - tell()))
    # END for each entry

    # write previously cached extensions data
    if extension_data is not None:
        stream_sha.write(extension_data)

    # write the sha over the content
    stream_sha.write_sha()
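

# Illustrative sketch: serializing an (empty) entry list to an in-memory stream.
# The result is a minimal version-2 index: a 12-byte header followed by the
# 20-byte SHA-1 trailer that IndexFileSHA1Writer appends.
def _example_write_cache() -> None:
    buf = BytesIO()
    write_cache([], buf)
    assert len(buf.getvalue()) == 12 + 20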


def read_header(stream: IO[bytes]) -> Tuple[int, int]:
    """Return tuple(version_long, num_entries) from the given stream"""
    type_id = stream.read(4)
    if type_id != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % type_id)
    unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
    version, num_entries = unpacked

    # TODO: handle version 3: extended data, see read-cache.c
    assert version in (1, 2)
    return version, num_entries
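

# Illustrative sketch: parsing a hand-built version-2 header. The payload is
# synthetic; a real index stream would continue with num_entries entry records.
def _example_read_header() -> None:
    stream = BytesIO(b"DIRC" + pack(">LL", 2, 0))
    assert read_header(stream) == (2, 0)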


def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
    """:return: Key suitable to be used for the index.entries dictionary
    :param entry: One instance of type BaseIndexEntry or the path and the stage"""

    # def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]:
    #     return isinstance(entry_key, tuple) and len(entry_key) == 2

    if len(entry) == 1:
        entry_first = entry[0]
        assert isinstance(entry_first, BaseIndexEntry)
        return (entry_first.path, entry_first.stage)
    else:
        # assert is_entry_key_tup(entry)
        entry = cast(Tuple[PathLike, int], entry)
        return entry
    # END handle entry
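

# Illustrative sketch: the two accepted call forms. The path and stage here are
# hypothetical; given an existing BaseIndexEntry `e`, entry_key(e) yields the
# equivalent (e.path, e.stage) key.
def _example_entry_key() -> None:
    assert entry_key("dir/file.py", 0) == ("dir/file.py", 0)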


def read_cache(
    stream: IO[bytes],
) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
    """Read a cache file from the given stream

    :return: tuple(version, entries_dict, extension_data, content_sha)

        * version is the integer version number
        * entries_dict is a dictionary which maps (path, stage) keys to IndexEntry instances
        * extension_data is '' or 4 bytes of type + 4 bytes of size + size bytes
        * content_sha is a 20 byte sha on all cache file contents"""
    version, num_entries = read_header(stream)
    count = 0
    entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}

    read = stream.read
    tell = stream.tell
    while count < num_entries:
        beginoffset = tell()
        ctime = unpack(">8s", read(8))[0]
        mtime = unpack(">8s", read(8))[0]
        (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
        path_size = flags & CE_NAMEMASK
        path = read(path_size).decode(defenc)

        real_size = (tell() - beginoffset + 8) & ~7
        read((beginoffset + real_size) - tell())
        entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
        # entry_key would be the method to use, but we save the effort
        entries[(path, entry.stage)] = entry
        count += 1
    # END for each entry

    # the footer contains extension data and a sha on the content so far
    # Keep the extension footer, and verify we have a sha in the end
    # Extension data format is:
    #   4 bytes ID
    #   4 bytes length of chunk
    #   repeated 0 - N times
    extension_data = stream.read(~0)
    assert (
        len(extension_data) > 19
    ), "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)

    content_sha = extension_data[-20:]

    # truncate the sha in the end as we will dynamically create it anyway
    extension_data = extension_data[:-20]

    return (version, entries, extension_data, content_sha)
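

# Illustrative sketch (hypothetical usage): round-tripping an index through
# write_cache and read_cache via an in-memory stream. `index` is assumed to be
# an already-loaded IndexFile; write_cache requires its entries sorted.
def _example_cache_roundtrip(index: "IndexFile") -> None:
    entries = sorted(index.entries.values(), key=lambda e: (e.path, e.stage))
    buf = BytesIO()
    write_cache(entries, buf)
    buf.seek(0)
    version, read_entries, extension_data, content_sha = read_cache(buf)
    assert version == 2 and len(read_entries) == len(entries)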


def write_tree_from_cache(
    entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
) -> Tuple[bytes, List["TreeCacheTup"]]:
    """Create a tree from the given sorted list of entries and put the respective
    trees into the given object database

    :param entries: **sorted** list of IndexEntries
    :param odb: object database to store the trees in
    :param si: start index at which we should start creating subtrees
    :param sl: slice indicating the range we should process on the entries list
    :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
        tree entries being a tuple of binsha, mode, name"""
    tree_items: List["TreeCacheTup"] = []

    ci = sl.start
    end = sl.stop
    while ci < end:
        entry = entries[ci]
        if entry.stage != 0:
            raise UnmergedEntriesError(entry)
        # END abort on unmerged
        ci += 1
        rbound = entry.path.find("/", si)
        if rbound == -1:
            # it's not a tree
            tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
        else:
            # find common base range
            base = entry.path[si:rbound]
            xi = ci
            while xi < end:
                oentry = entries[xi]
                orbound = oentry.path.find("/", si)
                if orbound == -1 or oentry.path[si:orbound] != base:
                    break
                # END abort on base mismatch
                xi += 1
            # END find common base

            # enter recursion
            # ci - 1 as we want to count our current item as well
            sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
            tree_items.append((sha, S_IFDIR, base))

            # skip ahead
            ci = xi
        # END handle bounds
    # END for each entry

    # finally create the tree
    sio = BytesIO()
    tree_to_stream(tree_items, sio.write)  # writes to stream as bytes, but doesn't change tree_items
    sio.seek(0)

    istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
    return (istream.binsha, tree_items)
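

# Illustrative sketch (hypothetical usage): writing a whole index as a tree.
# `index` is assumed to be an IndexFile without unmerged entries; any subtrees
# are stored in the repository's object database as a side effect.
def _example_write_tree_from_cache(index: "IndexFile") -> None:
    entries = sorted(index.entries.values(), key=lambda e: (e.path, e.stage))
    binsha, tree_items = write_tree_from_cache(entries, index.repo.odb, slice(0, len(entries)))
    # binsha identifies the root tree; tree_items are its direct children.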


def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
    return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))


def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
    """
    :return: list of BaseIndexEntries representing the aggressive merge of the given
        trees. All valid entries are on stage 0, while conflicting ones are left
        on stage 1, 2 or 3, where stage 1 corresponds to the common ancestor tree,
        2 to our tree and 3 to 'their' tree.
    :param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas.
        If one or two are given, the entries will effectively correspond to the last given tree.
        If 3 are given, a 3-way merge is performed"""
    out: List[BaseIndexEntry] = []

    # one and two way is the same for us, as we don't have to handle an existing
    # index, instead
    if len(tree_shas) in (1, 2):
        for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
            out.append(_tree_entry_to_baseindexentry(entry, 0))
        # END for each entry
        return out
    # END handle single tree

    if len(tree_shas) > 3:
        raise ValueError("Cannot handle %i trees at once" % len(tree_shas))

    # three trees
    for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
        if base is not None:
            # base version exists
            if ours is not None:
                # ours exists
                if theirs is not None:
                    # it exists in all branches; if it was changed in both,
                    # it's a conflict, otherwise we take the changed version.
                    # This should be the most common branch, so it comes first.
                    if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
                        base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
                    ):
                        # changed by both
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    elif base[0] != ours[0] or base[1] != ours[1]:
                        # only we changed it
                        out.append(_tree_entry_to_baseindexentry(ours, 0))
                    else:
                        # either nobody changed it, or they did. In either
                        # case, use theirs
                        out.append(_tree_entry_to_baseindexentry(theirs, 0))
                    # END handle modification
                else:
                    if ours[0] != base[0] or ours[1] != base[1]:
                        # they deleted it, we changed it, conflict
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                    # else:
                    #   we didn't change it, ignore
                    #   pass
                    # END handle our change
                # END handle theirs
            else:
                if theirs is None:
                    # deleted in both, it's fine - it's out
                    pass
                else:
                    if theirs[0] != base[0] or theirs[1] != base[1]:
                        # deleted in ours, changed theirs, conflict
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    # END theirs changed
                    # else:
                    #   theirs didn't change
                    #   pass
                    # END handle theirs
                # END handle ours
        else:
            # all three can't be None
            if ours is None:
                # added in their branch
                assert theirs is not None
                out.append(_tree_entry_to_baseindexentry(theirs, 0))
            elif theirs is None:
                # added in our branch
                out.append(_tree_entry_to_baseindexentry(ours, 0))
            else:
                # both have it, except for the base, see whether it changed
                if ours[0] != theirs[0] or ours[1] != theirs[1]:
                    out.append(_tree_entry_to_baseindexentry(ours, 2))
                    out.append(_tree_entry_to_baseindexentry(theirs, 3))
                else:
                    # it was added the same in both
                    out.append(_tree_entry_to_baseindexentry(ours, 0))
                # END handle two items
            # END handle heads
        # END handle base exists
    # END for each entries tuple

    return out
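

# Illustrative sketch (hypothetical usage): a three-way merge of the trees of a
# merge base and two commits. The branch name is an assumption; conflicting
# paths come back on stages 1-3, clean paths on stage 0.
def _example_aggressive_tree_merge() -> None:
    from git import Repo  # imported here to avoid a circular import at module load

    repo = Repo(".")
    ours, theirs = repo.commit("HEAD"), repo.commit("other-branch")
    base = repo.merge_base(ours, theirs)[0]
    entries = aggressive_tree_merge(repo.odb, [base.tree.binsha, ours.tree.binsha, theirs.tree.binsha])
    conflicted = [e for e in entries if e.stage != 0]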