Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/refs/log.py: 29%
149 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from mmap import mmap
2import re
3import time as _time
5from git.compat import defenc
6from git.objects.util import (
7 parse_date,
8 Serializable,
9 altz_to_utctz_str,
10)
11from git.util import (
12 Actor,
13 LockedFD,
14 LockFile,
15 assure_directory_exists,
16 to_native_path,
17 bin_to_hex,
18 file_contents_ro_filepath,
19)
21import os.path as osp
24# typing ------------------------------------------------------------------
26from typing import Iterator, List, Tuple, Union, TYPE_CHECKING
28from git.types import PathLike
30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31, because the condition on line 30 was never true
31 from git.refs import SymbolicReference
32 from io import BytesIO
33 from git.config import GitConfigParser, SectionConstraint # NOQA
35# ------------------------------------------------------------------------------
37__all__ = ["RefLog", "RefLogEntry"]
class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):

    """Named tuple allowing easy access to the revlog data fields.

    The tuple layout is
    ``(oldhexsha, newhexsha, actor, (time, tz_offset), message)``."""

    # Matches exactly 40 hexadecimal characters.
    _re_hexsha_only = re.compile("^[0-9A-Fa-f]{40}$")
    __slots__ = ()

    def __repr__(self) -> str:
        """Representation of ourselves in git reflog format"""
        return self.format()

    def format(self) -> str:
        """:return: a string suitable to be placed in a reflog file

        :note: Entries created by :meth:`from_line` from a line without a
            TAB-separated message carry ``None`` as message. For those, no
            message field is written at all, instead of the literal text
            ``None``, so that parsing and re-writing a line round-trips."""
        act = self.actor
        time = self.time
        info = "{} {} {} <{}> {!s} {}".format(
            self.oldhexsha,
            self.newhexsha,
            act.name,
            act.email,
            time[0],
            altz_to_utctz_str(time[1]),
        )
        if self.message is None:
            return info + "\n"
        return "{}\t{}\n".format(info, self.message)

    @property
    def oldhexsha(self) -> str:
        """The hexsha to the commit the ref pointed to before the change"""
        return self[0]

    @property
    def newhexsha(self) -> str:
        """The hexsha to the commit the ref now points to, after the change"""
        return self[1]

    @property
    def actor(self) -> Actor:
        """Actor instance, providing access"""
        return self[2]

    @property
    def time(self) -> Tuple[int, int]:
        """time as tuple:

        * [0] = int(time)
        * [1] = int(timezone_offset) in time.altzone format"""
        return self[3]

    @property
    def message(self) -> str:
        """Message describing the operation that acted on the reference"""
        return self[4]

    @classmethod
    def new(
        cls,
        oldhexsha: str,
        newhexsha: str,
        actor: Actor,
        time: int,
        tz_offset: int,
        message: str,
    ) -> "RefLogEntry":  # skipcq: PYL-W0621
        """:return: New instance of a RefLogEntry

        :param oldhexsha: hexsha the ref pointed to before the change
        :param newhexsha: hexsha the ref points to after the change
        :param actor: Actor instance that performed the change
        :param time: time of the change, stored as first item of the time tuple
        :param tz_offset: timezone offset, stored as second item of the time tuple
        :param message: message describing the change
        :raise ValueError: If actor is not an Actor instance"""
        if not isinstance(actor, Actor):
            raise ValueError("Need actor instance, got %s" % actor)
        # END check types
        return cls((oldhexsha, newhexsha, actor, (time, tz_offset), message))

    @classmethod
    def from_line(cls, line: bytes) -> "RefLogEntry":
        """:return: New RefLogEntry instance from the given revlog line.

        :param line: line bytes without trailing newline
        :raise ValueError: If line could not be parsed"""
        line_str = line.decode(defenc)

        # Everything after the first TAB is the message. If there is no TAB at
        # all, the entry has no message, which is represented as None.
        info, tab, rest = line_str.partition("\t")
        msg = rest if tab else None

        # Fixed-width layout: 40 hex chars, one separator, 40 hex chars.
        oldhexsha = info[:40]
        newhexsha = info[41:81]
        for hexsha in (oldhexsha, newhexsha):
            if not cls._re_hexsha_only.match(hexsha):
                raise ValueError("Invalid hexsha: %r" % (hexsha,))
            # END if hexsha re doesn't match
        # END for each hexsha

        # The actor ("name <email>") starts at offset 82 and ends at the closing brace.
        email_end = info.find(">", 82)
        if email_end == -1:
            raise ValueError("Missing token: >")
        # END handle missing end brace

        actor = Actor._from_string(info[82 : email_end + 1])
        # Skip the closing brace and the following separator to reach the date.
        time, tz_offset = parse_date(info[email_end + 2 :])  # skipcq: PYL-W0621

        return cls((oldhexsha, newhexsha, actor, (time, tz_offset), msg))
