Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/utils.py: 13%
137 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from coreapi import exceptions
2from coreapi.compat import string_types, text_type, urlparse, _TemporaryFileWrapper
3from collections import namedtuple
4import os
5import pkg_resources
6import tempfile
9def domain_matches(request, domain):
10 """
11 Domain string matching against an outgoing request.
12 Patterns starting with '*' indicate a wildcard domain.
13 """
14 if (domain is None) or (domain == '*'):
15 return True
17 host = urlparse.urlparse(request.url).hostname
18 if domain.startswith('*'):
19 return host.endswith(domain[1:])
20 return host == domain
23def get_installed_codecs():
24 packages = [
25 (package, package.load()) for package in
26 pkg_resources.iter_entry_points(group='coreapi.codecs')
27 ]
28 return {
29 package.name: cls() for (package, cls) in packages
30 }
33# File utilities for upload and download support.
35File = namedtuple('File', 'name content content_type')
36File.__new__.__defaults__ = (None,)
39def is_file(obj):
40 if isinstance(obj, File):
41 return True
43 if hasattr(obj, '__iter__') and not isinstance(obj, (string_types, list, tuple, dict)):
44 # A stream object.
45 return True
47 return False
50def guess_filename(obj):
51 name = getattr(obj, 'name', None)
52 if name and isinstance(name, string_types) and name[0] != '<' and name[-1] != '>':
53 return os.path.basename(name)
54 return None
57def guess_extension(content_type):
58 """
59 Python's `mimetypes.guess_extension` is no use because it simply returns
60 the first of an unordered set. We use the same set of media types here,
61 but take a reasonable preference on what extension to map to.
62 """
63 return {
64 'application/javascript': '.js',
65 'application/msword': '.doc',
66 'application/octet-stream': '.bin',
67 'application/oda': '.oda',
68 'application/pdf': '.pdf',
69 'application/pkcs7-mime': '.p7c',
70 'application/postscript': '.ps',
71 'application/vnd.apple.mpegurl': '.m3u',
72 'application/vnd.ms-excel': '.xls',
73 'application/vnd.ms-powerpoint': '.ppt',
74 'application/x-bcpio': '.bcpio',
75 'application/x-cpio': '.cpio',
76 'application/x-csh': '.csh',
77 'application/x-dvi': '.dvi',
78 'application/x-gtar': '.gtar',
79 'application/x-hdf': '.hdf',
80 'application/x-latex': '.latex',
81 'application/x-mif': '.mif',
82 'application/x-netcdf': '.nc',
83 'application/x-pkcs12': '.p12',
84 'application/x-pn-realaudio': '.ram',
85 'application/x-python-code': '.pyc',
86 'application/x-sh': '.sh',
87 'application/x-shar': '.shar',
88 'application/x-shockwave-flash': '.swf',
89 'application/x-sv4cpio': '.sv4cpio',
90 'application/x-sv4crc': '.sv4crc',
91 'application/x-tar': '.tar',
92 'application/x-tcl': '.tcl',
93 'application/x-tex': '.tex',
94 'application/x-texinfo': '.texinfo',
95 'application/x-troff': '.tr',
96 'application/x-troff-man': '.man',
97 'application/x-troff-me': '.me',
98 'application/x-troff-ms': '.ms',
99 'application/x-ustar': '.ustar',
100 'application/x-wais-source': '.src',
101 'application/xml': '.xml',
102 'application/zip': '.zip',
103 'audio/basic': '.au',
104 'audio/mpeg': '.mp3',
105 'audio/x-aiff': '.aif',
106 'audio/x-pn-realaudio': '.ra',
107 'audio/x-wav': '.wav',
108 'image/gif': '.gif',
109 'image/ief': '.ief',
110 'image/jpeg': '.jpe',
111 'image/png': '.png',
112 'image/svg+xml': '.svg',
113 'image/tiff': '.tiff',
114 'image/vnd.microsoft.icon': '.ico',
115 'image/x-cmu-raster': '.ras',
116 'image/x-ms-bmp': '.bmp',
117 'image/x-portable-anymap': '.pnm',
118 'image/x-portable-bitmap': '.pbm',
119 'image/x-portable-graymap': '.pgm',
120 'image/x-portable-pixmap': '.ppm',
121 'image/x-rgb': '.rgb',
122 'image/x-xbitmap': '.xbm',
123 'image/x-xpixmap': '.xpm',
124 'image/x-xwindowdump': '.xwd',
125 'message/rfc822': '.eml',
126 'text/css': '.css',
127 'text/csv': '.csv',
128 'text/html': '.html',
129 'text/plain': '.txt',
130 'text/richtext': '.rtx',
131 'text/tab-separated-values': '.tsv',
132 'text/x-python': '.py',
133 'text/x-setext': '.etx',
134 'text/x-sgml': '.sgml',
135 'text/x-vcard': '.vcf',
136 'text/xml': '.xml',
137 'video/mp4': '.mp4',
138 'video/mpeg': '.mpeg',
139 'video/quicktime': '.mov',
140 'video/webm': '.webm',
141 'video/x-msvideo': '.avi',
142 'video/x-sgi-movie': '.movie'
143 }.get(content_type, '')
146if _TemporaryFileWrapper: 146 ↛ 162line 146 didn't jump to line 162, because the condition on line 146 was never false
147 # Ideally we subclass this so that we can present a custom representation.
148 class DownloadedFile(_TemporaryFileWrapper):
149 basename = None
151 def __repr__(self):
152 state = "closed" if self.closed else "open"
153 mode = "" if self.closed else " '%s'" % self.file.mode
154 return "<DownloadedFile '%s', %s%s>" % (self.name, state, mode)
156 def __str__(self):
157 return self.__repr__()
158else:
159 # On some platforms (eg GAE) the private _TemporaryFileWrapper may not be
160 # available, just use the standard `NamedTemporaryFile` function
161 # in this case.
162 DownloadedFile = tempfile.NamedTemporaryFile
165# Negotiation utilities. USed to determine which codec or transport class
166# should be used, given a list of supported instances.
168def determine_transport(transports, url):
169 """
170 Given a URL determine the appropriate transport instance.
171 """
172 url_components = urlparse.urlparse(url)
173 scheme = url_components.scheme.lower()
174 netloc = url_components.netloc
176 if not scheme:
177 raise exceptions.NetworkError("URL missing scheme '%s'." % url)
179 if not netloc:
180 raise exceptions.NetworkError("URL missing hostname '%s'." % url)
182 for transport in transports:
183 if scheme in transport.schemes:
184 return transport
186 raise exceptions.NetworkError("Unsupported URL scheme '%s'." % scheme)
189def negotiate_decoder(decoders, content_type=None):
190 """
191 Given the value of a 'Content-Type' header, return the appropriate
192 codec for decoding the request content.
193 """
194 if content_type is None:
195 return decoders[0]
197 content_type = content_type.split(';')[0].strip().lower()
198 main_type = content_type.split('/')[0] + '/*'
199 wildcard_type = '*/*'
201 for codec in decoders:
202 for media_type in codec.get_media_types():
203 if media_type in (content_type, main_type, wildcard_type):
204 return codec
206 msg = "Unsupported media in Content-Type header '%s'" % content_type
207 raise exceptions.NoCodecAvailable(msg)
210def negotiate_encoder(encoders, accept=None):
211 """
212 Given the value of a 'Accept' header, return the appropriate codec for
213 encoding the response content.
214 """
215 if accept is None:
216 return encoders[0]
218 acceptable = set([
219 item.split(';')[0].strip().lower()
220 for item in accept.split(',')
221 ])
223 for codec in encoders:
224 for media_type in codec.get_media_types():
225 if media_type in acceptable:
226 return codec
228 for codec in encoders:
229 for media_type in codec.get_media_types():
230 if codec.media_type.split('/')[0] + '/*' in acceptable:
231 return codec
233 if '*/*' in acceptable:
234 return encoders[0]
236 msg = "Unsupported media in Accept header '%s'" % accept
237 raise exceptions.NoCodecAvailable(msg)
240# Validation utilities. Used to ensure that we get consistent validation
241# exceptions when invalid types are passed as a parameter, rather than
242# an exception occuring when the request is made.
244def validate_path_param(value):
245 value = _validate_form_field(value, allow_list=False)
246 if not value:
247 msg = 'Parameter %s: May not be empty.'
248 raise exceptions.ParameterError(msg)
249 return value
252def validate_query_param(value):
253 return _validate_form_field(value)
256def validate_body_param(value, encoding):
257 if encoding == 'application/json':
258 return _validate_json_data(value)
259 elif encoding == 'multipart/form-data':
260 return _validate_form_object(value, allow_files=True)
261 elif encoding == 'application/x-www-form-urlencoded':
262 return _validate_form_object(value)
263 elif encoding == 'application/octet-stream':
264 if not is_file(value):
265 msg = 'Must be an file upload.'
266 raise exceptions.ParameterError(msg)
267 return value
268 msg = 'Unsupported encoding "%s" for outgoing request.'
269 raise exceptions.NetworkError(msg % encoding)
272def validate_form_param(value, encoding):
273 if encoding == 'application/json':
274 return _validate_json_data(value)
275 elif encoding == 'multipart/form-data':
276 return _validate_form_field(value, allow_files=True)
277 elif encoding == 'application/x-www-form-urlencoded':
278 return _validate_form_field(value)
279 msg = 'Unsupported encoding "%s" for outgoing request.'
280 raise exceptions.NetworkError(msg % encoding)
283def _validate_form_object(value, allow_files=False):
284 """
285 Ensure that `value` can be encoded as form data or as query parameters.
286 """
287 if not isinstance(value, dict):
288 msg = 'Must be an object.'
289 raise exceptions.ParameterError(msg)
290 return {
291 text_type(item_key): _validate_form_field(item_val, allow_files=allow_files)
292 for item_key, item_val in value.items()
293 }
296def _validate_form_field(value, allow_files=False, allow_list=True):
297 """
298 Ensure that `value` can be encoded as a single form data or a query parameter.
299 Basic types that has a simple string representation are supported.
300 A list of basic types is also valid.
301 """
302 if isinstance(value, string_types):
303 return value
304 elif isinstance(value, bool) or (value is None):
305 return {True: 'true', False: 'false', None: ''}[value]
306 elif isinstance(value, (int, float)):
307 return "%s" % value
308 elif allow_list and isinstance(value, (list, tuple)) and not is_file(value):
309 # Only the top-level element may be a list.
310 return [
311 _validate_form_field(item, allow_files=False, allow_list=False)
312 for item in value
313 ]
314 elif allow_files and is_file(value):
315 return value
317 msg = 'Must be a primitive type.'
318 raise exceptions.ParameterError(msg)
321def _validate_json_data(value):
322 """
323 Ensure that `value` can be encoded into JSON.
324 """
325 if (value is None) or isinstance(value, (bool, int, float, string_types)):
326 return value
327 elif isinstance(value, (list, tuple)) and not is_file(value):
328 return [_validate_json_data(item) for item in value]
329 elif isinstance(value, dict):
330 return {
331 text_type(item_key): _validate_json_data(item_val)
332 for item_key, item_val in value.items()
333 }
335 msg = 'Must be a JSON primitive.'
336 raise exceptions.ParameterError(msg)