Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/objects/commit.py: 15%
296 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# commit.py
2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
3#
4# This module is part of GitPython and is released under
5# the BSD License: http://www.opensource.org/licenses/bsd-license.php
6import datetime
7import re
8from subprocess import Popen, PIPE
9from gitdb import IStream
10from git.util import hex_to_bin, Actor, Stats, finalize_process
11from git.diff import Diffable
12from git.cmd import Git
14from .tree import Tree
15from . import base
16from .util import (
17 Serializable,
18 TraversableIterableObj,
19 parse_date,
20 altz_to_utctz_str,
21 parse_actor_and_date,
22 from_timestamp,
23)
25from time import time, daylight, altzone, timezone, localtime
26import os
27from io import BytesIO
28import logging
31# typing ------------------------------------------------------------------
33from typing import (
34 Any,
35 IO,
36 Iterator,
37 List,
38 Sequence,
39 Tuple,
40 Union,
41 TYPE_CHECKING,
42 cast,
43 Dict,
44)
46from git.types import PathLike, Literal
48if TYPE_CHECKING:  # coverage note: 48 ↛ 49 — line 48 didn't jump to line 49, because the condition on line 48 was never true
49 from git.repo import Repo
50 from git.refs import SymbolicReference
52# ------------------------------------------------------------------------
54log = logging.getLogger("git.objects.commit")
55log.addHandler(logging.NullHandler())
57__all__ = ("Commit",)
class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
    """Wraps a git Commit object.

    Some attributes are resolved lazily: their value is only queried on
    demand, when obtaining it involves calling the git binary.
    """

    # Environment variables that are read when new commits are created.
    env_author_date = "GIT_AUTHOR_DATE"
    env_committer_date = "GIT_COMMITTER_DATE"

    # Configuration key, written as "section.option".
    conf_encoding = "i18n.commitencoding"

    # Invariant: the encoding used unless another one is configured/recorded.
    default_encoding = "UTF-8"

    # Object configuration.
    type: Literal["commit"] = "commit"
    __slots__ = (
        "tree",
        "author",
        "authored_date",
        "author_tz_offset",
        "committer",
        "committed_date",
        "committer_tz_offset",
        "message",
        "parents",
        "encoding",
        "gpgsig",
    )
    _id_attribute_ = "hexsha"
def __init__(
    self,
    repo: "Repo",
    binsha: bytes,
    tree: Union[Tree, None] = None,
    author: Union[Actor, None] = None,
    authored_date: Union[int, None] = None,
    author_tz_offset: Union[None, float] = None,
    committer: Union[Actor, None] = None,
    committed_date: Union[int, None] = None,
    committer_tz_offset: Union[None, float] = None,
    message: Union[str, bytes, None] = None,
    parents: Union[Sequence["Commit"], None] = None,
    encoding: Union[str, None] = None,
    gpgsig: Union[str, None] = None,
) -> None:
    """Instantiate a new Commit.

    Every keyword argument left at ``None`` is not stored here; such
    attributes are implicitly set on first query.

    :param repo: Repo the commit belongs to
    :param binsha: 20 byte sha1
    :param tree: Tree object
    :param author: Actor
        is the author Actor object
    :param authored_date: int_seconds_since_epoch
        is the authored DateTime - use time.gmtime() to convert it into a
        different format
    :param author_tz_offset: int_seconds_west_of_utc
        is the timezone that the authored_date is in
    :param committer: Actor
        is the committer Actor object
    :param committed_date: int_seconds_since_epoch
        is the committed DateTime - use time.gmtime() to convert it into a
        different format
    :param committer_tz_offset: int_seconds_west_of_utc
        is the timezone that the committed_date is in
    :param message: string
        is the commit message
    :param parents:
        List or tuple of Commit objects which are our parent(s) in the commit
        dependency graph
    :param encoding: string
        encoding of the message, defaults to UTF-8
    :param gpgsig: string
        signature of the commit, if any
    :return: git.Commit

    :note:
        Timezone information is in the same format and in the same sign
        as what time.altzone returns. The sign is inverted compared to git's
        UTC timezone."""
    super(Commit, self).__init__(repo, binsha)
    self.binsha = binsha

    if tree is not None:
        assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)

    # Only explicitly supplied values are stored; anything left out stays
    # unset on the instance.
    supplied = (
        ("tree", tree),
        ("author", author),
        ("authored_date", authored_date),
        ("author_tz_offset", author_tz_offset),
        ("committer", committer),
        ("committed_date", committed_date),
        ("committer_tz_offset", committer_tz_offset),
        ("message", message),
        ("parents", parents),
        ("encoding", encoding),
        ("gpgsig", gpgsig),
    )
    for slot_name, slot_value in supplied:
        if slot_value is not None:
            setattr(self, slot_name, slot_value)
