Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/transports/http.py: 17%
219 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# coding: utf-8
2from __future__ import unicode_literals
3from collections import OrderedDict
4from coreapi import exceptions, utils
5from coreapi.compat import cookiejar, urlparse
6from coreapi.document import Document, Object, Link, Array, Error
7from coreapi.transports.base import BaseTransport
8from coreapi.utils import guess_filename, is_file, File
9import collections
10import requests
11import itypes
12import mimetypes
13import uritemplate
14import warnings
# Request parameters grouped by where they are sent: URL path, query string,
# request body/form data, and uploaded files.
Params = collections.namedtuple('Params', ['path', 'query', 'data', 'files'])

# Shared "no parameters" value, returned when the caller supplies none.
empty_params = Params(path={}, query={}, data={}, files={})
class ForceMultiPartDict(dict):
    """
    A dict subclass whose truth value is always True, even when empty.

    Used for the `files` argument so that `requests` is forced into
    multipart encoding even when no file parameters are passed.
    """
    def __bool__(self):
        # Python 3 truthiness hook.
        return True

    # Python 2 looks up the same hook under the name `__nonzero__`.
    __nonzero__ = __bool__
class BlockAll(cookiejar.CookiePolicy):
    """
    Cookie policy that refuses every cookie, both when storing and when
    returning them. Installed to override the default `requests` behavior.
    """
    def _reject(self, *args, **kwargs):
        # Single shared answer for every policy hook: never allow.
        return False

    return_ok = _reject
    set_ok = _reject
    domain_return_ok = _reject
    path_return_ok = _reject
    # Keep the class namespace limited to the policy hooks themselves.
    del _reject

    netscape = True
    rfc2965 = False
    hide_cookie2 = False
class DomainCredentials(requests.auth.AuthBase):
    """
    Auth handler backing the deprecated 'credentials' argument.

    `credentials` is looked up by the hostname of each outgoing request;
    a matching entry is sent as that request's 'Authorization' header.
    """
    allow_cookies = False
    credentials = None

    def __init__(self, credentials=None):
        self.credentials = credentials

    def __call__(self, request):
        known = self.credentials
        if known:
            # Only attach credentials registered for this request's host.
            hostname = urlparse.urlparse(request.url).hostname
            if hostname in known:
                request.headers['Authorization'] = known[hostname]
        return request
class CallbackAdapter(requests.adapters.HTTPAdapter):
    """
    Custom requests HTTP adapter, to support deprecated callback arguments.

    `request_callback`, if given, is called with each prepared request just
    before it is sent. `response_callback`, if given, is called with each
    response just before it is returned.
    """
    def __init__(self, request_callback=None, response_callback=None):
        # The base initializer sets up the connection pool and retry
        # configuration that `send()` relies on; it must always run.
        super(CallbackAdapter, self).__init__()
        self.request_callback = request_callback
        self.response_callback = response_callback

    def send(self, request, **kwargs):
        """
        Send `request` through the base adapter, invoking the optional
        callbacks before and after. Returns the response.
        """
        if self.request_callback is not None:
            self.request_callback(request)
        response = super(CallbackAdapter, self).send(request, **kwargs)
        if self.response_callback is not None:
            self.response_callback(response)
        return response
83def _get_method(action):
84 if not action:
85 return 'GET'
86 return action.upper()
89def _get_encoding(encoding):
90 if not encoding:
91 return 'application/json'
92 return encoding
def _get_params(method, encoding, fields, params=None):
    """
    Sort the supplied parameters into path, query, data and files.

    Each parameter goes to the location declared by the field of the same
    name. Parameters without a declared location default to 'query' for
    'GET' and 'DELETE' requests and to 'form' otherwise.

    Returns a `Params` tuple. Raises `exceptions.ParameterError`, carrying
    a dict of per-parameter messages, if any value fails validation.
    """
    if params is None:
        return empty_params

    fields_by_name = {field.name: field for field in fields}
    fallback_location = 'query' if method in ('GET', 'DELETE') else 'form'

    path, query, data, files = {}, {}, {}, {}
    errors = {}

    # Once a location='body' value has been taken, later 'form' values are
    # ignored rather than being written into the body payload.
    seen_body = False

    for key, value in params.items():
        declared = fields_by_name[key].location if key in fields_by_name else None
        location = declared or fallback_location

        # Raw uploads should always use 'body', not 'form'.
        if location == 'form' and encoding == 'application/octet-stream':
            location = 'body'

        try:
            if location == 'path':
                path[key] = utils.validate_path_param(value)
            elif location == 'query':
                query[key] = utils.validate_query_param(value)
            elif location == 'body':
                data = utils.validate_body_param(value, encoding=encoding)
                seen_body = True
            elif location == 'form':
                if not seen_body:
                    data[key] = utils.validate_form_param(value, encoding=encoding)
        except exceptions.ParameterError as exc:
            # Collect every failure so they can be reported together.
            errors[key] = "%s" % exc

    if errors:
        raise exceptions.ParameterError(errors)

    # File values are sent separately, so lift them out of 'data'.
    if isinstance(data, dict):
        for key in [name for name in data if is_file(data[name])]:
            files[key] = data.pop(key)

    return Params(path, query, data, files)
