Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/repo/fun.py: 8%
200 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""Package with general repository related functions"""
2from __future__ import annotations
3import os
4import stat
5from string import digits
7from git.exc import WorkTreeRepositoryUnsupported
8from git.objects import Object
9from git.refs import SymbolicReference
10from git.util import hex_to_bin, bin_to_hex, cygpath
11from gitdb.exc import (
12 BadObject,
13 BadName,
14)
16import os.path as osp
17from git.cmd import Git
19# Typing ----------------------------------------------------------------------
21from typing import Union, Optional, cast, TYPE_CHECKING
22from git.types import Commit_ish
24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25, because the condition on line 24 was never true
25 from git.types import PathLike
26 from .base import Repo
27 from git.db import GitCmdObjectDB
28 from git.refs.reference import Reference
29 from git.objects import Commit, TagObject, Blob, Tree
30 from git.refs.tag import Tag
32# ----------------------------------------------------------------------------
34__all__ = (
35 "rev_parse",
36 "is_git_dir",
37 "touch",
38 "find_submodule_git_dir",
39 "name_to_object",
40 "short_to_long",
41 "deref_tag",
42 "to_commit",
43 "find_worktree_git_dir",
44)
def touch(filename: str) -> str:
    """Make sure a file exists at ``filename`` without altering its content.

    The file is opened in binary append mode, which creates it when missing
    and leaves any existing bytes untouched.

    :param filename: path of the file to create
    :return: the ``filename`` that was passed in
    """
    # Append mode never truncates; closing right away is all that is needed.
    open(filename, "ab").close()
    return filename
def is_git_dir(d: "PathLike") -> bool:
    """Tell whether ``d`` looks like a git directory.

    The check is modelled after ``is_git_directory`` in git's ``setup.c``.

    :param d: path of the directory to inspect
    :return: True if ``d`` is a directory containing ``refs`` and ``objects``
        (or ``GIT_OBJECT_DIRECTORY`` is set in the environment) and its
        ``HEAD`` is a file or a symlink whose target starts with ``refs``;
        False otherwise
    :raise WorkTreeRepositoryUnsupported: if ``d`` contains the files
        ``gitdir``, ``commondir`` and ``gitfile``, i.e. looks like a worktree
        directory. Raising from here is a hack, but it makes the missing
        support explicit. Directories that merely look like a worktree
        directory will trigger it as well.
    """
    if not osp.isdir(d):
        return False

    has_object_store = osp.isdir(osp.join(d, "objects")) or "GIT_OBJECT_DIRECTORY" in os.environ
    if has_object_store and osp.isdir(osp.join(d, "refs")):
        head_path = osp.join(d, "HEAD")
        if osp.isfile(head_path):
            return True
        # A symlinked HEAD only counts when it points into the refs namespace.
        return osp.islink(head_path) and os.readlink(head_path).startswith("refs")

    worktree_markers = ("gitdir", "commondir", "gitfile")
    if all(osp.isfile(osp.join(d, marker)) for marker in worktree_markers):
        raise WorkTreeRepositoryUnsupported(d)

    return False
def find_worktree_git_dir(dotgit: "PathLike") -> Optional[str]:
    """Search for a gitdir for this worktree.

    ``dotgit`` is expected to be a regular file made of ``key: value`` lines.

    :param dotgit: path of the candidate file
    :return: the value of the first ``gitdir`` entry, or None if ``dotgit``
        cannot be stat'ed, is not a regular file, has no ``gitdir`` entry, or
        contains a line that does not split into exactly one key and one
        value around ``": "``
    """
    try:
        statbuf = os.stat(dotgit)
    except OSError:
        return None
    if not stat.S_ISREG(statbuf.st_mode):
        return None

    try:
        # Read through a context manager so the handle is closed right away
        # instead of being left to the garbage collector.
        with open(dotgit, "r") as fp:
            lines = fp.readlines()
        # All lines are split up front: one malformed line anywhere raises
        # ValueError and makes the whole lookup yield None.
        for key, value in [line.strip().split(": ") for line in lines]:
            if key == "gitdir":
                return value
    except ValueError:
        pass
    return None
def find_submodule_git_dir(d: "PathLike") -> Optional["PathLike"]:
    """Search for a submodule repo.

    :param d: either a git directory, or a file whose content starts with
        ``gitdir: `` followed by the path to follow next
    :return: the git directory that was found, or None if ``d`` is neither a
        git directory nor a readable file with a ``gitdir: `` entry
    """
    if is_git_dir(d):
        return d

    try:
        with open(d) as fp:
            content = fp.read().rstrip()
    except IOError:
        # Most likely not a file at all, so there is nothing to follow.
        return None

    prefix = "gitdir: "
    if not content.startswith(prefix):
        return None

    path = content[len(prefix):]
    if Git.is_cygwin():
        # Cygwin creates submodules with `/cygdrive/...` style paths. Cygwin
        # git understands Cygwin paths much better than Windows ones, and the
        # Cygwin tests assume Cygwin paths as well.
        path = cygpath(path)
    if not osp.isabs(path):
        # Relative targets are resolved against the directory holding `d`.
        path = osp.normpath(osp.join(osp.dirname(d), path))
    return find_submodule_git_dir(path)
def short_to_long(odb: "GitCmdObjectDB", hexsha: str) -> Optional[bytes]:
    """Expand an abbreviated hexsha to its full form.

    :param odb: object database providing ``partial_to_complete_sha_hex``
    :param hexsha: hexsha with less than 40 byte
    :return: long hexadecimal sha1 for the given short hexsha, or None if no
        candidate could be found (the lookup raised ``BadObject``)
    """
    try:
        complete_sha = odb.partial_to_complete_sha_hex(hexsha)
        return bin_to_hex(complete_sha)
    except BadObject:
        return None
def name_to_object(
    repo: "Repo", name: str, return_ref: bool = False
) -> Union[SymbolicReference, "Commit", "TagObject", "Blob", "Tree"]:
    """Resolve ``name`` to an object, or to a reference if requested.

    Short and long hexshas are supported as well as reference names.

    :param repo: repository used for the lookup
    :param name: hexsha (abbreviated or full) or reference name
    :param return_ref: if True and ``name`` specifies a reference, return the
        reference instead of the object
    :return: the object specified by ``name``, or a ``SymbolicReference`` when
        ``return_ref`` is True and a matching reference exists
    :raise BadObject: if ``return_ref`` is True but no reference was returned,
        which includes the case where ``name`` resolved as a hexsha
    :raise BadName: if ``name`` is neither a known hexsha nor a reference
    """
    hexsha: Union[None, str, bytes] = None

    # Names that look like a hexsha are tried as object ids first.
    if repo.re_hexsha_shortened.match(name):
        hexsha = name if len(name) == 40 else short_to_long(repo.odb, name)

    # Nothing found for what looked like a hexsha, or it is no hexsha at all:
    # try it as a reference, which may well be named 'aaa' for instance.
    if hexsha is None:
        ref_templates = (
            "%s",
            "refs/%s",
            "refs/tags/%s",
            "refs/heads/%s",
            "refs/remotes/%s",
            "refs/remotes/%s/HEAD",
        )
        for template in ref_templates:
            try:
                hexsha = SymbolicReference.dereference_recursive(repo, template % name)
                if return_ref:
                    return SymbolicReference(repo, template % name)
                break
            except ValueError:
                # This candidate did not resolve; move on to the next one.
                pass

    # A reference was requested, but none was returned above.
    if return_ref:
        raise BadObject("Couldn't find reference named %r" % name)

    # Every lookup strategy has been exhausted.
    if hexsha is None:
        raise BadName(name)

    return Object.new_from_sha(repo, hex_to_bin(hexsha))
def deref_tag(tag: "Tag") -> "TagObject":
    """Follow ``.object`` links starting at ``tag`` until there are none left.

    :param tag: item to dereference
    :return: the first item in the chain that has no ``object`` attribute
    """
    while True:
        try:
            target = tag.object
        except AttributeError:
            # Nothing further to dereference: this is the final object.
            return tag
        tag = target
def to_commit(obj: Object) -> Union["Commit", "TagObject"]:
    """Convert the given object to a commit if possible and return it.

    :param obj: object to convert; tags are dereferenced first
    :return: ``obj`` itself, or the object its tag chain ends in
    :raise ValueError: if the resulting object's type is not ``commit``
    """
    candidate = deref_tag(obj) if obj.type == "tag" else obj
    if candidate.type == "commit":
        return candidate
    raise ValueError("Cannot convert object %r to type commit" % candidate)
def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]:
    """Resolve a revision specification to an object.

    :param repo: repository the revision is looked up in
    :param rev: git-rev-parse compatible revision specification as string,
        please see
        http://www.kernel.org/pub/software/scm/git/docs/git-rev-parse.html
        for details
    :return: Object at the given revision, either Commit, Tag, Tree or Blob
    :raise BadObject: if the given revision could not be found
    :raise BadName: if a ``~``, ``^`` or ``:`` step cannot be applied
    :raise ValueError: If rev couldn't be parsed
    :raise IndexError: If invalid reflog index is specified
    :raise NotImplementedError: for ``:/`` message searches and for ``@{...}``
        contents that are not a plain integer
    """
    # Searching commits by message (":/<text>") is not supported.
    if rev.startswith(":/"):
        raise NotImplementedError("commit by message search ( regex )")

    obj: Union[Commit_ish, "Reference", None] = None
    ref = None
    output_type = "commit"
    pos = 0
    parsed_to = 0
    rev_len = len(rev)
    while pos < rev_len:
        # Advance to the next operator character.
        if rev[pos] not in "^~:@":
            pos += 1
            continue

        token = rev[pos]

        if obj is None:
            # Nothing resolved yet: whatever precedes the operator is a name.
            if pos == 0:
                ref = repo.head.ref
            elif token == "@":
                ref = cast("Reference", name_to_object(repo, rev[:pos], return_ref=True))
            else:
                obj = cast(Commit_ish, name_to_object(repo, rev[:pos]))
        else:
            assert obj is not None

            if ref is not None:
                obj = cast("Commit", ref.commit)

        pos += 1

        # A "{...}" group directly after the operator.
        if pos < rev_len and rev[pos] == "{":
            end = rev.find("}", pos)
            if end == -1:
                raise ValueError("Missing closing brace to define type in %s" % rev)
            output_type = rev[pos + 1 : end]  # text between the braces

            if output_type == "commit":
                pass  # the default, nothing to convert
            elif output_type == "tree":
                try:
                    obj = cast(Commit_ish, obj)
                    obj = to_commit(obj).tree
                except (AttributeError, ValueError):
                    pass  # the type verification below reports the error
            elif output_type in ("", "blob"):
                # Only tags can be dereferenced here, anything else stays.
                obj = cast("TagObject", obj)
                if obj and obj.type == "tag":
                    obj = deref_tag(obj)
            elif token == "@":
                # Only a single integer is understood inside @{...}.
                assert ref is not None, "Require Reference to access reflog"
                try:
                    # Turn the reversed index into the format of our revlog.
                    revlog_index = -(int(output_type) + 1)
                except ValueError as e:
                    # TODO: Try to parse the other date options, using
                    # parse_date maybe
                    raise NotImplementedError("Support for additional @{...} modes not implemented") from e

                try:
                    entry = ref.log_entry(revlog_index)
                except IndexError as e:
                    raise IndexError("Invalid revlog index: %i" % revlog_index) from e

                obj = Object.new_from_sha(repo, hex_to_bin(entry.newhexsha))

                # An empty type lets the verification below pass.
                output_type = ""
            else:
                raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev))

            # Empty output types do not demand a specific type, they are only
            # about dereferencing tags.
            if output_type and obj and obj.type != output_type:
                raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type))

            pos = end + 1  # continue behind the closing brace
            parsed_to = pos
            continue

        # Optional number after the operator, not parsed for ':'.
        num = 0
        if token != ":":
            digits_begin = pos
            while pos < rev_len and rev[pos] in digits:
                num = num * 10 + int(rev[pos])
                pos += 1

            # Without an explicit number the default is 1. An explicit 0 is
            # kept as is.
            if pos == digits_begin:
                num = 1

        parsed_to = pos
        # Apply the operator to the current object.
        try:
            obj = cast(Commit_ish, obj)
            if token == "~":
                obj = to_commit(obj)
                for _ in range(num):
                    obj = obj.parents[0]
            elif token == "^":
                obj = to_commit(obj)
                # Select the n'th parent; 0 keeps the commit itself.
                if num:
                    obj = obj.parents[num - 1]
            elif token == ":":
                if obj.type != "tree":
                    obj = obj.tree
                # The remainder of the spec is used as the key into the tree.
                obj = obj[rev[pos:]]
                parsed_to = rev_len
            else:
                raise ValueError("Invalid token: %r" % token)
        except (IndexError, AttributeError) as e:
            raise BadName(
                f"Invalid revision spec '{rev}' - not enough " f"parent commits to reach '{token}{int(num)}'"
            ) from e

    # No operator produced an object, so treat the whole spec as a name.
    if obj is None:
        obj = cast(Commit_ish, name_to_object(repo, rev))
        parsed_to = rev_len

    if obj is None:
        raise ValueError("Revision specifier could not be parsed: %s" % rev)

    if parsed_to != rev_len:
        raise ValueError("Didn't consume complete rev spec %s, consumed part: %s" % (rev, rev[:parsed_to]))

    return obj