Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/files/storage.py: 62%
200 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import os
2import pathlib
3from datetime import datetime
4from urllib.parse import urljoin
6from django.conf import settings
7from django.core.exceptions import SuspiciousFileOperation
8from django.core.files import File, locks
9from django.core.files.move import file_move_safe
10from django.core.files.utils import validate_file_name
11from django.core.signals import setting_changed
12from django.utils import timezone
13from django.utils._os import safe_join
14from django.utils.crypto import get_random_string
15from django.utils.deconstruct import deconstructible
16from django.utils.encoding import filepath_to_uri
17from django.utils.functional import LazyObject, cached_property
18from django.utils.module_loading import import_string
19from django.utils.text import get_valid_filename
# Names exported by a star-import of this module.
__all__ = (
    "Storage",
    "FileSystemStorage",
    "DefaultStorage",
    "default_storage",
    "get_storage_class",
)
class Storage:
    """
    Base class for storage backends.

    Supplies default behaviour that concrete storage systems may inherit or
    override as necessary.
    """

    # open() and save() are the public face of the private _open()/_save()
    # hooks. Subclasses shouldn't override them unless absolutely necessary.

    def open(self, name, mode="rb"):
        """Fetch the named file from storage by delegating to ``_open()``."""
        return self._open(name, mode)

    def save(self, name, content, max_length=None):
        """
        Store ``content`` under ``name`` and return the name actually used.

        ``content`` should be a proper File object or any Python file-like
        object, ready to be read from the beginning. When ``name`` is None,
        ``content.name`` is used instead.
        """
        requested = content.name if name is None else name

        # Objects without a chunks attribute get wrapped in File.
        if not hasattr(content, "chunks"):
            content = File(content, requested)

        candidate = self.get_available_name(requested, max_length=max_length)
        stored_name = self._save(candidate, content)
        # The backend returns the final name; check that it is still valid.
        validate_file_name(stored_name, allow_relative_path=True)
        return stored_name

    # Public API with default implementations.

    def get_valid_name(self, name):
        """
        Derive from ``name`` a filename that is suitable for use in the
        target storage system.
        """
        return get_valid_filename(name)

    def get_alternative_name(self, file_root, file_ext):
        """
        Build an alternative filename: ``file_root``, an underscore, a random
        7-character string, then ``file_ext`` (which carries its own dot, if
        there is an extension).
        """
        random_suffix = get_random_string(7)
        return "%s_%s%s" % (file_root, random_suffix, file_ext)

    def get_available_name(self, name, max_length=None):
        """
        Return a filename that is free on the target storage system and
        available for new content to be written to.

        Raise SuspiciousFileOperation when the directory part contains "..",
        or when ``max_length`` leaves no room for any file root.
        """
        candidate = str(name).replace("\\", "/")
        dir_name, file_name = os.path.split(candidate)
        if ".." in pathlib.PurePath(dir_name).parts:
            raise SuspiciousFileOperation(
                "Detected path traversal attempt in '%s'" % dir_name
            )
        validate_file_name(file_name)
        # file_ext includes the dot.
        file_root, file_ext = os.path.splitext(file_name)

        # Keep generating alternatives until the candidate neither exists nor
        # exceeds max_length, shortening file_root whenever it is too long.
        while True:
            if not self.exists(candidate) and not (
                max_length and len(candidate) > max_length
            ):
                return candidate

            candidate = os.path.join(
                dir_name, self.get_alternative_name(file_root, file_ext)
            )
            if max_length is None:
                continue

            overflow = len(candidate) - max_length
            if overflow <= 0:
                continue

            # Drop as many trailing characters from file_root as overflowed.
            file_root = file_root[:-overflow]
            if not file_root:
                # Nothing is left of file_root, so no name can fit.
                raise SuspiciousFileOperation(
                    'Storage can not find an available filename for "%s". '
                    "Please make sure that the corresponding file field "
                    'allows sufficient "max_length".' % candidate
                )
            candidate = os.path.join(
                dir_name, self.get_alternative_name(file_root, file_ext)
            )

    def generate_filename(self, filename):
        """
        Clean the final path component with get_valid_name() and return the
        normalized filename to be passed to the save() method.

        Raise SuspiciousFileOperation when the directory part contains "..".
        """
        normalized = str(filename).replace("\\", "/")
        # The value may include a path as returned by FileField.upload_to.
        dirname, basename = os.path.split(normalized)
        if ".." in pathlib.PurePath(dirname).parts:
            raise SuspiciousFileOperation(
                "Detected path traversal attempt in '%s'" % dirname
            )
        valid_basename = self.get_valid_name(basename)
        return os.path.normpath(os.path.join(dirname, valid_basename))

    def path(self, name):
        """
        Return a local filesystem path where the file can be retrieved using
        Python's built-in open() function. Storage systems that can't be
        accessed using open() should *not* implement this method.
        """
        raise NotImplementedError("This backend doesn't support absolute paths.")

    # Public API without default implementations. Subclasses must implement
    # *all* of the methods below.

    def delete(self, name):
        """Remove the named file from the storage system."""
        raise NotImplementedError(
            "subclasses of Storage must provide a delete() method"
        )

    def exists(self, name):
        """
        Return True if a file referenced by ``name`` already exists in the
        storage system, or False if the name is available for a new file.
        """
        raise NotImplementedError(
            "subclasses of Storage must provide an exists() method"
        )

    def listdir(self, path):
        """
        List the contents of ``path`` as a 2-tuple of lists: directories
        first, files second.
        """
        raise NotImplementedError(
            "subclasses of Storage must provide a listdir() method"
        )

    def size(self, name):
        """Return the total size, in bytes, of the named file."""
        raise NotImplementedError("subclasses of Storage must provide a size() method")

    def url(self, name):
        """
        Return an absolute URL where the file's contents can be accessed
        directly by a web browser.
        """
        raise NotImplementedError("subclasses of Storage must provide a url() method")

    def get_accessed_time(self, name):
        """
        Return the last accessed time of the named file as a datetime. The
        datetime will be timezone-aware if USE_TZ=True.
        """
        raise NotImplementedError(
            "subclasses of Storage must provide a get_accessed_time() method"
        )

    def get_created_time(self, name):
        """
        Return the creation time of the named file as a datetime. The
        datetime will be timezone-aware if USE_TZ=True.
        """
        raise NotImplementedError(
            "subclasses of Storage must provide a get_created_time() method"
        )

    def get_modified_time(self, name):
        """
        Return the last modified time of the named file as a datetime. The
        datetime will be timezone-aware if USE_TZ=True.
        """
        raise NotImplementedError(
            "subclasses of Storage must provide a get_modified_time() method"
        )
