Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/codecs/download.py: 19%
85 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# coding: utf-8
2from coreapi.codecs.base import BaseCodec
3from coreapi.compat import urlparse
4from coreapi.utils import DownloadedFile, guess_extension
5import cgi
6import os
7import posixpath
8import tempfile
11def _unique_output_path(path):
12 """
13 Given a path like '/a/b/c.txt'
15 Return the first available filename that doesn't already exist,
16 using an incrementing suffix if needed.
18 For example: '/a/b/c.txt' or '/a/b/c (1).txt' or '/a/b/c (2).txt'...
19 """
20 basename, ext = os.path.splitext(path)
21 idx = 0
22 while os.path.exists(path):
23 idx += 1
24 path = "%s (%d)%s" % (basename, idx, ext)
25 return path
28def _safe_filename(filename):
29 """
30 Sanitize output filenames, to remove any potentially unsafe characters.
31 """
32 filename = os.path.basename(filename)
34 keepcharacters = (' ', '.', '_', '-')
35 filename = ''.join(
36 char for char in filename
37 if char.isalnum() or char in keepcharacters
38 ).strip().strip('.')
40 return filename
43def _get_filename_from_content_disposition(content_disposition):
44 """
45 Determine an output filename based on the `Content-Disposition` header.
46 """
47 params = value, params = cgi.parse_header(content_disposition)
49 if 'filename*' in params:
50 try:
51 charset, lang, filename = params['filename*'].split('\'', 2)
52 filename = urlparse.unquote(filename)
53 filename = filename.encode('iso-8859-1').decode(charset)
54 return _safe_filename(filename)
55 except (ValueError, LookupError):
56 pass
58 if 'filename' in params:
59 filename = params['filename']
60 return _safe_filename(filename)
62 return None
65def _get_filename_from_url(url, content_type=None):
66 """
67 Determine an output filename based on the download URL.
68 """
69 parsed = urlparse.urlparse(url)
70 final_path_component = posixpath.basename(parsed.path.rstrip('/'))
71 filename = _safe_filename(final_path_component)
72 suffix = guess_extension(content_type or '')
74 if filename:
75 if '.' not in filename:
76 return filename + suffix
77 return filename
78 elif suffix:
79 return 'download' + suffix
81 return None
84def _get_filename(base_url=None, content_type=None, content_disposition=None):
85 """
86 Determine an output filename to use for the download.
87 """
88 filename = None
89 if content_disposition:
90 filename = _get_filename_from_content_disposition(content_disposition)
91 if base_url and not filename:
92 filename = _get_filename_from_url(base_url, content_type)
93 if not filename:
94 return None # Ensure empty filenames return as `None` for consistency.
95 return filename
98class DownloadCodec(BaseCodec):
99 """
100 A codec to handle raw file downloads, such as images and other media.
101 """
102 media_type = '*/*'
103 format = 'download'
105 def __init__(self, download_dir=None):
106 """
107 `download_dir` - The path to use for file downloads.
108 """
109 self._delete_on_close = download_dir is None
110 self._download_dir = download_dir
112 @property
113 def download_dir(self):
114 return self._download_dir
116 def decode(self, bytestring, **options):
117 base_url = options.get('base_url')
118 content_type = options.get('content_type')
119 content_disposition = options.get('content_disposition')
121 # Write the download to a temporary .download file.
122 fd, temp_path = tempfile.mkstemp(suffix='.download')
123 file_handle = os.fdopen(fd, 'wb')
124 file_handle.write(bytestring)
125 file_handle.close()
127 # Determine the output filename.
128 output_filename = _get_filename(base_url, content_type, content_disposition)
129 if output_filename is None:
130 output_filename = os.path.basename(temp_path)
132 # Determine the output directory.
133 output_dir = self._download_dir
134 if output_dir is None:
135 output_dir = os.path.dirname(temp_path)
137 # Determine the full output path.
138 output_path = os.path.join(output_dir, output_filename)
140 # Move the temporary download file to the final location.
141 if output_path != temp_path:
142 output_path = _unique_output_path(output_path)
143 os.rename(temp_path, output_path)
145 # Open the file and return the file object.
146 output_file = open(output_path, 'rb')
147 downloaded = DownloadedFile(output_file, output_path, delete=self._delete_on_close)
148 downloaded.basename = output_filename
149 return downloaded