Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/files/storage.py: 62%

200 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import os 

2import pathlib 

3from datetime import datetime 

4from urllib.parse import urljoin 

5 

6from django.conf import settings 

7from django.core.exceptions import SuspiciousFileOperation 

8from django.core.files import File, locks 

9from django.core.files.move import file_move_safe 

10from django.core.files.utils import validate_file_name 

11from django.core.signals import setting_changed 

12from django.utils import timezone 

13from django.utils._os import safe_join 

14from django.utils.crypto import get_random_string 

15from django.utils.deconstruct import deconstructible 

16from django.utils.encoding import filepath_to_uri 

17from django.utils.functional import LazyObject, cached_property 

18from django.utils.module_loading import import_string 

19from django.utils.text import get_valid_filename 

20 

# Public names of this module, i.e. what a star-import exposes.
__all__ = (
    # Storage classes.
    "Storage",
    "FileSystemStorage",
    "DefaultStorage",
    # Module-level default storage instance and the storage class loader.
    "default_storage",
    "get_storage_class",
)

28 

29 

class Storage:
    """
    Common ancestor for storage backends.

    It supplies the shared name-handling helpers and declares the operations
    that concrete backends are expected to implement or override.
    """

    # Public entry points that delegate to the private ``_open()`` and
    # ``_save()`` hooks. Subclasses should only override them when there is
    # no other option.

    def open(self, name, mode="rb"):
        """Return the stored file called ``name``, opened with ``mode``."""
        return self._open(name, mode)

    def save(self, name, content, max_length=None):
        """
        Store ``content`` under ``name`` and return the name actually used.

        ``content`` is expected to be a File or any file-like object that is
        ready to be read from its beginning. When ``name`` is None the name
        carried by ``content`` is used instead.
        """
        # Work out the name the file is going to be saved under.
        name = content.name if name is None else name

        # Anything without chunks() is wrapped in a File.
        if not hasattr(content, "chunks"):
            content = File(content, name)

        free_name = self.get_available_name(name, max_length=max_length)
        saved_name = self._save(free_name, content)
        # The backend decides the final name, so check it is still valid.
        validate_file_name(saved_name, allow_relative_path=True)
        return saved_name

    # Public API with default implementations.

    def get_valid_name(self, name):
        """
        Derive from ``name`` a filename that the target storage system can
        accept.
        """
        return get_valid_filename(name)

    def get_alternative_name(self, file_root, file_ext):
        """
        Build a replacement filename: ``file_root``, an underscore, a random
        7-character string, then ``file_ext`` (which may be empty).
        """
        random_part = get_random_string(7)
        return "%s_%s%s" % (file_root, random_part, file_ext)

    def get_available_name(self, name, max_length=None):
        """
        Return a filename that does not exist yet on the target storage
        system and can therefore receive new content.

        Raise SuspiciousFileOperation when the directory part contains ".."
        or when no name fitting ``max_length`` can be produced.
        """
        name = str(name).replace("\\", "/")
        dir_name, file_name = os.path.split(name)
        if ".." in pathlib.PurePath(dir_name).parts:
            raise SuspiciousFileOperation(
                "Detected path traversal attempt in '%s'" % dir_name
            )
        validate_file_name(file_name)
        file_root, file_ext = os.path.splitext(file_name)

        def next_candidate():
            # file_ext still carries its leading dot. file_root is looked up
            # at call time, so truncations made below are taken into account.
            alternative = self.get_alternative_name(file_root, file_ext)
            return os.path.join(dir_name, alternative)

        # Keep generating alternatives while the name is taken or too long,
        # shortening the root whenever the candidate exceeds max_length.
        while self.exists(name) or (max_length and len(name) > max_length):
            name = next_candidate()
            if max_length is None:
                continue
            overflow = len(name) - max_length
            if overflow <= 0:
                continue
            # Drop the excess characters from the end of the root.
            file_root = file_root[:-overflow]
            if not file_root:
                # Nothing is left of the root: no acceptable name exists.
                raise SuspiciousFileOperation(
                    'Storage can not find an available filename for "%s". '
                    "Please make sure that the corresponding file field "
                    'allows sufficient "max_length".' % name
                )
            name = next_candidate()
        return name

    def generate_filename(self, filename):
        """
        Clean the file part of ``filename`` with get_valid_name() and return
        the resulting path, ready to be handed to save().

        Raise SuspiciousFileOperation when the directory part contains "..".
        """
        normalized = str(filename).replace("\\", "/")
        # The value may carry a directory, e.g. one built by
        # FileField.upload_to.
        dirname, basename = os.path.split(normalized)
        if ".." in pathlib.PurePath(dirname).parts:
            raise SuspiciousFileOperation(
                "Detected path traversal attempt in '%s'" % dirname
            )
        cleaned = self.get_valid_name(basename)
        return os.path.normpath(os.path.join(dirname, cleaned))

    def path(self, name):
        """
        Return a local filesystem path usable with Python's built-in open().

        Backends whose files cannot be reached through open() should *not*
        implement this method; the default raises NotImplementedError.
        """
        message = "This backend doesn't support absolute paths."
        raise NotImplementedError(message)

    # Public API without default implementations. Subclasses must implement
    # *all* of the methods below.

    def delete(self, name):
        """
        Remove the file called ``name`` from the storage system.
        """
        message = "subclasses of Storage must provide a delete() method"
        raise NotImplementedError(message)

    def exists(self, name):
        """
        Return True when ``name`` already refers to a file in the storage
        system, or False when the name is free for a new file.
        """
        message = "subclasses of Storage must provide an exists() method"
        raise NotImplementedError(message)

    def listdir(self, path):
        """
        List what ``path`` contains, as a 2-tuple of lists: directories
        first, files second.
        """
        message = "subclasses of Storage must provide a listdir() method"
        raise NotImplementedError(message)

    def size(self, name):
        """
        Return the total size in bytes of the file called ``name``.
        """
        message = "subclasses of Storage must provide a size() method"
        raise NotImplementedError(message)

    def url(self, name):
        """
        Return an absolute URL at which a web browser can fetch the file's
        contents directly.
        """
        message = "subclasses of Storage must provide a url() method"
        raise NotImplementedError(message)

    def get_accessed_time(self, name):
        """
        Return, as a datetime, when the file called ``name`` was last
        accessed. The datetime will be timezone-aware if USE_TZ=True.
        """
        message = "subclasses of Storage must provide a get_accessed_time() method"
        raise NotImplementedError(message)

    def get_created_time(self, name):
        """
        Return, as a datetime, when the file called ``name`` was created.
        The datetime will be timezone-aware if USE_TZ=True.
        """
        message = "subclasses of Storage must provide a get_created_time() method"
        raise NotImplementedError(message)

    def get_modified_time(self, name):
        """
        Return, as a datetime, when the file called ``name`` was last
        modified. The datetime will be timezone-aware if USE_TZ=True.
        """
        message = "subclasses of Storage must provide a get_modified_time() method"
        raise NotImplementedError(message)