@classmethod
def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]:
    """Return the parents of ``commit`` as a tuple."""
    return (*commit.parents,)
@classmethod
def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes:
    """Calculate the sha of a commit.

    The commit is serialized into an in-memory buffer which is then stored
    in the object database of ``repo``; the binsha reported for the stored
    stream is returned.

    :param repo: Repo object the commit should be part of
    :param commit: Commit object for which to generate the sha
    :return: binary sha of the stored commit
    """
    buffer = BytesIO()
    commit._serialize(buffer)

    # The write position after serialization equals the payload size.
    payload_size = buffer.tell()
    buffer.seek(0)

    stored = repo.odb.store(IStream(cls.type, payload_size, buffer))
    return stored.binsha
def replace(self, **kwargs: Any) -> "Commit":
    """Create a new commit object from this commit object.

    Every slot value of this commit is copied; values given as keyword
    arguments replace the corresponding attribute in the new object, whose
    binsha is then recalculated.

    :param kwargs: attribute names (must be listed in ``__slots__``) and
        their replacement values
    :raise ValueError: if a keyword is not a known attribute name
    :return: the newly created commit
    """
    slot_names = self.__slots__

    current = {}
    for name in slot_names:
        current[name] = getattr(self, name)

    if any(name not in slot_names for name in kwargs):
        raise ValueError("invalid attribute name")

    merged = {**current, **kwargs}
    clone = self.__class__(self.repo, self.NULL_BIN_SHA, **merged)
    clone.binsha = self._calculate_sha_(self.repo, clone)

    return clone
def _set_cache_(self, attr: str) -> None:
    """Populate ``attr`` when it is requested but not yet set.

    Attributes listed in ``Commit.__slots__`` are filled by reading this
    commit from the object database and deserializing it; any other
    attribute is delegated to the parent class.
    """
    if attr not in Commit.__slots__:
        super(Commit, self)._set_cache_(attr)
        return

    # Read the data in one chunk, it's faster - then provide a file wrapper.
    _binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha)
    raw = stream.read()
    self._deserialize(BytesIO(raw))
@property
def authored_datetime(self) -> datetime.datetime:
    """:return: ``authored_date`` and ``author_tz_offset`` converted by ``from_timestamp``"""
    seconds, tz_offset = self.authored_date, self.author_tz_offset
    return from_timestamp(seconds, tz_offset)
@property
def committed_datetime(self) -> datetime.datetime:
    """:return: ``committed_date`` and ``committer_tz_offset`` converted by ``from_timestamp``"""
    seconds, tz_offset = self.committed_date, self.committer_tz_offset
    return from_timestamp(seconds, tz_offset)
@property
def summary(self) -> Union[str, bytes]:
    """:return: First line of the commit message, in the same type (str or
    bytes) as the message itself"""
    text = self.message
    # The separator has to match the type of the message.
    newline = "\n" if isinstance(text, str) else b"\n"
    return text.split(newline, 1)[0]
def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int:
    """Count the number of commits reachable from this commit

    :param paths:
        is an optional path or a list of paths restricting the return value
        to commits actually containing the paths
    :param kwargs:
        Additional options to be passed to git-rev-list. They must not alter
        the output style of the command, or parsing will yield incorrect results
    :return: int defining the number of reachable commits"""
    rev_args = [self.hexsha]
    # Yes, it makes a difference whether empty paths are given or not in our
    # case, as the empty paths version will ignore merge commits for some
    # reason - so "--" is only appended together with actual paths.
    if paths:
        rev_args += ["--", paths]

    listing = self.repo.git.rev_list(*rev_args, **kwargs)
    return len(listing.splitlines())
