Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/gitdb/db/base.py: 30%

112 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors 

2# 

3# This module is part of GitDB and is released under 

4# the New BSD License: http://www.opensource.org/licenses/bsd-license.php 

5"""Contains implementations of database retrieveing objects""" 

6from gitdb.util import ( 

7 join, 

8 LazyMixin, 

9 hex_to_bin 

10) 

11 

12from gitdb.utils.encoding import force_text 

13from gitdb.exc import ( 

14 BadObject, 

15 AmbiguousObjectName 

16) 

17 

18from itertools import chain 

19from functools import reduce 

20 

21 

22__all__ = ('ObjectDBR', 'ObjectDBW', 'FileDBBase', 'CompoundDB', 'CachingDB') 

23 

24 

25class ObjectDBR: 

26 

27 """Defines an interface for object database lookup. 

28 Objects are identified either by their 20 byte bin sha""" 

29 

30 def __contains__(self, sha): 

31 return self.has_obj 

32 

33 #{ Query Interface 

34 def has_object(self, sha): 

35 """ 

36 Whether the object identified by the given 20 bytes 

37 binary sha is contained in the database 

38 

39 :return: True if the object identified by the given 20 bytes 

40 binary sha is contained in the database""" 

41 raise NotImplementedError("To be implemented in subclass") 

42 

43 def info(self, sha): 

44 """ :return: OInfo instance 

45 :param sha: bytes binary sha 

46 :raise BadObject:""" 

47 raise NotImplementedError("To be implemented in subclass") 

48 

49 def stream(self, sha): 

50 """:return: OStream instance 

51 :param sha: 20 bytes binary sha 

52 :raise BadObject:""" 

53 raise NotImplementedError("To be implemented in subclass") 

54 

55 def size(self): 

56 """:return: amount of objects in this database""" 

57 raise NotImplementedError() 

58 

59 def sha_iter(self): 

60 """Return iterator yielding 20 byte shas for all objects in this data base""" 

61 raise NotImplementedError() 

62 

63 #} END query interface 

64 

65 

66class ObjectDBW: 

67 

68 """Defines an interface to create objects in the database""" 

69 

70 def __init__(self, *args, **kwargs): 

71 self._ostream = None 

72 

73 #{ Edit Interface 

74 def set_ostream(self, stream): 

75 """ 

76 Adjusts the stream to which all data should be sent when storing new objects 

77 

78 :param stream: if not None, the stream to use, if None the default stream 

79 will be used. 

80 :return: previously installed stream, or None if there was no override 

81 :raise TypeError: if the stream doesn't have the supported functionality""" 

82 cstream = self._ostream 

83 self._ostream = stream 

84 return cstream 

85 

86 def ostream(self): 

87 """ 

88 Return the output stream 

89 

90 :return: overridden output stream this instance will write to, or None 

91 if it will write to the default stream""" 

92 return self._ostream 

93 

94 def store(self, istream): 

95 """ 

96 Create a new object in the database 

97 :return: the input istream object with its sha set to its corresponding value 

98 

99 :param istream: IStream compatible instance. If its sha is already set 

100 to a value, the object will just be stored in the our database format, 

101 in which case the input stream is expected to be in object format ( header + contents ). 

102 :raise IOError: if data could not be written""" 

103 raise NotImplementedError("To be implemented in subclass") 

104 

105 #} END edit interface 

106 

107 

108class FileDBBase: 

109 

110 """Provides basic facilities to retrieve files of interest, including 

111 caching facilities to help mapping hexsha's to objects""" 

112 

113 def __init__(self, root_path): 

114 """Initialize this instance to look for its files at the given root path 

115 All subsequent operations will be relative to this path 

116 :raise InvalidDBRoot: 

117 **Note:** The base will not perform any accessablity checking as the base 

118 might not yet be accessible, but become accessible before the first 

119 access.""" 

120 super().__init__() 

121 self._root_path = root_path 

122 

123 #{ Interface 

124 def root_path(self): 

125 """:return: path at which this db operates""" 

126 return self._root_path 

127 

128 def db_path(self, rela_path): 

129 """ 

130 :return: the given relative path relative to our database root, allowing 

131 to pontentially access datafiles""" 

132 return join(self._root_path, force_text(rela_path)) 

133 #} END interface 

134 

135 

136class CachingDB: 

137 

138 """A database which uses caches to speed-up access""" 

139 

140 #{ Interface 

141 def update_cache(self, force=False): 