@deconstructible
class FileSystemStorage(Storage):
    """
    Storage backend that keeps files on the local filesystem.
    """

    # O_CREAT together with O_EXCL makes os.open() raise OSError when the
    # file already exists before it's opened.
    OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)

    def __init__(
        self,
        location=None,
        base_url=None,
        file_permissions_mode=None,
        directory_permissions_mode=None,
    ):
        """
        Remember the explicit configuration and subscribe to setting changes.

        Any argument left as None is resolved from the corresponding setting
        (MEDIA_ROOT, MEDIA_URL, FILE_UPLOAD_PERMISSIONS,
        FILE_UPLOAD_DIRECTORY_PERMISSIONS) by the cached properties below.
        """
        self._location = location
        self._base_url = base_url
        self._file_permissions_mode = file_permissions_mode
        self._directory_permissions_mode = directory_permissions_mode
        setting_changed.connect(self._clear_cached_properties)

    def _clear_cached_properties(self, setting, **kwargs):
        """Drop the cached property values that depend on ``setting``."""
        cached_by_setting = {
            "MEDIA_ROOT": ("base_location", "location"),
            "MEDIA_URL": ("base_url",),
            "FILE_UPLOAD_PERMISSIONS": ("file_permissions_mode",),
            "FILE_UPLOAD_DIRECTORY_PERMISSIONS": ("directory_permissions_mode",),
        }
        for attribute in cached_by_setting.get(setting, ()):
            self.__dict__.pop(attribute, None)

    def _value_or_setting(self, value, setting):
        """Return ``value`` unless it is None, in which case ``setting``."""
        return value if value is not None else setting

    @cached_property
    def base_location(self):
        """The configured location, or settings.MEDIA_ROOT when unset."""
        return self._value_or_setting(self._location, settings.MEDIA_ROOT)

    @cached_property
    def location(self):
        """Absolute form of ``base_location``."""
        return os.path.abspath(self.base_location)

    @cached_property
    def base_url(self):
        """
        The configured base URL, with a trailing slash appended if it was
        missing, or settings.MEDIA_URL when unset.
        """
        explicit = self._base_url
        if explicit is not None and not explicit.endswith("/"):
            self._base_url = explicit + "/"
        return self._value_or_setting(self._base_url, settings.MEDIA_URL)

    @cached_property
    def file_permissions_mode(self):
        """The configured file mode, or settings.FILE_UPLOAD_PERMISSIONS."""
        return self._value_or_setting(
            self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS
        )

    @cached_property
    def directory_permissions_mode(self):
        """
        The configured directory mode, or
        settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS.
        """
        return self._value_or_setting(
            self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS
        )

    def _open(self, name, mode="rb"):
        """Open the file at ``path(name)`` and wrap it in a File."""
        return File(open(self.path(name), mode))

    def _save(self, name, content):
        """
        Write ``content`` to disk and return the stored name, relative to the
        storage location and using forward slashes.

        Raise FileExistsError when the target directory path exists but is
        not a directory.
        """
        full_path = self.path(name)

        # Create any intermediate directories that do not exist.
        directory = os.path.dirname(full_path)
        try:
            dir_mode = self.directory_permissions_mode
            if dir_mode is None:
                os.makedirs(directory, exist_ok=True)
            else:
                # os.makedirs() doesn't apply its "mode" argument to
                # intermediate-level directories, so set the umask for the
                # duration of the call and restore it afterwards.
                previous_umask = os.umask(0o777 & ~dir_mode)
                try:
                    os.makedirs(directory, dir_mode, exist_ok=True)
                finally:
                    os.umask(previous_umask)
        except FileExistsError:
            raise FileExistsError("%s exists and is not a directory." % directory)

        # Between get_available_name() and the actual write another thread
        # may claim the same name. So attempt to create the file, and if it
        # already exists ask get_available_name() again and retry.
        while True:
            try:
                if hasattr(content, "temporary_file_path"):
                    # The content already lives at a file path; move it.
                    file_move_safe(content.temporary_file_path(), full_path)
                else:
                    # A normal uploaded file that is streamed chunk by chunk.
                    # The current umask value is masked out by os.open!
                    fd = os.open(full_path, self.OS_OPEN_FLAGS, 0o666)
                    stream = None
                    try:
                        locks.lock(fd, locks.LOCK_EX)
                        for chunk in content.chunks():
                            if stream is None:
                                # The first chunk's type picks the write mode.
                                mode = "wb" if isinstance(chunk, bytes) else "wt"
                                stream = os.fdopen(fd, mode)
                            stream.write(chunk)
                    finally:
                        locks.unlock(fd)
                        if stream is None:
                            # No chunk was written; close the raw descriptor.
                            os.close(fd)
                        else:
                            stream.close()
            except FileExistsError:
                # A new name is needed if the file exists.
                name = self.get_available_name(name)
                full_path = self.path(name)
                continue
            # OK, the file save worked. Leave the loop.
            break

        file_mode = self.file_permissions_mode
        if file_mode is not None:
            os.chmod(full_path, file_mode)

        # Ensure the saved path is always relative to the storage root, and
        # store filenames with forward slashes, even on Windows.
        relative_name = os.path.relpath(full_path, self.location)
        return str(relative_name).replace("\\", "/")

    def delete(self, name):
        """
        Remove the file or directory stored under ``name``.

        Raise ValueError when ``name`` is empty. A target that is already
        gone is silently ignored.
        """
        if not name:
            raise ValueError("The name must be given to delete().")
        target = self.path(name)
        try:
            remove = os.rmdir if os.path.isdir(target) else os.remove
            remove(target)
        except FileNotFoundError:
            # FileNotFoundError is raised if the file or directory was
            # removed concurrently.
            pass

    def exists(self, name):
        """Report whether ``path(name)`` exists, per os.path.lexists()."""
        return os.path.lexists(self.path(name))

    def listdir(self, path):
        """
        Scan ``path`` and return a 2-tuple of name lists: directories first,
        everything else second.
        """
        directories, files = [], []
        with os.scandir(self.path(path)) as entries:
            for entry in entries:
                bucket = directories if entry.is_dir() else files
                bucket.append(entry.name)
        return directories, files

    def path(self, name):
        """Join ``name`` onto the storage location with safe_join()."""
        return safe_join(self.location, name)

    def size(self, name):
        """Return the size of the named file as given by os.path.getsize()."""
        return os.path.getsize(self.path(name))

    def url(self, name):
        """
        Return the URL for ``name`` by joining it onto ``base_url``.

        Raise ValueError when no base URL is available.
        """
        if self.base_url is None:
            raise ValueError("This file is not accessible via a URL.")
        relative_url = filepath_to_uri(name)
        if relative_url is not None:
            # Strip leading slashes before joining onto the base URL.
            relative_url = relative_url.lstrip("/")
        return urljoin(self.base_url, relative_url)

    def _datetime_from_timestamp(self, ts):
        """
        If timezone support is enabled, make an aware datetime object in UTC;
        otherwise make a naive one in the local timezone.
        """
        if settings.USE_TZ:
            return datetime.fromtimestamp(ts, tz=timezone.utc)
        return datetime.fromtimestamp(ts, tz=None)

    def get_accessed_time(self, name):
        """Return the file's os.path.getatime() value as a datetime."""
        timestamp = os.path.getatime(self.path(name))
        return self._datetime_from_timestamp(timestamp)

    def get_created_time(self, name):
        """Return the file's os.path.getctime() value as a datetime."""
        timestamp = os.path.getctime(self.path(name))
        return self._datetime_from_timestamp(timestamp)

    def get_modified_time(self, name):
        """Return the file's os.path.getmtime() value as a datetime."""
        timestamp = os.path.getmtime(self.path(name))
        return self._datetime_from_timestamp(timestamp)
def get_storage_class(import_path=None):
    """
    Import and return the object named by ``import_path``.

    When ``import_path`` is empty or None, settings.DEFAULT_FILE_STORAGE is
    used as the dotted path instead.
    """
    dotted_path = import_path or settings.DEFAULT_FILE_STORAGE
    return import_string(dotted_path)
class DefaultStorage(LazyObject):
    """
    Lazy wrapper around the storage class returned by get_storage_class().
    """

    def _setup(self):
        """Instantiate the default storage class and store it as _wrapped."""
        storage_class = get_storage_class()
        self._wrapped = storage_class()


# Module-level instance of the lazy wrapper.
default_storage = DefaultStorage()