Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/refs/log.py: 29%

149 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from mmap import mmap 

2import re 

3import time as _time 

4 

5from git.compat import defenc 

6from git.objects.util import ( 

7 parse_date, 

8 Serializable, 

9 altz_to_utctz_str, 

10) 

11from git.util import ( 

12 Actor, 

13 LockedFD, 

14 LockFile, 

15 assure_directory_exists, 

16 to_native_path, 

17 bin_to_hex, 

18 file_contents_ro_filepath, 

19) 

20 

21import os.path as osp 

22 

23 

24# typing ------------------------------------------------------------------ 

25 

26from typing import Iterator, List, Tuple, Union, TYPE_CHECKING 

27 

28from git.types import PathLike 

29 

30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31, because the condition on line 30 was never true

31 from git.refs import SymbolicReference 

32 from io import BytesIO 

33 from git.config import GitConfigParser, SectionConstraint # NOQA 

34 

35# ------------------------------------------------------------------------------ 

36 

37__all__ = ["RefLog", "RefLogEntry"] 

38 

39 

class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
    """Named tuple allowing easy access to the revlog data fields.

    The underlying tuple is laid out as
    ``(oldhexsha, newhexsha, actor, (time, tz_offset), message)``; the
    properties below are read-only views onto those positions.
    """

    # Matches exactly 40 hexadecimal characters (either case).
    _re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$")
    __slots__ = ()

    def __repr__(self) -> str:
        """Representation of ourselves in git reflog format"""
        return self.format()

    def format(self) -> str:
        """:return: a string suitable to be placed in a reflog file

        :note: Entries parsed by :meth:`from_line` from a line without a TAB
            carry ``None`` as message. Such entries are written back without the
            TAB/message section, so they round-trip unchanged instead of
            gaining the literal text ``None`` as their message."""
        act = self.actor
        time = self.time
        message = self.message
        # Only emit the TAB separator when there is a message to follow it.
        message_part = "" if message is None else "\t{}".format(message)
        return "{} {} {} <{}> {!s} {}{}\n".format(
            self.oldhexsha,
            self.newhexsha,
            act.name,
            act.email,
            time[0],
            altz_to_utctz_str(time[1]),
            message_part,
        )

    @property
    def oldhexsha(self) -> str:
        """The hexsha to the commit the ref pointed to before the change"""
        return self[0]

    @property
    def newhexsha(self) -> str:
        """The hexsha to the commit the ref now points to, after the change"""
        return self[1]

    @property
    def actor(self) -> Actor:
        """Actor instance, providing access"""
        return self[2]

    @property
    def time(self) -> Tuple[int, int]:
        """time as tuple:

        * [0] = int(time)
        * [1] = int(timezone_offset) in time.altzone format"""
        return self[3]

    @property
    def message(self) -> str:
        """Message describing the operation that acted on the reference.

        May be ``None`` for entries created by :meth:`from_line` from a line
        that contained no TAB-separated message."""
        return self[4]

    @classmethod
    def new(
        cls,
        oldhexsha: str,
        newhexsha: str,
        actor: Actor,
        time: int,
        tz_offset: int,
        message: str,
    ) -> "RefLogEntry":  # skipcq: PYL-W0621
        """:return: New instance of a RefLogEntry (of type ``cls``)

        :param oldhexsha: hexsha the ref pointed to before the change
        :param newhexsha: hexsha the ref points to after the change
        :param actor: Actor instance responsible for the change
        :param time: time of the change, stored as first item of the time tuple
        :param tz_offset: timezone offset, stored as second item of the time tuple
        :param message: message describing the change
        :raise ValueError: If actor is not an Actor instance"""
        if not isinstance(actor, Actor):
            raise ValueError("Need actor instance, got %s" % actor)
        # END check types
        # Build through cls so that subclasses receive instances of themselves.
        return cls((oldhexsha, newhexsha, actor, (time, tz_offset), message))

    @classmethod
    def from_line(cls, line: bytes) -> "RefLogEntry":
        """:return: New RefLogEntry instance (of type ``cls``) from the given revlog line.

        :param line: line bytes without trailing newline
        :raise ValueError: If line could not be parsed"""
        line_str = line.decode(defenc)

        # Everything up to the first TAB is the fixed-layout info section, the
        # remainder (if a TAB is present at all) is the free-form message.
        info, tab, rest = line_str.partition("\t")
        msg = rest if tab else None

        # Fixed columns: 40 chars old sha, one separator, 40 chars new sha.
        oldhexsha = info[:40]
        newhexsha = info[41:81]
        for hexsha in (oldhexsha, newhexsha):
            if not cls._re_hexsha_only.match(hexsha):
                raise ValueError("Invalid hexsha: %r" % (hexsha,))
            # END if hexsha re doesn't match
        # END for each hexsha

        # The actor ("name <email>") starts at column 82 and ends with '>'.
        email_end = info.find(">", 82)
        if email_end == -1:
            raise ValueError("Missing token: >")
        # END handle missing end brace

        actor = Actor._from_string(info[82 : email_end + 1])
        # What follows "> " is handed to parse_date to obtain time and offset.
        time, tz_offset = parse_date(info[email_end + 2 :])  # skipcq: PYL-W0621

        return cls((oldhexsha, newhexsha, actor, (time, tz_offset), msg))