@property
def name_rev(self) -> str:
    """
    :return:
        String describing the commits hex sha based on the closest Reference.
        Mostly useful for UI purposes"""
    git_cmd = self.repo.git
    return git_cmd.name_rev(self)
@classmethod
def iter_items(
    cls,
    repo: "Repo",
    rev: Union[str, "Commit", "SymbolicReference"],  # type: ignore
    paths: Union[PathLike, Sequence[PathLike]] = "",
    **kwargs: Any,
) -> Iterator["Commit"]:
    """Find all commits matching the given criteria.

    :param repo: is the Repo
    :param rev: revision specifier, see git-rev-parse for viable options
    :param paths:
        is an optional path or list of paths, if set only Commits that include the path
        or paths will be considered
    :param kwargs:
        optional keyword arguments to git rev-list where
        ``max_count`` is the maximum number of commits to fetch
        ``skip`` is the number of commits to skip
        ``since`` all commits since i.e. '1970-01-01'
    :raise ValueError: if ``pretty`` is passed, as parsing expects one sha per line
    :return: iterator yielding Commit items"""
    if "pretty" in kwargs:
        raise ValueError("--pretty cannot be used as parsing expects single sha's only")

    # Normalize ``paths`` to a tuple; a single path becomes a one-element tuple.
    if not paths:
        path_args: Tuple[PathLike, ...] = ()
    elif isinstance(paths, (str, os.PathLike)):
        path_args = (paths,)
    else:
        path_args = tuple(paths)

    # Use -- in any case, to prevent possibility of ambiguous arguments,
    # see https://github.com/gitpython-developers/GitPython/issues/264
    args_list: List[PathLike] = ["--", *path_args]

    proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
    return cls._iter_from_process_or_stream(repo, proc)
def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]:
    """Iterate _all_ parents of this commit.

    :param paths:
        Optional path or list of paths limiting the Commits to those that
        contain at least one of the paths
    :param kwargs: All arguments allowed by git-rev-list
    :return: Iterator yielding Commit objects which are parents of self"""
    # The listing starts at this commit, so at least one entry (ourselves)
    # is always skipped; a larger caller-supplied skip is kept as is.
    requested_skip = kwargs.get("skip", 1)
    kwargs["skip"] = 1 if requested_skip == 0 else requested_skip

    return self.iter_items(self.repo, self, paths, **kwargs)
@property
def stats(self) -> Stats:
    """Create a git stat from changes between this commit and its first parent
    or from all changes done if this is the very first commit.

    :return: git.Stats"""
    if self.parents:
        text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True)
    else:
        raw = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True)
        rebuilt = []
        # The first line of the diff-tree output is dropped; every remaining
        # line must consist of exactly three tab-separated fields.
        for line in raw.splitlines()[1:]:
            insertions, deletions, filename = line.split("\t")
            rebuilt.append("%s\t%s\t%s\n" % (insertions, deletions, filename))
        text = "".join(rebuilt)
    return Stats._list_from_string(self.repo, text)
@property
def trailers(self) -> Dict:
    """Get the trailers of the message as dictionary

    Git messages can contain trailer information that are similar to RFC 822
    e-mail headers (see: https://git-scm.com/docs/git-interpret-trailers).

    This function calls ``git interpret-trailers --parse`` onto the message
    to extract the trailer information. The key value pairs are stripped of
    leading and trailing whitespaces before they get saved into a dictionary.
    If a key occurs more than once, the last value wins.

    Valid message with trailer:

    .. code-block::

        Subject line

        some body information

        another information

        key1: value1
        key2 :  value 2 with inner spaces

    dictionary will look like this:

    .. code-block::

        {
            "key1": "value1",
            "key2": "value 2 with inner spaces"
        }

    :return: Dictionary containing whitespace stripped trailer information

    """
    message = self.message
    # A bytes message is piped through unchanged: ``str(b"...")`` would hand
    # git the Python repr ("b'...'") instead of the actual message text.
    payload = message if isinstance(message, bytes) else str(message).encode()

    # NOTE(review): the executable name is hard-coded here; confirm whether a
    # custom git executable configured elsewhere should be honoured.
    cmd = ["git", "interpret-trailers", "--parse"]
    proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE)  # type: ignore
    # errors="replace" only matters for bytes messages that are not UTF-8.
    trailer: str = proc.communicate(payload)[0].decode(errors="replace")
    if trailer.endswith("\n"):
        trailer = trailer[0:-1]

    d = {}
    if trailer != "":
        for line in trailer.split("\n"):
            key, value = line.split(":", 1)
            d[key.strip()] = value.strip()
    return d
