Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/cache/backends/db.py: 16%

149 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"Database cache backend." 

2import base64 

3import pickle 

4from datetime import datetime 

5 

6from django.conf import settings 

7from django.core.cache.backends.base import DEFAULT_TIMEOUT, BaseCache 

8from django.db import DatabaseError, connections, models, router, transaction 

9from django.utils import timezone 

10 

11 

12class Options: 

13 """A class that will quack like a Django model _meta class. 

14 

15 This allows cache operations to be controlled by the router 

16 """ 

17 

18 def __init__(self, table): 

19 self.db_table = table 

20 self.app_label = "django_cache" 

21 self.model_name = "cacheentry" 

22 self.verbose_name = "cache entry" 

23 self.verbose_name_plural = "cache entries" 

24 self.object_name = "CacheEntry" 

25 self.abstract = False 

26 self.managed = True 

27 self.proxy = False 

28 self.swapped = False 

29 

30 

31class BaseDatabaseCache(BaseCache): 

32 def __init__(self, table, params): 

33 super().__init__(params) 

34 self._table = table 

35 

36 class CacheEntry: 

37 _meta = Options(table) 

38 

39 self.cache_model_class = CacheEntry 

40 

41 

42class DatabaseCache(BaseDatabaseCache): 

43 

44 # This class uses cursors provided by the database connection. This means 

45 # it reads expiration values as aware or naive datetimes, depending on the 

46 # value of USE_TZ and whether the database supports time zones. The ORM's 

47 # conversion and adaptation infrastructure is then used to avoid comparing 

48 # aware and naive datetimes accidentally. 

49 

50 pickle_protocol = pickle.HIGHEST_PROTOCOL 

51 

52 def get(self, key, default=None, version=None): 

53 return self.get_many([key], version).get(key, default) 

54 

55 def get_many(self, keys, version=None): 

56 if not keys: 

57 return {} 

58 

59 key_map = { 

60 self.make_and_validate_key(key, version=version): key for key in keys 

61 } 

62 

63 db = router.db_for_read(self.cache_model_class) 

64 connection = connections[db] 

65 quote_name = connection.ops.quote_name 

66 table = quote_name(self._table) 

67 

68 with connection.cursor() as cursor: 

69 cursor.execute( 

70 "SELECT %s, %s, %s FROM %s WHERE %s IN (%s)" 

71 % ( 

72 quote_name("cache_key"), 

73 quote_name("value"), 

74 quote_name("expires"), 

75 table, 

76 quote_name("cache_key"), 

77 ", ".join(["%s"] * len(key_map)), 

78 ), 

79 list(key_map), 

80 ) 

81 rows = cursor.fetchall() 

82 

83 result = {} 

84 expired_keys = [] 

85 expression = models.Expression(output_field=models.DateTimeField()) 

86 converters = connection.ops.get_db_converters( 

87 expression 

88 ) + expression.get_db_converters(connection) 

89 for key, value, expires in rows: 

90 for converter in converters: 

91 expires = converter(expires, expression, connection) 

92 if expires < timezone.now(): 

93 expired_keys.append(key) 

94 else: 

95 value = connection.ops.process_clob(value) 

96 value = pickle.loads(base64.b64decode(value.encode())) 

97 result[key_map.get(key)] = value 

98 self._base_delete_many(expired_keys) 

99 return result 

100 

101 def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None): 

102 key = self.make_and_validate_key(key, version=version) 

103 self._base_set("set", key, value, timeout) 

104 

105 def add(self, key, value, timeout=DEFAULT_TIMEOUT, version=None): 

106 key = self.make_and_validate_key(key, version=version) 

107 return self._base_set("add", key, value, timeout) 

108 

109 def touch(self, key, timeout=DEFAULT_TIMEOUT, version=None): 

110 key = self.make_and_validate_key(key, version=version) 

111 return self._base_set("touch", key, None, timeout) 

112 

113 def _base_set(self, mode, key, value, timeout=DEFAULT_TIMEOUT): 

114 timeout = self.get_backend_timeout(timeout) 

115 db = router.db_for_write(self.cache_model_class) 

116 connection = connections[db] 

117 quote_name = connection.ops.quote_name 

118 table = quote_name(self._table) 

119 

120 with connection.cursor() as cursor: 

121 cursor.execute("SELECT COUNT(*) FROM %s" % table) 

122 num = cursor.fetchone()[0] 

123 now = timezone.now() 

124 now = now.replace(microsecond=0) 

125 if timeout is None: 

126 exp = datetime.max 

127 else: 

128 tz = timezone.utc if settings.USE_TZ else None 

129 exp = datetime.fromtimestamp(timeout, tz=tz) 

130 exp = exp.replace(microsecond=0) 

131 if num > self._max_entries: 

132 self._cull(db, cursor, now, num) 

133 pickled = pickle.dumps(value, self.pickle_protocol) 

134 # The DB column is expecting a string, so make sure the value is a 

135 # string, not bytes. Refs #19274. 

136 b64encoded = base64.b64encode(pickled).decode("latin1") 

137 try: 

138 # Note: typecasting for datetimes is needed by some 3rd party 

