Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/gitdb/db/base.py: 30%
112 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
2#
3# This module is part of GitDB and is released under
4# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
5"""Contains implementations of database retrieving objects"""
6from gitdb.util import (
7 join,
8 LazyMixin,
9 hex_to_bin
10)
12from gitdb.utils.encoding import force_text
13from gitdb.exc import (
14 BadObject,
15 AmbiguousObjectName
16)
18from itertools import chain
19from functools import reduce
22__all__ = ('ObjectDBR', 'ObjectDBW', 'FileDBBase', 'CompoundDB', 'CachingDB')
class ObjectDBR:

    """Defines an interface for object database lookup.

    Objects are identified by their 20 byte binary sha. Every query method
    except ``__contains__`` must be implemented by a subclass."""

    def __contains__(self, sha):
        """Support ``sha in db`` by delegating to :meth:`has_object`.

        :param sha: 20 byte binary sha
        :return: whatever ``has_object(sha)`` returns"""
        # The lookup has to be *called*; returning a bound method (or a
        # non-existent attribute) would never yield a meaningful answer.
        return self.has_object(sha)

    #{ Query Interface
    def has_object(self, sha):
        """
        Whether the object identified by the given 20 bytes
        binary sha is contained in the database

        :param sha: 20 bytes binary sha
        :return: True if the object identified by the given 20 bytes
            binary sha is contained in the database
        :raise NotImplementedError: always, in this base class"""
        raise NotImplementedError("To be implemented in subclass")

    def info(self, sha):
        """ :return: OInfo instance
        :param sha: bytes binary sha
        :raise BadObject:
        :raise NotImplementedError: always, in this base class"""
        raise NotImplementedError("To be implemented in subclass")

    def stream(self, sha):
        """:return: OStream instance
        :param sha: 20 bytes binary sha
        :raise BadObject:
        :raise NotImplementedError: always, in this base class"""
        raise NotImplementedError("To be implemented in subclass")

    def size(self):
        """:return: amount of objects in this database
        :raise NotImplementedError: always, in this base class"""
        raise NotImplementedError()

    def sha_iter(self):
        """Return iterator yielding 20 byte shas for all objects in this data base

        :raise NotImplementedError: always, in this base class"""
        raise NotImplementedError()

    #} END query interface
class ObjectDBW:

    """Interface for databases that can create (store) new objects"""

    def __init__(self, *args, **kwargs):
        # No output stream override is installed initially; positional and
        # keyword arguments are accepted but not used here.
        self._ostream = None

    #{ Edit Interface
    def set_ostream(self, stream):
        """
        Install the stream that receives all data when new objects are stored

        This base implementation only swaps the stored reference; it performs
        no validation of the given stream.

        :param stream: the stream to use, or None to go back to the default stream
        :return: the stream that was installed before this call, or None if
            there was no override"""
        previous, self._ostream = self._ostream, stream
        return previous

    def ostream(self):
        """
        Return the currently installed output stream

        :return: the overriding output stream this instance writes to, or None
            if the default stream is in use"""
        return self._ostream

    def store(self, istream):
        """
        Create a new object in the database

        :param istream: IStream compatible instance. If its sha is already set
            to a value, the object will just be stored in our database format,
            in which case the input stream is expected to be in object format
            ( header + contents ).
        :return: the input istream object with its sha set to its corresponding value
        :raise IOError: if data could not be written
        :raise NotImplementedError: always, in this base class"""
        raise NotImplementedError("To be implemented in subclass")

    #} END edit interface
class FileDBBase:

    """Base for databases that locate their files below a single root path"""

    def __init__(self, root_path):
        """Remember the root path at which this instance looks for its files

        All subsequent operations will be relative to this path.

        **Note:** No accessibility check is made here, as the path might not
        be accessible yet but become accessible before the first access.

        :param root_path: path at which the database operates"""
        super().__init__()
        self._root_path = root_path

    #{ Interface
    def root_path(self):
        """:return: path at which this db operates"""
        return self._root_path

    def db_path(self, rela_path):
        """
        :param rela_path: path relative to the database root; it is passed
            through ``force_text`` before being joined
        :return: the given relative path joined onto our database root,
            allowing to potentially access datafiles"""
        relative = force_text(rela_path)
        return join(self._root_path, relative)
    #} END interface
class CachingDB:

    """A database which uses caches to speed-up access"""

    #{ Interface
    def update_cache(self, force=False):
        """
        Hook to call when the underlying data changed, so that internal
        caching structures can be refreshed.

        This base implementation does nothing and returns None; subclasses
        are expected to provide the actual behaviour.

        :param force: if True, the update must be performed. Otherwise the
            implementation may decide not to perform an update if it thinks
            nothing has changed.
        :return: True if an update was performed because something changed"""
        return None

    # END interface