210 

211 

@deconstructible
class FileSystemStorage(Storage):
    """
    Storage backend that keeps files on the local filesystem.
    """

    # O_CREAT combined with O_EXCL makes os.open() raise OSError when the
    # file already exists before it is opened.
    OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)

    def __init__(
        self,
        location=None,
        base_url=None,
        file_permissions_mode=None,
        directory_permissions_mode=None,
    ):
        """
        Remember the explicit configuration values.

        Any argument left as None is resolved later from the matching
        setting (MEDIA_ROOT, MEDIA_URL, FILE_UPLOAD_PERMISSIONS and
        FILE_UPLOAD_DIRECTORY_PERMISSIONS respectively).
        """
        self._location = location
        self._base_url = base_url
        self._file_permissions_mode = file_permissions_mode
        self._directory_permissions_mode = directory_permissions_mode
        # Have cached setting-derived values dropped when a setting changes.
        setting_changed.connect(self._clear_cached_properties)

    def _clear_cached_properties(self, setting, **kwargs):
        """Drop the cached property values derived from ``setting``."""
        cached_names_by_setting = (
            ("MEDIA_ROOT", ("base_location", "location")),
            ("MEDIA_URL", ("base_url",)),
            ("FILE_UPLOAD_PERMISSIONS", ("file_permissions_mode",)),
            ("FILE_UPLOAD_DIRECTORY_PERMISSIONS", ("directory_permissions_mode",)),
        )
        for setting_name, cached_names in cached_names_by_setting:
            if setting == setting_name:
                for cached_name in cached_names:
                    # Removing the entry from __dict__ discards the cached
                    # value; a missing entry is fine.
                    self.__dict__.pop(cached_name, None)
                break

    def _value_or_setting(self, value, setting):
        """Return ``value`` unless it is None, in which case ``setting``."""
        if value is None:
            return setting
        return value

    @cached_property
    def base_location(self):
        """Explicit location if one was given, otherwise MEDIA_ROOT."""
        return self._value_or_setting(self._location, settings.MEDIA_ROOT)

    @cached_property
    def location(self):
        """Absolute form of ``base_location``."""
        return os.path.abspath(self.base_location)

    @cached_property
    def base_url(self):
        """
        Explicit base URL, with a trailing slash appended when missing, or
        MEDIA_URL when no base URL was given.
        """
        explicit = self._base_url
        if explicit is not None and not explicit.endswith("/"):
            self._base_url = explicit + "/"
        return self._value_or_setting(self._base_url, settings.MEDIA_URL)

    @cached_property
    def file_permissions_mode(self):
        """Explicit file mode if given, otherwise FILE_UPLOAD_PERMISSIONS."""
        return self._value_or_setting(
            self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS
        )

    @cached_property
    def directory_permissions_mode(self):
        """
        Explicit directory mode if given, otherwise
        FILE_UPLOAD_DIRECTORY_PERMISSIONS.
        """
        return self._value_or_setting(
            self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS
        )

    def _open(self, name, mode="rb"):
        """Open the file called ``name`` and wrap the handle in a File."""
        handle = open(self.path(name), mode)
        return File(handle)

    def _save(self, name, content):
        """
        Write ``content`` to the file called ``name``.

        Return the name that was finally used, relative to the storage
        location and written with forward slashes.
        """
        full_path = self.path(name)

        # Make sure every missing intermediate directory gets created.
        directory = os.path.dirname(full_path)
        try:
            if self.directory_permissions_mode is None:
                os.makedirs(directory, exist_ok=True)
            else:
                # os.makedirs() does not apply its "mode" argument to
                # intermediate-level directories, so the umask is set for
                # the duration of the call and restored afterwards.
                previous_umask = os.umask(0o777 & ~self.directory_permissions_mode)
                try:
                    os.makedirs(
                        directory, self.directory_permissions_mode, exist_ok=True
                    )
                finally:
                    os.umask(previous_umask)
        except FileExistsError:
            raise FileExistsError("%s exists and is not a directory." % directory)

        # get_available_name() and the actual write are not atomic: two
        # threads may be handed the same name. The file is therefore created
        # exclusively, and when it turns out to exist already a new name is
        # requested and the write is attempted again.
        while True:
            try:
                if hasattr(content, "temporary_file_path"):
                    # The content already lives in a file that can be moved.
                    file_move_safe(content.temporary_file_path(), full_path)
                else:
                    # Regular uploaded content that gets streamed to disk.
                    # Note that os.open masks out the current umask value!
                    descriptor = os.open(full_path, self.OS_OPEN_FLAGS, 0o666)
                    stream = None
                    try:
                        locks.lock(descriptor, locks.LOCK_EX)
                        for chunk in content.chunks():
                            if stream is None:
                                # The first chunk decides between binary and
                                # text mode.
                                if isinstance(chunk, bytes):
                                    stream = os.fdopen(descriptor, "wb")
                                else:
                                    stream = os.fdopen(descriptor, "wt")
                            stream.write(chunk)
                    finally:
                        locks.unlock(descriptor)
                        if stream is None:
                            # No chunk was produced, so the raw descriptor
                            # was never wrapped and is closed directly.
                            os.close(descriptor)
                        else:
                            stream.close()
                # The save worked; stop retrying.
                break
            except FileExistsError:
                # The name got taken in the meantime; pick another one.
                name = self.get_available_name(name)
                full_path = self.path(name)

        if self.file_permissions_mode is not None:
            os.chmod(full_path, self.file_permissions_mode)

        # The stored name is always relative to the storage root and uses
        # forward slashes, even on Windows.
        relative_name = os.path.relpath(full_path, self.location)
        return str(relative_name).replace("\\", "/")

    def delete(self, name):
        """
        Remove the file or directory called ``name`` from the filesystem.

        Raise ValueError when ``name`` is empty. A target that is already
        gone is ignored.
        """
        if not name:
            raise ValueError("The name must be given to delete().")
        target = self.path(name)
        try:
            remove = os.rmdir if os.path.isdir(target) else os.remove
            remove(target)
        except FileNotFoundError:
            # Raised when the file or directory was removed concurrently.
            pass

    def exists(self, name):
        """Return whether os.path.lexists() finds the path for ``name``."""
        full_path = self.path(name)
        return os.path.lexists(full_path)

    def listdir(self, path):
        """
        Return the entry names found in ``path`` as a 2-tuple of lists:
        directories first, everything else second.
        """
        directories, files = [], []
        with os.scandir(self.path(path)) as entries:
            for entry in entries:
                bucket = directories if entry.is_dir() else files
                bucket.append(entry.name)
        return directories, files

    def path(self, name):
        """Join ``name`` onto the storage location using safe_join()."""
        return safe_join(self.location, name)

    def size(self, name):
        """Return the size in bytes of the file called ``name``."""
        full_path = self.path(name)
        return os.path.getsize(full_path)

    def url(self, name):
        """
        Return the URL for ``name``, built by joining it onto ``base_url``.

        Raise ValueError when no base URL is available.
        """
        if self.base_url is None:
            raise ValueError("This file is not accessible via a URL.")
        relative_url = filepath_to_uri(name)
        if relative_url is not None:
            # Leading slashes are stripped before joining onto base_url.
            relative_url = relative_url.lstrip("/")
        return urljoin(self.base_url, relative_url)

    def _datetime_from_timestamp(self, ts):
        """
        Convert the timestamp ``ts`` to a datetime: aware and in UTC when
        timezone support is enabled, naive and in local time otherwise.
        """
        if settings.USE_TZ:
            return datetime.fromtimestamp(ts, tz=timezone.utc)
        return datetime.fromtimestamp(ts, tz=None)

    def get_accessed_time(self, name):
        """Return the last access time of ``name`` as a datetime."""
        timestamp = os.path.getatime(self.path(name))
        return self._datetime_from_timestamp(timestamp)

    def get_created_time(self, name):
        """Return os.path.getctime() of ``name`` as a datetime."""
        timestamp = os.path.getctime(self.path(name))
        return self._datetime_from_timestamp(timestamp)

    def get_modified_time(self, name):
        """Return the last modification time of ``name`` as a datetime."""
        timestamp = os.path.getmtime(self.path(name))
        return self._datetime_from_timestamp(timestamp)

405 

406 

def get_storage_class(import_path=None):
    """
    Import and return the object named by the dotted ``import_path``.

    When ``import_path`` is empty or None, the DEFAULT_FILE_STORAGE setting
    is used instead.
    """
    dotted_path = import_path or settings.DEFAULT_FILE_STORAGE
    return import_string(dotted_path)

409 

410 

class DefaultStorage(LazyObject):
    """LazyObject wrapper around an instance of the default storage class."""

    def _setup(self):
        """Instantiate the class returned by get_storage_class() and wrap it."""
        storage_class = get_storage_class()
        self._wrapped = storage_class()


# Shared module-level instance of the wrapper above.
default_storage = DefaultStorage()

416default_storage = DefaultStorage()