142 """ 

143 Call this method if the underlying data changed to trigger an update 

144 of the internal caching structures. 

145 

146 :param force: if True, the update must be performed. Otherwise the implementation 

147 may decide not to perform an update if it thinks nothing has changed. 

148 :return: True if an update was performed as something change indeed""" 

149 

150 # END interface 

151 

152 

153def _databases_recursive(database, output): 

154 """Fill output list with database from db, in order. Deals with Loose, Packed 

155 and compound databases.""" 

156 if isinstance(database, CompoundDB): 

157 dbs = database.databases() 

158 output.extend(db for db in dbs if not isinstance(db, CompoundDB)) 

159 for cdb in (db for db in dbs if isinstance(db, CompoundDB)): 

160 _databases_recursive(cdb, output) 

161 else: 

162 output.append(database) 

163 # END handle database type 

164 

165 

166class CompoundDB(ObjectDBR, LazyMixin, CachingDB): 

167 

168 """A database which delegates calls to sub-databases. 

169 

170 Databases are stored in the lazy-loaded _dbs attribute. 

171 Define _set_cache_ to update it with your databases""" 

172 

173 def _set_cache_(self, attr): 

174 if attr == '_dbs': 

175 self._dbs = list() 

176 elif attr == '_db_cache': 

177 self._db_cache = dict() 

178 else: 

179 super()._set_cache_(attr) 

180 

181 def _db_query(self, sha): 

182 """:return: database containing the given 20 byte sha 

183 :raise BadObject:""" 

184 # most databases use binary representations, prevent converting 

185 # it every time a database is being queried 

186 try: 

187 return self._db_cache[sha] 

188 except KeyError: 

189 pass 

190 # END first level cache 

191 

192 for db in self._dbs: 

193 if db.has_object(sha): 

194 self._db_cache[sha] = db 

195 return db 

196 # END for each database 

197 raise BadObject(sha) 

198 

199 #{ ObjectDBR interface 

200 

201 def has_object(self, sha): 

202 try: 

203 self._db_query(sha) 

204 return True 

205 except BadObject: 

206 return False 

207 # END handle exceptions 

208 

209 def info(self, sha): 

210 return self._db_query(sha).info(sha) 

211 

212 def stream(self, sha): 

213 return self._db_query(sha).stream(sha) 

214 

215 def size(self): 

216 """:return: total size of all contained databases""" 

217 return reduce(lambda x, y: x + y, (db.size() for db in self._dbs), 0) 

218 

219 def sha_iter(self): 

220 return chain(*(db.sha_iter() for db in self._dbs)) 

221 

222 #} END object DBR Interface 

223 

224 #{ Interface 

225 

226 def databases(self): 

227 """:return: tuple of database instances we use for lookups""" 

228 return tuple(self._dbs) 

229 

230 def update_cache(self, force=False): 

231 # something might have changed, clear everything 

232 self._db_cache.clear() 

233 stat = False 

234 for db in self._dbs: 

235 if isinstance(db, CachingDB): 

236 stat |= db.update_cache(force) 

237 # END if is caching db 

238 # END for each database to update 

239 return stat 

240 

241 def partial_to_complete_sha_hex(self, partial_hexsha): 

242 """ 

243 :return: 20 byte binary sha1 from the given less-than-40 byte hexsha (bytes or str) 

244 :param partial_hexsha: hexsha with less than 40 byte 

245 :raise AmbiguousObjectName: """ 

246 databases = list() 

247 _databases_recursive(self, databases) 

248 partial_hexsha = force_text(partial_hexsha) 

249 len_partial_hexsha = len(partial_hexsha) 

250 if len_partial_hexsha % 2 != 0: 

251 partial_binsha = hex_to_bin(partial_hexsha + "0") 

252 else: 

253 partial_binsha = hex_to_bin(partial_hexsha) 

254 # END assure successful binary conversion 

255 

256 candidate = None 

257 for db in databases: 

258 full_bin_sha = None 

259 try: 

260 if hasattr(db, 'partial_to_complete_sha_hex'): 

261 full_bin_sha = db.partial_to_complete_sha_hex(partial_hexsha) 

262 else: 

263 full_bin_sha = db.partial_to_complete_sha(partial_binsha, len_partial_hexsha) 

264 # END handle database type 

265 except BadObject: 

266 continue 

267 # END ignore bad objects 

268 if full_bin_sha: 

269 if candidate and candidate != full_bin_sha: 

270 raise AmbiguousObjectName(partial_hexsha) 

271 candidate = full_bin_sha 

272 # END handle candidate 

273 # END for each db 

274 if not candidate: 

275 raise BadObject(partial_binsha) 

276 return candidate 

277 

278 #} END interface