@classmethod
def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]:
    """Parse out commit information into an iterator of Commit objects.

    One line per commit is expected. Only the sha is taken from each line;
    a Commit is created from it without reading anything else from the
    stream.

    :param repo: Repo passed on to every created Commit
    :param proc_or_stream: git-rev-list process instance (anything with a
        ``wait`` attribute, read through its ``stdout``) or a stream with
        ``readline`` - one sha per line
    :raise ValueError: if nothing can be read from ``proc_or_stream``, i.e.
        it is a process without captured stdout or offers neither ``wait``
        nor ``readline``. Raised on first iteration, as this is a generator.
    :return: iterator returning Commit objects"""
    is_process = hasattr(proc_or_stream, "wait")

    stream = None
    if is_process:
        proc_or_stream = cast(Popen, proc_or_stream)
        stream = proc_or_stream.stdout
    elif hasattr(proc_or_stream, "readline"):
        proc_or_stream = cast(IO, proc_or_stream)
        stream = proc_or_stream

    if stream is None:
        # Previously this case failed later with an unbound local variable.
        raise ValueError("Cannot read commits from %r: no readable stream available" % (proc_or_stream,))

    readline = stream.readline
    while True:
        line = readline()
        if not line:
            break
        hexsha = line.strip()
        if len(hexsha) > 40:
            # split additional information, as returned by bisect for instance
            hexsha, _ = line.split(None, 1)
        # END handle extra info

        assert len(hexsha) == 40, "Invalid line: %s" % hexsha
        yield cls(repo, hex_to_bin(hexsha))
    # END for each line in stream

    # TODO: Review this - it seems process handling got a bit out of control
    # due to many developers trying to fix the open file handles issue
    if is_process:
        proc_or_stream = cast(Popen, proc_or_stream)
        finalize_process(proc_or_stream)