152def _get_url(url, path_params):
153 """
154 Given a templated URL and some parameters that have been provided,
155 expand the URL.
156 """
157 if path_params:
158 return uritemplate.expand(url, path_params)
159 return url
162def _get_headers(url, decoders):
163 """
164 Return a dictionary of HTTP headers to use in the outgoing request.
165 """
166 accept_media_types = decoders[0].get_media_types()
167 if '*/*' not in accept_media_types:
168 accept_media_types.append('*/*')
170 headers = {
171 'accept': ', '.join(accept_media_types),
172 'user-agent': 'coreapi'
173 }
175 return headers
def _get_upload_headers(file_obj):
    """
    Work out the Content-Type and Content-Disposition headers to send
    with a raw file upload.

    The content type comes from the object's `content_type` attribute if
    it has a non-empty one, otherwise it is guessed from the filename,
    falling back to 'application/octet-stream'.
    """
    name = guess_filename(file_obj)

    # Determine the content type of the upload.
    explicit_type = getattr(file_obj, 'content_type', None)
    if explicit_type:
        content_type = explicit_type
    elif name:
        content_type = mimetypes.guess_type(name)[0]
    else:
        content_type = None

    # Determine the content disposition of the upload.
    if name:
        content_disposition = 'attachment; filename="%s"' % name
    else:
        content_disposition = 'attachment'

    return {
        'Content-Type': content_type or 'application/octet-stream',
        'Content-Disposition': content_disposition
    }
def _build_http_request(session, url, method, headers=None, encoding=None, params=empty_params):
    """
    Build a `requests.Request` from the given pieces and return it
    prepared by `session`.

    How `params.data` and `params.files` are attached depends on
    `encoding`; query parameters are always passed as URL params.
    """
    request_kwargs = {
        "headers": headers or {}
    }

    if params.query:
        request_kwargs['params'] = params.query

    payload = params.data
    if payload or params.files:
        if encoding == 'application/json':
            request_kwargs['json'] = payload
        elif encoding == 'multipart/form-data':
            request_kwargs['data'] = payload
            # Always-truthy dict, so multipart is used even with no files.
            request_kwargs['files'] = ForceMultiPartDict(params.files)
        elif encoding == 'application/x-www-form-urlencoded':
            request_kwargs['data'] = payload
        elif encoding == 'application/octet-stream':
            # Raw upload: send the file content itself as the body.
            request_kwargs['data'] = payload.content if isinstance(payload, File) else payload
            request_kwargs['headers'].update(_get_upload_headers(payload))

    prepared = session.prepare_request(requests.Request(method, url, **request_kwargs))
    return prepared
def _coerce_to_error_content(node):
    """
    Recursively reduce `node` to plain data suitable for an Error.

    Documents and Objects become ordered dicts built from their `.data`
    items, Arrays become lists with any Link items dropped, and anything
    else is returned as-is.
    """
    if isinstance(node, (Document, Object)):
        # Going through `.data` strips Links; Documents become plain dicts.
        return OrderedDict(
            (key, _coerce_to_error_content(value))
            for key, value in node.data.items()
        )

    if isinstance(node, Array):
        plain_items = []
        for item in node:
            if isinstance(item, Link):
                continue
            plain_items.append(_coerce_to_error_content(item))
        return plain_items

    return node
def _coerce_to_error(obj, default_title):
    """
    Wrap an arbitrary decoded result in an Error instance.

    A Document keeps its own title when it has one; every other kind of
    value uses `default_title`.
    """
    if isinstance(obj, Document):
        title = obj.title or default_title
        return Error(title=title, content=_coerce_to_error_content(obj))

    if isinstance(obj, dict):
        return Error(title=default_title, content=obj)

    if isinstance(obj, list):
        return Error(title=default_title, content={'messages': obj})

    if obj is None:
        # Nothing was decoded, so the error carries a title only.
        return Error(title=default_title)

    return Error(title=default_title, content={'message': obj})