class RefLog(List[RefLogEntry], Serializable):

    """A reflog contains RefLogEntrys, each of which defines a certain state
    of the head in question. Custom query methods allow to retrieve log entries
    by date or by other criteria.

    Reflog entries are ordered, the first added entry is first in the list, the last
    entry, i.e. the last change of the head or reference, is last in the list."""

    __slots__ = ("_path",)

    def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog":
        # filepath is accepted here only to mirror __init__; it is not used.
        inst = super(RefLog, cls).__new__(cls)
        return inst

    def __init__(self, filepath: Union[PathLike, None] = None):
        """Initialize this instance with an optional filepath, from which we will
        initialize our data. The path is also used to write changes back using
        the write() method"""
        self._path = filepath
        if filepath is not None:
            self._read_from_file()
        # END handle filepath

    def _read_from_file(self) -> None:
        """Fill this instance with the entries of the file at our path.

        If the file cannot be opened (OSError), the instance is left unchanged."""
        try:
            fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True)
        except OSError:
            # it is possible and allowed that the file doesn't exist !
            return
        # END handle invalid log

        try:
            self._deserialize(fmap)
        finally:
            fmap.close()
        # END handle closing of handle

    # { Interface

    @classmethod
    def from_file(cls, filepath: PathLike) -> "RefLog":
        """
        :return: a new RefLog instance containing all entries from the reflog
            at the given filepath. If the file cannot be opened (OSError), the
            returned instance is empty.
        :param filepath: path to reflog
        :raise ValueError: If the file was corrupted in some way"""
        return cls(filepath)

    @classmethod
    def path(cls, ref: "SymbolicReference") -> str:
        """
        :return: string to absolute path at which the reflog of the given ref
            instance would be found. The path is not guaranteed to point to a valid
            file though.
        :param ref: SymbolicReference instance"""
        return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path))

    @classmethod
    def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:
        """
        :return: Iterator yielding RefLogEntry instances, one for each line read
            from the given stream.
        :param stream: file-like object containing the revlog in its native format
            or string instance pointing to a file to read
        :note: A stream passed in by the caller is never closed here. A map that
            was opened by this method for a path is closed once iteration ends,
            fails, or the generator is closed."""
        new_entry = RefLogEntry.from_line
        opened_here = isinstance(stream, str)
        if opened_here:
            # default args return mmap on py>3
            _stream = file_contents_ro_filepath(stream)
            assert isinstance(_stream, mmap)
        else:
            _stream = stream
        # END handle stream type
        try:
            while True:
                line = _stream.readline()
                if not line:
                    return
                yield new_entry(line.strip())
            # END endless loop
        finally:
            # Only release what we acquired ourselves.
            if opened_here:
                _stream.close()
        # END handle closing of own map

    @classmethod
    def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry":
        """
        :return: RefLogEntry at the given index

        :param filepath: full path to the reflog file from which to read the entry

        :param index: python list compatible index, i.e. it may be negative to
            specify an entry counted from the end of the list

        :raise IndexError: If the entry didn't exist

        .. note:: This method is faster as it only parses the entry at index, skipping
            all other lines. Nonetheless, the whole file has to be read if
            the index is negative
        """
        with open(filepath, "rb") as fp:
            if index < 0:
                # Negative indices need all lines; list indexing raises IndexError itself.
                return RefLogEntry.from_line(fp.readlines()[index].strip())
            # read until index is reached

            # index >= 0, hence the loop body runs at least once and binds `line`.
            for i in range(index + 1):
                line = fp.readline()
                if not line:
                    raise IndexError(f"Index file ended at line {i+1}, before given index was reached")
                # END abort on eof
            # END handle runup

            return RefLogEntry.from_line(line.strip())
        # END handle index

    def to_file(self, filepath: PathLike) -> None:
        """Write the contents of the reflog instance to a file at the given filepath.

        :param filepath: path to file. ``assure_directory_exists`` is called for it
            before the file is opened for writing."""
        lfd = LockedFD(filepath)
        assure_directory_exists(filepath, is_file=True)

        fp = lfd.open(write=True, stream=True)
        try:
            self._serialize(fp)
            lfd.commit()
        except Exception:
            # on failure it rolls back automatically, but we make it clear
            lfd.rollback()
            raise
        # END handle change

    @classmethod
    def append_entry(
        cls,
        config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None],
        filepath: PathLike,
        oldbinsha: bytes,
        newbinsha: bytes,
        message: str,
        write: bool = True,
    ) -> "RefLogEntry":
        """Append a new log entry to the revlog at filepath.

        :param config_reader: configuration reader of the repository - used to obtain
            user information. May also be an Actor instance identifying the committer directly or None.
        :param filepath: full path to the log file
        :param oldbinsha: binary sha of the previous commit
        :param newbinsha: binary sha of the current commit
        :param message: message describing the change to the reference; only its
            first line is stored
        :param write: If True, the changes will be written right away. Otherwise
            the change will not be written

        :return: RefLogEntry objects which was appended to the log

        :raise ValueError: If one of the shas is not 20 bytes long

        :note: As we are append-only, concurrent access is not a problem as we
            do not interfere with readers."""

        if len(oldbinsha) != 20 or len(newbinsha) != 20:
            raise ValueError("Shas need to be given in binary format")
        # END handle sha type
        assure_directory_exists(filepath, is_file=True)
        first_line = message.split("\n")[0]
        if isinstance(config_reader, Actor):
            committer = config_reader  # mypy thinks this is Actor | Gitconfigparser, but why?
        else:
            committer = Actor.committer(config_reader)
        entry = RefLogEntry(
            (
                bin_to_hex(oldbinsha).decode("ascii"),
                bin_to_hex(newbinsha).decode("ascii"),
                committer,
                (int(_time.time()), _time.altzone),
                first_line,
            )
        )

        if write:
            lf = LockFile(filepath)
            lf._obtain_lock_or_raise()
            # Everything after obtaining the lock is guarded, so the lock is
            # released even if opening, writing or closing the file fails.
            try:
                with open(filepath, "ab") as fd:
                    fd.write(entry.format().encode(defenc))
            finally:
                lf._release_lock()
            # END handle write operation
        return entry

    def write(self) -> "RefLog":
        """Write this instance's data to the file we are originating from

        :return: self
        :raise ValueError: If this instance was created without a path"""
        if self._path is None:
            raise ValueError("Instance was not initialized with a path, use to_file(...) instead")
        # END assert path
        self.to_file(self._path)
        return self

    # } END interface

    # { Serializable Interface
    def _serialize(self, stream: "BytesIO") -> "RefLog":
        """Write all entries, in list order, to the given stream.

        :return: self"""
        write = stream.write

        # write all entries
        for e in self:
            write(e.format().encode(defenc))
        # END for each entry
        return self

    def _deserialize(self, stream: "BytesIO") -> "RefLog":
        """Append all entries parsed from the given stream to this instance.

        :return: self"""
        self.extend(self.iter_entries(stream))
        # } END serializable interface
        return self