@classmethod
def create_from_tree(
    cls,
    repo: "Repo",
    tree: Union[Tree, str],
    message: str,
    parent_commits: Union[None, List["Commit"]] = None,
    head: bool = False,
    author: Union[None, Actor] = None,
    committer: Union[None, Actor] = None,
    author_date: Union[None, str] = None,
    commit_date: Union[None, str] = None,
) -> "Commit":
    """Commit the given tree, creating a commit object.

    :param repo: Repo object the commit should be part of
    :param tree: Tree object or hex or bin sha
        the tree of the new commit
    :param message: Commit message. It may be an empty string if no message is provided.
        It will be converted to a string, in any case.
    :param parent_commits:
        Optional Commit objects to use as parents for the new commit.
        If empty list, the commit will have no parents at all and become
        a root commit.
        If None, the current head commit will be the parent of the
        new commit object
    :param head:
        If True, the HEAD will be advanced to the new commit automatically.
        Else the HEAD will remain pointing on the previous commit. This could
        lead to undesired results when diffing files.
    :param author: The name of the author, optional. If unset, the repository
        configuration is used to obtain this value.
    :param committer: The name of the committer, optional. If unset, the
        repository configuration is used to obtain this value.
    :param author_date: The timestamp for the author field
    :param commit_date: The timestamp for the committer field

    :raise ValueError: if an entry of ``parent_commits`` is not an instance of ``cls``
    :raise TypeError: if the configured commit encoding is not a string
    :return: Commit object representing the new commit

    :note:
        Additional information about the committer and Author are taken from the
        environment or from the git configuration, see git-commit-tree for
        more information"""
    if parent_commits is not None:
        for candidate in parent_commits:
            if not isinstance(candidate, cls):
                raise ValueError(f"Parent commit '{candidate!r}' must be of type {cls}")
    else:
        try:
            parent_commits = [repo.head.commit]
        except ValueError:
            # empty repositories have no head commit
            parent_commits = []

    # Retrieve all additional information, create a commit object, and
    # serialize it. Generally:
    # * Environment variables override configuration values
    # * Sensible defaults are set according to the git documentation

    # COMMITTER AND AUTHOR INFO
    cr = repo.config_reader()
    env = os.environ

    committer = committer or Actor.committer(cr)
    author = author or Actor.author(cr)

    # PARSE THE DATES
    unix_time = int(time())
    offset = altzone if (daylight and localtime().tm_isdst > 0) else timezone

    def resolve_time(explicit_date, env_name):
        """Pick (time, offset): explicit argument, else environment, else now."""
        env_date = env.get(env_name, "")
        if explicit_date:
            return parse_date(explicit_date)
        if env_date:
            return parse_date(env_date)
        return unix_time, offset

    author_time, author_offset = resolve_time(author_date, cls.env_author_date)
    committer_time, committer_offset = resolve_time(commit_date, cls.env_committer_date)

    # ENCODING - the default is used unless the configuration says otherwise
    enc_section, enc_option = cls.conf_encoding.split(".")
    conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding)
    if not isinstance(conf_encoding, str):
        raise TypeError("conf_encoding could not be coerced to str")

    # If the tree is no object, make sure we create one - otherwise
    # the created commit object is invalid
    if isinstance(tree, str):
        tree = repo.tree(tree)

    # CREATE NEW COMMIT
    new_commit = cls(
        repo,
        cls.NULL_BIN_SHA,
        tree,
        author,
        author_time,
        author_offset,
        committer,
        committer_time,
        committer_offset,
        message,
        parent_commits,
        conf_encoding,
    )
    new_commit.binsha = cls._calculate_sha_(repo, new_commit)

    if not head:
        return new_commit

    # Need late import here, importing git at the very beginning throws
    # as well ...
    import git.refs

    try:
        repo.head.set_commit(new_commit, logmsg=message)
    except ValueError:
        # head is not yet set to the ref our HEAD points to
        # Happens on first commit
        master = git.refs.Head.create(
            repo,
            repo.head.ref,
            new_commit,
            logmsg="commit (initial): %s" % message,
        )
        repo.head.set_reference(master, logmsg="commit: Switching to %s" % master)

    return new_commit
573 # { Serializable Implementation
def _serialize(self, stream: BytesIO) -> "Commit":
    """Write this commit to ``stream``.

    Written in order: the tree line, one line per parent, the author and
    committer lines, an encoding line (only if the encoding differs from
    the default), the gpgsig block (only if a signature is set), an empty
    line and finally the message.

    :param stream: binary stream to write to
    :return: self
    """
    emit = stream.write

    emit(("tree %s\n" % self.tree).encode("ascii"))
    for parent in self.parents:
        emit(("parent %s\n" % parent).encode("ascii"))

    author = self.author
    committer = self.committer
    identity_fmt = "%s %s <%s> %s %s\n"
    identities = (
        ("author", author, "authored_date", "author_tz_offset"),
        ("committer", committer, "committed_date", "committer_tz_offset"),
    )
    for role, actor, date_attr, tz_attr in identities:
        identity_line = identity_fmt % (
            role,
            actor.name,
            actor.email,
            getattr(self, date_attr),
            altz_to_utctz_str(getattr(self, tz_attr)),
        )
        emit(identity_line.encode(self.encoding))

    if self.encoding != self.default_encoding:
        emit(("encoding %s\n" % self.encoding).encode("ascii"))

    # __getattribute__ is called directly and a missing signature simply
    # raises AttributeError, in which case no gpgsig block is written.
    try:
        if self.__getattribute__("gpgsig"):
            emit(b"gpgsig")
            for sig_line in self.gpgsig.rstrip("\n").split("\n"):
                emit((" " + sig_line + "\n").encode("ascii"))
    except AttributeError:
        pass

    emit(b"\n")

    # Write plain bytes, be sure the message is encoded according to our encoding.
    body = self.message
    emit(body.encode(self.encoding) if isinstance(body, str) else body)
    return self