274def _decode_result(response, decoders, force_codec=False):
275 """
276 Given an HTTP response, return the decoded Core API document.
277 """
278 if response.content:
279 # Content returned in response. We should decode it.
280 if force_codec:
281 codec = decoders[0]
282 else:
283 content_type = response.headers.get('content-type')
284 codec = utils.negotiate_decoder(decoders, content_type)
286 options = {
287 'base_url': response.url
288 }
289 if 'content-type' in response.headers:
290 options['content_type'] = response.headers['content-type']
291 if 'content-disposition' in response.headers:
292 options['content_disposition'] = response.headers['content-disposition']
294 result = codec.load(response.content, **options)
295 else:
296 # No content returned in response.
297 result = None
299 # Coerce 4xx and 5xx codes into errors.
300 is_error = response.status_code >= 400 and response.status_code <= 599
301 if is_error and not isinstance(result, Error):
302 default_title = '%d %s' % (response.status_code, response.reason)
303 result = _coerce_to_error(result, default_title=default_title)
305 return result
308def _handle_inplace_replacements(document, link, link_ancestors):
309 """
310 Given a new document, and the link/ancestors it was created,
311 determine if we should:
313 * Make an inline replacement and then return the modified document tree.
314 * Return the new document as-is.
315 """
316 if not link.transform:
317 if link.action.lower() in ('put', 'patch', 'delete'):
318 transform = 'inplace'
319 else:
320 transform = 'new'
321 else:
322 transform = link.transform
324 if transform == 'inplace':
325 root = link_ancestors[0].document
326 keys_to_link_parent = link_ancestors[-1].keys
327 if document is None:
328 return root.delete_in(keys_to_link_parent)
329 return root.set_in(keys_to_link_parent, document)
331 return document
class HTTPTransport(BaseTransport):
    """
    Transport that follows links over HTTP and HTTPS using a `requests`
    session.
    """
    schemes = ['http', 'https']

    def __init__(self, credentials=None, headers=None, auth=None, session=None, request_callback=None, response_callback=None):
        """
        Configure the session used for all requests.

        * `headers` - extra headers for every request; keys are lower-cased.
        * `auth` - a requests auth handler to install on the session.
        * `session` - an existing session to use instead of a new one.
        * `credentials` - deprecated; mapping of hostname to Authorization
          header value. Ignored when `auth` is also given.
        * `request_callback` / `response_callback` - deprecated hooks,
          called with each outgoing request / incoming response.
        """
        if headers:
            headers = {key.lower(): value for key, value in headers.items()}
        if session is None:
            session = requests.Session()

        if credentials is not None:
            warnings.warn(
                "The 'credentials' argument is now deprecated in favor of 'auth'.",
                DeprecationWarning,
                stacklevel=2
            )
            # Resolve this before the session's auth is assigned, so the
            # deprecated argument still takes effect. An explicit `auth`
            # takes precedence over it.
            if auth is None:
                auth = DomainCredentials(credentials)

        if auth is not None:
            session.auth = auth
        # Cookies are rejected unless the auth handler opts in to them.
        if not getattr(session.auth, 'allow_cookies', False):
            session.cookies.set_policy(BlockAll())

        if request_callback is not None or response_callback is not None:
            warnings.warn(
                "The 'request_callback' and 'response_callback' arguments are now deprecated. "
                "Use a custom 'session' instance instead.",
                DeprecationWarning,
                stacklevel=2
            )
            session.mount('https://', CallbackAdapter(request_callback, response_callback))
            session.mount('http://', CallbackAdapter(request_callback, response_callback))

        self._headers = itypes.Dict(headers or {})
        self._session = session

    @property
    def headers(self):
        """The headers added to every request made by this transport."""
        return self._headers

    def transition(self, link, decoders, params=None, link_ancestors=None, force_codec=False):
        """
        Follow `link` with the given `params` and return the decoded result.

        When the result is a Document and `link_ancestors` is given, the
        result may be spliced into the ancestor document instead of being
        returned on its own. Raises `exceptions.ErrorMessage` if the
        result is an Error.
        """
        session = self._session
        method = _get_method(link.action)
        encoding = _get_encoding(link.encoding)
        params = _get_params(method, encoding, link.fields, params)
        url = _get_url(link.url, params.path)
        headers = _get_headers(url, decoders)
        # Transport-level headers override the generated defaults.
        headers.update(self.headers)

        request = _build_http_request(session, url, method, headers, encoding, params)
        response = session.send(request)
        result = _decode_result(response, decoders, force_codec)

        if isinstance(result, Document) and link_ancestors:
            result = _handle_inplace_replacements(result, link, link_ancestors)

        if isinstance(result, Error):
            raise exceptions.ErrorMessage(result)

        return result