141 

142 

class RefLog(List[RefLogEntry], Serializable):
    """A reflog contains RefLogEntrys, each of which defines a certain state
    of the head in question. Custom query methods allow to retrieve log entries
    by date or by other criteria.

    Reflog entries are ordered, the first added entry is first in the list, the last
    entry, i.e. the last change of the head or reference, is last in the list."""

    __slots__ = ("_path",)

    def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog":
        # The list base type takes no constructor arguments here; filepath is
        # consumed by __init__ only.
        inst = super(RefLog, cls).__new__(cls)
        return inst

    def __init__(self, filepath: Union[PathLike, None] = None):
        """Initialize this instance with an optional filepath, from which we will
        initialize our data. The path is also used to write changes back using
        the write() method"""
        self._path = filepath
        if filepath is not None:
            self._read_from_file()
        # END handle filepath

    def _read_from_file(self) -> None:
        """Fill this instance with the entries of the file at ``self._path``.

        A file that cannot be opened (OSError) leaves the instance unchanged."""
        try:
            fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True)
        except OSError:
            # it is possible and allowed that the file doesn't exist !
            return
        # END handle invalid log

        try:
            self._deserialize(fmap)
        finally:
            fmap.close()
        # END handle closing of handle

    # { Interface

    @classmethod
    def from_file(cls, filepath: PathLike) -> "RefLog":
        """
        :return: a new RefLog instance containing all entries from the reflog
            at the given filepath
        :param filepath: path to reflog
        :raise ValueError: If a line of the file could not be parsed. A file that
            cannot be opened (OSError) results in an empty log instead."""
        return cls(filepath)

    @classmethod
    def path(cls, ref: "SymbolicReference") -> str:
        """
        :return: string to absolute path at which the reflog of the given ref
            instance would be found. The path is not guaranteed to point to a valid
            file though.
        :param ref: SymbolicReference instance"""
        return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path))

    @classmethod
    def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:
        """
        :return: Iterator yielding RefLogEntry instances, one for each line read
            from the given stream.
        :param stream: file-like object containing the revlog in its native format
            or string instance pointing to a file to read
        :note: A stream opened here from a path is closed once iteration ends
            (exhaustion, error, or the generator being closed). Streams passed
            in by the caller are left open."""
        new_entry = RefLogEntry.from_line
        opened_here = isinstance(stream, str)
        if opened_here:
            # Same call as _read_from_file: stream=True requests an object that
            # offers readline()/close().
            _stream = file_contents_ro_filepath(stream, stream=True, allow_mmap=True)
        else:
            _stream = stream
        # END handle stream type
        try:
            while True:
                line = _stream.readline()
                if not line:
                    return
                yield new_entry(line.strip())
            # END endless loop
        finally:
            # Only release what we acquired ourselves.
            if opened_here:
                _stream.close()

    @classmethod
    def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry":
        """
        :return: RefLogEntry at the given index

        :param filepath: full path to the reflog file from which to read the entry

        :param index: python list compatible index, i.e. it may be negative to
            specify an entry counted from the end of the list

        :raise IndexError: If the entry didn't exist

        .. note:: This method is faster as it only parses the entry at index, skipping
            all other lines. Nonetheless, the whole file has to be read if
            the index is negative
        """
        with open(filepath, "rb") as fp:
            if index < 0:
                # Counting from the end requires knowing all lines first.
                return RefLogEntry.from_line(fp.readlines()[index].strip())
            # read until index is reached

            # index >= 0 here, so the loop runs at least once and binds `line`.
            for i in range(index + 1):
                line = fp.readline()
                if not line:
                    raise IndexError(f"Index file ended at line {i+1}, before given index was reached")
                # END abort on eof
            # END handle runup

            return RefLogEntry.from_line(line.strip())
        # END handle index

    def to_file(self, filepath: PathLike) -> None:
        """Write the contents of the reflog instance to a file at the given filepath.

        :param filepath: path to file; assure_directory_exists is called for it
            before the file is opened for writing"""
        lfd = LockedFD(filepath)
        assure_directory_exists(filepath, is_file=True)

        fp = lfd.open(write=True, stream=True)
        try:
            self._serialize(fp)
            lfd.commit()
        except Exception:
            # on failure it rolls back automatically, but we make it clear
            lfd.rollback()
            raise
        # END handle change

    @classmethod
    def append_entry(
        cls,
        config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None],
        filepath: PathLike,
        oldbinsha: bytes,
        newbinsha: bytes,
        message: str,
        write: bool = True,
    ) -> "RefLogEntry":
        """Append a new log entry to the revlog at filepath.

        :param config_reader: configuration reader of the repository - used to obtain
            user information. May also be an Actor instance identifying the committer directly or None.
        :param filepath: full path to the log file
        :param oldbinsha: binary sha of the previous commit
        :param newbinsha: binary sha of the current commit
        :param message: message describing the change to the reference; only its
            first line is recorded
        :param write: If True, the changes will be written right away. Otherwise
            the change will not be written

        :return: RefLogEntry objects which was appended to the log

        :raise ValueError: If either sha is not exactly 20 bytes long

        :note: As we are append-only, concurrent access is not a problem as we
            do not interfere with readers."""

        if len(oldbinsha) != 20 or len(newbinsha) != 20:
            raise ValueError("Shas need to be given in binary format")
        # END handle sha type
        assure_directory_exists(filepath, is_file=True)
        first_line = message.split("\n")[0]
        if isinstance(config_reader, Actor):
            committer = config_reader  # mypy thinks this is Actor | Gitconfigparser, but why?
        else:
            committer = Actor.committer(config_reader)
        entry = RefLogEntry(
            (
                bin_to_hex(oldbinsha).decode("ascii"),
                bin_to_hex(newbinsha).decode("ascii"),
                committer,
                (int(_time.time()), _time.altzone),
                first_line,
            )
        )

        if write:
            lf = LockFile(filepath)
            lf._obtain_lock_or_raise()
            # Everything after acquiring the lock, including open(), sits inside
            # the try so the lock is released even if the file cannot be opened.
            try:
                with open(filepath, "ab") as fd:
                    fd.write(entry.format().encode(defenc))
            finally:
                lf._release_lock()
            # END handle write operation
        return entry

    def write(self) -> "RefLog":
        """Write this instance's data to the file we are originating from

        :return: self
        :raise ValueError: If the instance was created without a path"""
        if self._path is None:
            raise ValueError("Instance was not initialized with a path, use to_file(...) instead")
        # END assert path
        self.to_file(self._path)
        return self

    # } END interface

    # { Serializable Interface
    def _serialize(self, stream: "BytesIO") -> "RefLog":
        """Write every entry, in list order, to stream in reflog format.

        :return: self"""
        write = stream.write

        # write all entries
        for e in self:
            write(e.format().encode(defenc))
        # END for each entry
        return self

    def _deserialize(self, stream: "BytesIO") -> "RefLog":
        """Append all entries parsed from stream to this instance.

        :return: self"""
        self.extend(self.iter_entries(stream))
        # } END serializable interface
        return self