def _deserialize(self, stream: BytesIO) -> "Commit":
    """Populate this commit from the plain data stream of a commit object.

    Parsed in order: the tree line, any number of ``parent`` lines, the
    author line, the committer line, optional ``mergetag`` blocks (skipped),
    then header lines up to the first empty line (``encoding`` and
    ``gpgsig`` are evaluated, others ignored). Everything after that is the
    message.

    :param stream: binary stream positioned at the start of the commit data
    :return: self
    """
    import codecs  # only needed to validate the declared encoding

    readline = stream.readline
    self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "")

    parents = []
    next_line = None
    while True:
        parent_line = readline()
        if not parent_line.startswith(b"parent"):
            next_line = parent_line
            break
        # END abort reading parents
        parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))))
    # END for each parent line
    self.parents = tuple(parents)

    # we don't know actual author encoding before we have parsed it, so keep the lines around
    author_line = next_line
    committer_line = readline()

    # we might run into one or more mergetag blocks, skip those for now
    next_line = readline()
    while next_line.startswith(b"mergetag "):
        next_line = readline()
        while next_line.startswith(b" "):
            next_line = readline()
    # end skip mergetags

    # now we can have the encoding line, or an empty line followed by the optional
    # message.
    self.encoding = self.default_encoding
    self.gpgsig = ""

    # read headers
    buf = next_line.strip()
    while buf:
        # The previous check compared a 10-byte slice against the 9-byte
        # literal b"encoding " and therefore never matched.
        if buf.startswith(b"encoding "):
            declared = buf[buf.find(b" ") + 1 :].decode(self.encoding, "ignore")
            try:
                codecs.lookup(declared)
            except (LookupError, ValueError):
                # Unknown codec name: keep the default so decoding below
                # cannot fail with a LookupError.
                log.warning(
                    "Unknown commit encoding '%s', falling back to %s",
                    declared,
                    self.default_encoding,
                )
            else:
                self.encoding = declared
        elif buf.startswith(b"gpgsig "):
            sig = buf[buf.find(b" ") + 1 :] + b"\n"
            is_next_header = False
            while True:
                sigbuf = readline()
                if not sigbuf:
                    break
                if sigbuf[0:1] != b" ":
                    # Not a continuation line: it is the next header (or the
                    # empty separator line) and must be processed by the
                    # outer loop without reading another line first.
                    buf = sigbuf.strip()
                    is_next_header = True
                    break
                sig += sigbuf[1:]
            # end read all signature
            self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore")
            if is_next_header:
                continue
        buf = readline().strip()

    # decode the authors name
    try:
        (
            self.author,
            self.authored_date,
            self.author_tz_offset,
        ) = parse_actor_and_date(author_line.decode(self.encoding, "replace"))
    except UnicodeDecodeError:
        log.error(
            "Failed to decode author line '%s' using encoding %s",
            author_line,
            self.encoding,
            exc_info=True,
        )

    try:
        (
            self.committer,
            self.committed_date,
            self.committer_tz_offset,
        ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace"))
    except UnicodeDecodeError:
        log.error(
            "Failed to decode committer line '%s' using encoding %s",
            committer_line,
            self.encoding,
            exc_info=True,
        )
    # END handle author's encoding

    # The remainder of the stream is the plain message. It stays as bytes
    # only if decoding fails.
    self.message = stream.read()
    try:
        self.message = self.message.decode(self.encoding, "replace")
    except UnicodeDecodeError:
        log.error(
            "Failed to decode message '%s' using encoding %s",
            self.message,
            self.encoding,
            exc_info=True,
        )
    # END exception handling

    return self
741 # } END serializable implementation
@property
def co_authors(self) -> List[Actor]:
    """
    Search the commit message for any co-authors of this commit.
    Details on co-authors: https://github.blog/2018-01-29-commit-together-with-co-authors/

    A bytes message is decoded with the commit's encoding before searching,
    as the str pattern cannot be applied to bytes.

    :return: List of co-authors for this commit (as Actor objects).
    """
    message = self.message
    if not message:
        return []

    if isinstance(message, bytes):
        message = message.decode(self.encoding, "replace")

    matches = re.findall(
        r"^Co-authored-by: (.*) <(.*?)>$",
        message,
        re.MULTILINE,
    )
    return [Actor(*match) for match in matches]