def _databases_recursive(database, output):
    """Flatten ``database`` into the ``output`` list, in order.

    A database that is not a CompoundDB is appended as is. For a CompoundDB,
    its direct non-compound members are appended first, then each compound
    member is flattened recursively.

    :param database: database instance to flatten
    :param output: list that receives the databases; it is modified in place"""
    if not isinstance(database, CompoundDB):
        output.append(database)
        return
    # END handle plain database

    nested = []
    for member in database.databases():
        if isinstance(member, CompoundDB):
            # defer, so that all direct plain members come first
            nested.append(member)
        else:
            output.append(member)
    # END for each member

    for compound in nested:
        _databases_recursive(compound, output)
    # END for each nested compound database
class CompoundDB(ObjectDBR, LazyMixin, CachingDB):

    """A database which delegates calls to sub-databases.

    The sub-databases live in the lazy-loaded ``_dbs`` attribute, and
    ``_db_cache`` maps shas to the sub-database that was found to hold them.
    Define _set_cache_ to fill ``_dbs`` with your databases"""

    def _set_cache_(self, attr):
        """Create the lazy attribute ``attr``: an empty list for ``_dbs``, an
        empty dict for ``_db_cache``, anything else is left to the parent class"""
        if attr == '_dbs':
            self._dbs = []
            return
        if attr == '_db_cache':
            self._db_cache = {}
            return
        super()._set_cache_(attr)

    def _db_query(self, sha):
        """:return: database containing the given 20 byte sha
        :param sha: 20 byte binary sha
        :raise BadObject: if none of our databases has the object"""
        # first level cache: remember which database answered for a sha so
        # that the databases do not have to be asked again
        known = self._db_cache
        if sha in known:
            return known[sha]
        # END first level cache

        for candidate in self._dbs:
            if candidate.has_object(sha):
                known[sha] = candidate
                return candidate
            # END remember hit
        # END for each database
        raise BadObject(sha)

    #{ ObjectDBR interface

    def has_object(self, sha):
        """:return: True if one of our databases contains the given sha"""
        try:
            self._db_query(sha)
        except BadObject:
            return False
        # END handle exceptions
        return True

    def info(self, sha):
        """:return: result of ``info(sha)`` of the database containing ``sha``
        :raise BadObject: if none of our databases has the object"""
        owner = self._db_query(sha)
        return owner.info(sha)

    def stream(self, sha):
        """:return: result of ``stream(sha)`` of the database containing ``sha``
        :raise BadObject: if none of our databases has the object"""
        owner = self._db_query(sha)
        return owner.stream(sha)

    def size(self):
        """:return: total size of all contained databases"""
        total = 0
        for db in self._dbs:
            total = total + db.size()
        # END for each database
        return total

    def sha_iter(self):
        """:return: iterator chaining the ``sha_iter()`` results of all
            contained databases, in database order"""
        iterators = [db.sha_iter() for db in self._dbs]
        return chain(*iterators)

    #} END object DBR Interface

    #{ Interface

    def databases(self):
        """:return: tuple of database instances we use for lookups"""
        return tuple(self._dbs)

    def update_cache(self, force=False):
        """Clear the sha-to-database cache and update all contained caching
        databases.

        :param force: passed on to ``update_cache`` of each contained CachingDB
        :return: the results of the contained updates, or-ed together;
            False if there was no caching database"""
        # something might have changed, clear everything
        self._db_cache.clear()
        changed = False
        for db in self._dbs:
            if not isinstance(db, CachingDB):
                continue
            # END skip non-caching db
            changed |= db.update_cache(force)
        # END for each database to update
        return changed

    def partial_to_complete_sha_hex(self, partial_hexsha):
        """
        :return: 20 byte binary sha1 from the given less-than-40 byte hexsha (bytes or str)
        :param partial_hexsha: hexsha with less than 40 byte
        :raise AmbiguousObjectName: if the databases resolve the prefix to
            different full shas
        :raise BadObject: if no database could resolve the prefix"""
        flat_dbs = []
        _databases_recursive(self, flat_dbs)
        partial_hexsha = force_text(partial_hexsha)
        hex_len = len(partial_hexsha)
        # an odd amount of hex digits is padded with "0" so that it can be
        # converted to binary
        if hex_len % 2:
            padded_hexsha = partial_hexsha + "0"
        else:
            padded_hexsha = partial_hexsha
        partial_binsha = hex_to_bin(padded_hexsha)
        # END assure successful binary conversion

        match = None
        for db in flat_dbs:
            try:
                if hasattr(db, 'partial_to_complete_sha_hex'):
                    resolved = db.partial_to_complete_sha_hex(partial_hexsha)
                else:
                    resolved = db.partial_to_complete_sha(partial_binsha, hex_len)
                # END handle database type
            except BadObject:
                continue
            # END ignore bad objects
            if not resolved:
                continue
            # END skip empty results
            if match and match != resolved:
                raise AmbiguousObjectName(partial_hexsha)
            match = resolved
        # END for each db
        if not match:
            raise BadObject(partial_binsha)
        return match

    #} END interface