Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/repo/fun.py: 8%

200 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Package with general repository related functions""" 

2from __future__ import annotations 

3import os 

4import stat 

5from string import digits 

6 

7from git.exc import WorkTreeRepositoryUnsupported 

8from git.objects import Object 

9from git.refs import SymbolicReference 

10from git.util import hex_to_bin, bin_to_hex, cygpath 

11from gitdb.exc import ( 

12 BadObject, 

13 BadName, 

14) 

15 

16import os.path as osp 

17from git.cmd import Git 

18 

19# Typing ---------------------------------------------------------------------- 

20 

21from typing import Union, Optional, cast, TYPE_CHECKING 

22from git.types import Commit_ish 

23 

24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25, because the condition on line 24 was never true

25 from git.types import PathLike 

26 from .base import Repo 

27 from git.db import GitCmdObjectDB 

28 from git.refs.reference import Reference 

29 from git.objects import Commit, TagObject, Blob, Tree 

30 from git.refs.tag import Tag 

31 

32# ---------------------------------------------------------------------------- 

33 

34__all__ = ( 

35 "rev_parse", 

36 "is_git_dir", 

37 "touch", 

38 "find_submodule_git_dir", 

39 "name_to_object", 

40 "short_to_long", 

41 "deref_tag", 

42 "to_commit", 

43 "find_worktree_git_dir", 

44) 

45 

46 

47def touch(filename: str) -> str: 

48 with open(filename, "ab"): 

49 pass 

50 return filename 

51 

52 

53def is_git_dir(d: "PathLike") -> bool: 

54 """This is taken from the git setup.c:is_git_directory 

55 function. 

56 

57 @throws WorkTreeRepositoryUnsupported if it sees a worktree directory. It's quite hacky to do that here, 

58 but at least clearly indicates that we don't support it. 

59 There is the unlikely danger to throw if we see directories which just look like a worktree dir, 

60 but are none.""" 

61 if osp.isdir(d): 

62 if (osp.isdir(osp.join(d, "objects")) or "GIT_OBJECT_DIRECTORY" in os.environ) and osp.isdir( 

63 osp.join(d, "refs") 

64 ): 

65 headref = osp.join(d, "HEAD") 

66 return osp.isfile(headref) or (osp.islink(headref) and os.readlink(headref).startswith("refs")) 

67 elif ( 

68 osp.isfile(osp.join(d, "gitdir")) 

69 and osp.isfile(osp.join(d, "commondir")) 

70 and osp.isfile(osp.join(d, "gitfile")) 

71 ): 

72 raise WorkTreeRepositoryUnsupported(d) 

73 return False 

74 

75 

76def find_worktree_git_dir(dotgit: "PathLike") -> Optional[str]: 

77 """Search for a gitdir for this worktree.""" 

78 try: 

79 statbuf = os.stat(dotgit) 

80 except OSError: 

81 return None 

82 if not stat.S_ISREG(statbuf.st_mode): 

83 return None 

84 

85 try: 

86 lines = open(dotgit, "r").readlines() 

87 for key, value in [line.strip().split(": ") for line in lines]: 

88 if key == "gitdir": 

89 return value 

90 except ValueError: 

91 pass 

92 return None 

93 

94 

95def find_submodule_git_dir(d: "PathLike") -> Optional["PathLike"]: 

96 """Search for a submodule repo.""" 

97 if is_git_dir(d): 

98 return d 

99 

100 try: 

101 with open(d) as fp: 

102 content = fp.read().rstrip() 

103 except IOError: 

104 # it's probably not a file 

105 pass 

106 else: 

107 if content.startswith("gitdir: "): 

108 path = content[8:] 

109 

110 if Git.is_cygwin(): 

111 ## Cygwin creates submodules prefixed with `/cygdrive/...` suffixes. 

112 # Cygwin git understands Cygwin paths much better than Windows ones 

113 # Also the Cygwin tests are assuming Cygwin paths. 

114 path = cygpath(path) 

115 if not osp.isabs(path): 

116 path = osp.normpath(osp.join(osp.dirname(d), path)) 

117 return find_submodule_git_dir(path) 

118 # end handle exception 

119 return None 

120 

121 

122def short_to_long(odb: "GitCmdObjectDB", hexsha: str) -> Optional[bytes]: 

123 """:return: long hexadecimal sha1 from the given less-than-40 byte hexsha 