139 # database backends. All core backends work without typecasting, 

140 # so be careful about changes here - test suite will NOT pick 

141 # regressions. 

142 with transaction.atomic(using=db): 

143 cursor.execute( 

144 "SELECT %s, %s FROM %s WHERE %s = %%s" 

145 % ( 

146 quote_name("cache_key"), 

147 quote_name("expires"), 

148 table, 

149 quote_name("cache_key"), 

150 ), 

151 [key], 

152 ) 

153 result = cursor.fetchone() 

154 

155 if result: 

156 current_expires = result[1] 

157 expression = models.Expression( 

158 output_field=models.DateTimeField() 

159 ) 

160 for converter in connection.ops.get_db_converters( 

161 expression 

162 ) + expression.get_db_converters(connection): 

163 current_expires = converter( 

164 current_expires, expression, connection 

165 ) 

166 

167 exp = connection.ops.adapt_datetimefield_value(exp) 

168 if result and mode == "touch": 

169 cursor.execute( 

170 "UPDATE %s SET %s = %%s WHERE %s = %%s" 

171 % (table, quote_name("expires"), quote_name("cache_key")), 

172 [exp, key], 

173 ) 

174 elif result and ( 

175 mode == "set" or (mode == "add" and current_expires < now) 

176 ): 

177 cursor.execute( 

178 "UPDATE %s SET %s = %%s, %s = %%s WHERE %s = %%s" 

179 % ( 

180 table, 

181 quote_name("value"), 

182 quote_name("expires"), 

183 quote_name("cache_key"), 

184 ), 

185 [b64encoded, exp, key], 

186 ) 

187 elif mode != "touch": 

188 cursor.execute( 

189 "INSERT INTO %s (%s, %s, %s) VALUES (%%s, %%s, %%s)" 

190 % ( 

191 table, 

192 quote_name("cache_key"), 

193 quote_name("value"), 

194 quote_name("expires"), 

195 ), 

196 [key, b64encoded, exp], 

197 ) 

198 else: 

199 return False # touch failed. 

200 except DatabaseError: 

201 # To be threadsafe, updates/inserts are allowed to fail silently 

202 return False 

203 else: 

204 return True 

205 

206 def delete(self, key, version=None): 

207 key = self.make_and_validate_key(key, version=version) 

208 return self._base_delete_many([key]) 

209 

210 def delete_many(self, keys, version=None): 

211 keys = [self.make_and_validate_key(key, version=version) for key in keys] 

212 self._base_delete_many(keys) 

213 

214 def _base_delete_many(self, keys): 

215 if not keys: 

216 return False 

217 

218 db = router.db_for_write(self.cache_model_class) 

219 connection = connections[db] 

220 quote_name = connection.ops.quote_name 

221 table = quote_name(self._table) 

222 

223 with connection.cursor() as cursor: 

224 cursor.execute( 

225 "DELETE FROM %s WHERE %s IN (%s)" 

226 % ( 

227 table, 

228 quote_name("cache_key"), 

229 ", ".join(["%s"] * len(keys)), 

230 ), 

231 keys, 

232 ) 

233 return bool(cursor.rowcount) 

234 

235 def has_key(self, key, version=None): 

236 key = self.make_and_validate_key(key, version=version) 

237 

238 db = router.db_for_read(self.cache_model_class) 

239 connection = connections[db] 

240 quote_name = connection.ops.quote_name 

241 

242 now = timezone.now().replace(microsecond=0, tzinfo=None) 

243 

244 with connection.cursor() as cursor: 

245 cursor.execute( 

246 "SELECT %s FROM %s WHERE %s = %%s and expires > %%s" 

247 % ( 

248 quote_name("cache_key"), 

249 quote_name(self._table), 

250 quote_name("cache_key"), 

251 ), 

252 [key, connection.ops.adapt_datetimefield_value(now)], 

253 ) 

254 return cursor.fetchone() is not None 

255 

256 def _cull(self, db, cursor, now, num): 

257 if self._cull_frequency == 0: 

258 self.clear() 

259 else: 

260 connection = connections[db] 

261 table = connection.ops.quote_name(self._table) 

262 cursor.execute( 

263 "DELETE FROM %s WHERE expires < %%s" % table, 

264 [connection.ops.adapt_datetimefield_value(now)], 

265 ) 

266 deleted_count = cursor.rowcount 

267 remaining_num = num - deleted_count 

268 if remaining_num > self._max_entries: 

269 cull_num = remaining_num // self._cull_frequency 

270 cursor.execute( 

271 connection.ops.cache_key_culling_sql() % table, [cull_num] 

272 ) 

273 last_cache_key = cursor.fetchone() 

274 if last_cache_key: 

275 cursor.execute( 

276 "DELETE FROM %s WHERE cache_key < %%s" % table, 

277 [last_cache_key[0]], 

278 ) 

279 

280 def clear(self): 

281 db = router.db_for_write(self.cache_model_class) 

282 connection = connections[db] 

283 table = connection.ops.quote_name(self._table) 

284 with connection.cursor() as cursor: 

285 cursor.execute("DELETE FROM %s" % table)