Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/cache/backends/filebased.py: 22%

112 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"File-based cache backend" 

2import glob 

3import hashlib 

4import os 

5import pickle 

6import random 

7import tempfile 

8import time 

9import zlib 

10 

11from django.core.cache.backends.base import DEFAULT_TIMEOUT, BaseCache 

12from django.core.files import locks 

13from django.core.files.move import file_move_safe 

14 

15 

class FileBasedCache(BaseCache):
    """
    Cache backend that stores every entry as its own file in one directory.

    Each cache file contains the pickled expiry value followed by the
    zlib-compressed pickle of the cached value. The file name is the MD5 hex
    digest of the validated key plus ``cache_suffix``.

    NOTE(security): entries are loaded back with ``pickle``, so the cache
    directory must only be writable by trusted users.
    """

    cache_suffix = ".djcache"
    pickle_protocol = pickle.HIGHEST_PROTOCOL

    def __init__(self, dir, params):
        """Remember the absolute cache directory and make sure it exists."""
        super().__init__(params)
        self._dir = os.path.abspath(dir)
        self._createdir()

    def add(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """
        Store `value` only if `key` has no live entry.

        Return True if the value was stored, False if the key already exists.
        """
        if self.has_key(key, version):
            return False
        self.set(key, value, timeout, version)
        return True

    def get(self, key, default=None, version=None):
        """
        Return the cached value for `key`, or `default` if the cache file is
        missing or the entry has expired.
        """
        fname = self._key_to_file(key, version)
        try:
            with open(fname, "rb") as f:
                # _is_expired() consumes the expiry header, so the remaining
                # bytes are the compressed value.
                if not self._is_expired(f):
                    return pickle.loads(zlib.decompress(f.read()))
        except FileNotFoundError:
            pass
        return default

    def _write_content(self, file, timeout, value):
        """
        Write the expiry header and the compressed, pickled `value` to the
        open binary file `file`.
        """
        expiry = self.get_backend_timeout(timeout)
        file.write(pickle.dumps(expiry, self.pickle_protocol))
        file.write(zlib.compress(pickle.dumps(value, self.pickle_protocol)))

    def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """
        Store `value` under `key`.

        The entry is written to a temporary file in the cache directory and
        then moved over the final name, so readers never open a partially
        written cache file under the final name.
        """
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with open(fd, "wb") as f:
                self._write_content(f, timeout, value)
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            # Don't leave the temporary file behind if writing/moving failed.
            if not renamed:
                os.remove(tmp_path)

    def touch(self, key, timeout=DEFAULT_TIMEOUT, version=None):
        """
        Rewrite the entry for `key` with a new expiry derived from `timeout`.

        Return True if the entry was updated, False if the cache file is
        missing or the entry has already expired.
        """
        try:
            with open(self._key_to_file(key, version), "r+b") as f:
                try:
                    locks.lock(f, locks.LOCK_EX)
                    if self._is_expired(f):
                        return False
                    previous_value = pickle.loads(zlib.decompress(f.read()))
                    f.seek(0)
                    self._write_content(f, timeout, previous_value)
                    # Drop leftover bytes in case the rewritten entry is
                    # shorter than the one it replaces.
                    f.truncate()
                    return True
                finally:
                    # _is_expired() closes the file when the entry is expired;
                    # a closed file has no usable descriptor to unlock.
                    if not f.closed:
                        locks.unlock(f)
        except FileNotFoundError:
            return False

    def delete(self, key, version=None):
        """Delete the entry for `key`. Return True if a file was removed."""
        return self._delete(self._key_to_file(key, version))

    def _delete(self, fname):
        """
        Remove the cache file `fname`.

        Return False if the path does not start with the cache directory,
        does not exist, or disappears before it can be removed; True
        otherwise.
        """
        if not fname.startswith(self._dir) or not os.path.exists(fname):
            return False
        try:
            os.remove(fname)
        except FileNotFoundError:
            # The file may have been removed by another process.
            return False
        return True

    def has_key(self, key, version=None):
        """Return True if `key` has a cache file that has not expired."""
        fname = self._key_to_file(key, version)
        # Open directly instead of checking os.path.exists() first: the file
        # may be removed by another process between the check and the open.
        try:
            with open(fname, "rb") as f:
                return not self._is_expired(f)
        except FileNotFoundError:
            return False

    def _cull(self):
        """
        Remove random cache entries if max_entries is reached at a ratio
        of num_entries / cull_frequency. A value of 0 for CULL_FREQUENCY means
        that the entire cache will be purged.
        """
        filelist = self._list_cache_files()
        num_entries = len(filelist)
        if num_entries < self._max_entries:
            return  # return early if no culling is required
        if self._cull_frequency == 0:
            return self.clear()  # Clear the cache when CULL_FREQUENCY = 0
        # Delete a random selection of entries
        filelist = random.sample(filelist, int(num_entries / self._cull_frequency))
        for fname in filelist:
            self._delete(fname)

    def _createdir(self):
        """Create the cache directory (mode 0o700) if it doesn't exist."""
        # Set the umask because os.makedirs() doesn't apply the "mode" argument
        # to intermediate-level directories.
        old_umask = os.umask(0o077)
        try:
            os.makedirs(self._dir, 0o700, exist_ok=True)
        finally:
            os.umask(old_umask)

    def _key_to_file(self, key, version=None):
        """
        Convert a key into a cache file path. Basically this is the
        root cache path joined with the md5sum of the key and a suffix.
        """
        key = self.make_and_validate_key(key, version=version)
        return os.path.join(
            self._dir,
            "".join([hashlib.md5(key.encode()).hexdigest(), self.cache_suffix]),
        )

    def clear(self):
        """
        Remove all the cache files.
        """
        for fname in self._list_cache_files():
            self._delete(fname)

    def _is_expired(self, f):
        """
        Take an open cache file `f` and delete it if it's expired.

        Return True if the entry was expired (in which case `f` has been
        closed and its file deleted), False otherwise. An expiry of None
        never expires. On a False return the file position is just past the
        expiry header.
        """
        try:
            exp = pickle.load(f)
        except EOFError:
            exp = 0  # An empty file is considered expired.
        if exp is not None and exp < time.time():
            f.close()  # On Windows a file has to be closed before deleting
            self._delete(f.name)
            return True
        return False

    def _list_cache_files(self):
        """
        Get a list of paths to all the cache files. These are all the files
        in the root cache dir that end on the cache_suffix.
        """
        # NOTE: glob.glob1() is an undocumented helper of the glob module.
        return [
            os.path.join(self._dir, fname)
            for fname in glob.glob1(self._dir, "*%s" % self.cache_suffix)
        ]

164 ]