124 or None if no candidate could be found. 

125 :param hexsha: hexsha with less than 40 byte""" 

126 try: 

127 return bin_to_hex(odb.partial_to_complete_sha_hex(hexsha)) 

128 except BadObject: 

129 return None 

130 # END exception handling 

131 

132 

133def name_to_object( 

134 repo: "Repo", name: str, return_ref: bool = False 

135) -> Union[SymbolicReference, "Commit", "TagObject", "Blob", "Tree"]: 

136 """ 

137 :return: object specified by the given name, hexshas ( short and long ) 

138 as well as references are supported 

139 :param return_ref: if name specifies a reference, we will return the reference 

140 instead of the object. Otherwise it will raise BadObject or BadName 

141 """ 

142 hexsha: Union[None, str, bytes] = None 

143 

144 # is it a hexsha ? Try the most common ones, which is 7 to 40 

145 if repo.re_hexsha_shortened.match(name): 

146 if len(name) != 40: 

147 # find long sha for short sha 

148 hexsha = short_to_long(repo.odb, name) 

149 else: 

150 hexsha = name 

151 # END handle short shas 

152 # END find sha if it matches 

153 

154 # if we couldn't find an object for what seemed to be a short hexsha 

155 # try to find it as reference anyway, it could be named 'aaa' for instance 

156 if hexsha is None: 

157 for base in ( 

158 "%s", 

159 "refs/%s", 

160 "refs/tags/%s", 

161 "refs/heads/%s", 

162 "refs/remotes/%s", 

163 "refs/remotes/%s/HEAD", 

164 ): 

165 try: 

166 hexsha = SymbolicReference.dereference_recursive(repo, base % name) 

167 if return_ref: 

168 return SymbolicReference(repo, base % name) 

169 # END handle symbolic ref 

170 break 

171 except ValueError: 

172 pass 

173 # END for each base 

174 # END handle hexsha 

175 

176 # didn't find any ref, this is an error 

177 if return_ref: 

178 raise BadObject("Couldn't find reference named %r" % name) 

179 # END handle return ref 

180 

181 # tried everything ? fail 

182 if hexsha is None: 

183 raise BadName(name) 

184 # END assert hexsha was found 

185 

186 return Object.new_from_sha(repo, hex_to_bin(hexsha)) 

187 

188 

189def deref_tag(tag: "Tag") -> "TagObject": 

190 """Recursively dereference a tag and return the resulting object""" 

191 while True: 

192 try: 

193 tag = tag.object 

194 except AttributeError: 

195 break 

196 # END dereference tag 

197 return tag 

198 

199 

200def to_commit(obj: Object) -> Union["Commit", "TagObject"]: 

201 """Convert the given object to a commit if possible and return it""" 

202 if obj.type == "tag": 

203 obj = deref_tag(obj) 

204 

205 if obj.type != "commit": 

206 raise ValueError("Cannot convert object %r to type commit" % obj) 

207 # END verify type 

208 return obj 

209 

210 

211def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]: 

212 """ 

213 :return: Object at the given revision, either Commit, Tag, Tree or Blob 

214 :param rev: git-rev-parse compatible revision specification as string, please see 

215 http://www.kernel.org/pub/software/scm/git/docs/git-rev-parse.html 

216 for details 

217 :raise BadObject: if the given revision could not be found 

218 :raise ValueError: If rev couldn't be parsed 

219 :raise IndexError: If invalid reflog index is specified""" 

220 

221 # colon search mode ? 

222 if rev.startswith(":/"): 

223 # colon search mode 

224 raise NotImplementedError("commit by message search ( regex )") 

225 # END handle search 

226 

227 obj: Union[Commit_ish, "Reference", None] = None 

228 ref = None 

229 output_type = "commit" 

230 start = 0 

231 parsed_to = 0 

232 lr = len(rev) 

233 while start < lr: 

234 if rev[start] not in "^~:@": 

235 start += 1 

236 continue 

237 # END handle start 

238 

239 token = rev[start] 

240 

241 if obj is None: 

242 # token is a rev name 

243 if start == 0: 

244 ref = repo.head.ref 

245 else: 

246 if token == "@": 

247 ref = cast("Reference", name_to_object(repo, rev[:start], return_ref=True)) 

248 else: 

249 obj = cast(Commit_ish, name_to_object(repo, rev[:start])) 

250 # END handle token 

251 # END handle refname 

252 else: 

253 assert obj is not None 

254 

255 if ref is not None: 

256 obj = cast("Commit", ref.commit) 

257 # END handle ref 

258 # END initialize obj on first token 

259 

260 start += 1 

261 

262 # try to parse {type} 

263 if start < lr and rev[start] == "{": 

264 end = rev.find("}", start) 

265 if end == -1: 

266 raise ValueError("Missing closing brace to define type in %s" % rev) 

267 output_type = rev[start + 1 : end] # exclude brace 

268 

269 # handle type 

270 if output_type == "commit": 

271 pass # default 

272 elif output_type == "tree": 

273 try: 

274 obj = cast(Commit_ish, obj) 

275 obj = to_commit(obj).tree 

276 except (AttributeError, ValueError): 

277 pass # error raised later 

278 # END exception handling 

279 elif output_type in ("", "blob"): 

280 obj = cast("TagObject", obj) 

281 if obj and obj.type == "tag": 

282 obj = deref_tag(obj) 

283 else: 

284 # cannot do anything for non-tags 

285 pass 

286 # END handle tag 

287 elif token == "@": 

288 # try single int 

289 assert ref is not None, "Require Reference to access reflog" 

290 revlog_index = None 

291 try: 

292 # transform reversed index into the format of our revlog 

293 revlog_index = -(int(output_type) + 1) 

294 except ValueError as e: 

295 # TODO: Try to parse the other date options, using parse_date 

296 # maybe 

297 raise NotImplementedError("Support for additional @{...} modes not implemented") from e 

298 # END handle revlog index 

299 

300 try: 

301 entry = ref.log_entry(revlog_index) 

302 except IndexError as e: 

303 raise IndexError("Invalid revlog index: %i" % revlog_index) from e 

304 # END handle index out of bound 

305 

306 obj = Object.new_from_sha(repo, hex_to_bin(entry.newhexsha)) 

307 

308 # make it pass the following checks 

309 output_type = "" 

310 else: 

311 raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev)) 

312 # END handle output type 

313 

314 # empty output types don't require any specific type, its just about dereferencing tags 

315 if output_type and obj and obj.type != output_type: 

316 raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type)) 

317 # END verify output type 

318 

319 start = end + 1 # skip brace 

320 parsed_to = start 

321 continue 

322 # END parse type 

323 

324 # try to parse a number 

325 num = 0 

326 if token != ":": 

327 found_digit = False 

328 while start < lr: 

329 if rev[start] in digits: 

330 num = num * 10 + int(rev[start]) 

331 start += 1 

332 found_digit = True 

333 else: 

334 break 

335 # END handle number 

336 # END number parse loop 

337 

338 # no explicit number given, 1 is the default 

339 # It could be 0 though 

340 if not found_digit: 

341 num = 1 

342 # END set default num 

343 # END number parsing only if non-blob mode 

344 

345 parsed_to = start 

346 # handle hierarchy walk 

347 try: 

348 obj = cast(Commit_ish, obj) 

349 if token == "~": 

350 obj = to_commit(obj) 

351 for _ in range(num): 

352 obj = obj.parents[0] 

353 # END for each history item to walk 

354 elif token == "^": 

355 obj = to_commit(obj) 

356 # must be n'th parent 

357 if num: 

358 obj = obj.parents[num - 1] 

359 elif token == ":": 

360 if obj.type != "tree": 

361 obj = obj.tree 

362 # END get tree type 

363 obj = obj[rev[start:]] 

364 parsed_to = lr 

365 else: 

366 raise ValueError("Invalid token: %r" % token) 

367 # END end handle tag 

368 except (IndexError, AttributeError) as e: 

369 raise BadName( 

370 f"Invalid revision spec '{rev}' - not enough " f"parent commits to reach '{token}{int(num)}'" 

371 ) from e 

372 # END exception handling 

373 # END parse loop 

374 

375 # still no obj ? Its probably a simple name 

376 if obj is None: 

377 obj = cast(Commit_ish, name_to_object(repo, rev)) 

378 parsed_to = lr 

379 # END handle simple name 

380 

381 if obj is None: 

382 raise ValueError("Revision specifier could not be parsed: %s" % rev) 

383 

384 if parsed_to != lr: 

385 raise ValueError("Didn't consume complete rev spec %s, consumed part: %s" % (rev, rev[:parsed_to])) 

386 

387 return obj