Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/transports/http.py: 17%

219 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# coding: utf-8 

2from __future__ import unicode_literals 

3from collections import OrderedDict 

4from coreapi import exceptions, utils 

5from coreapi.compat import cookiejar, urlparse 

6from coreapi.document import Document, Object, Link, Array, Error 

7from coreapi.transports.base import BaseTransport 

8from coreapi.utils import guess_filename, is_file, File 

9import collections 

10import requests 

11import itypes 

12import mimetypes 

13import uritemplate 

14import warnings 

15 

16 

17Params = collections.namedtuple('Params', ['path', 'query', 'data', 'files']) 

18empty_params = Params({}, {}, {}, {}) 

19 

20 

21class ForceMultiPartDict(dict): 

22 """ 

23 A dictionary that always evaluates as True. 

24 Allows us to force requests to use multipart encoding, even when no 

25 file parameters are passed. 

26 """ 

27 def __bool__(self): 

28 return True 

29 

30 def __nonzero__(self): 

31 return True 

32 

33 

34class BlockAll(cookiejar.CookiePolicy): 

35 """ 

36 A cookie policy that rejects all cookies. 

37 Used to override the default `requests` behavior. 

38 """ 

39 return_ok = set_ok = domain_return_ok = path_return_ok = lambda self, *args, **kwargs: False 39 ↛ exitline 39 didn't run the lambda on line 39

40 netscape = True 

41 rfc2965 = hide_cookie2 = False 

42 

43 

44class DomainCredentials(requests.auth.AuthBase): 

45 """ 

46 Custom auth class to support deprecated 'credentials' argument. 

47 """ 

48 allow_cookies = False 

49 credentials = None 

50 

51 def __init__(self, credentials=None): 

52 self.credentials = credentials 

53 

54 def __call__(self, request): 

55 if not self.credentials: 

56 return request 

57 

58 # Include any authorization credentials relevant to this domain. 

59 url_components = urlparse.urlparse(request.url) 

60 host = url_components.hostname 

61 if host in self.credentials: 

62 request.headers['Authorization'] = self.credentials[host] 

63 return request 

64 

65 

66class CallbackAdapter(requests.adapters.HTTPAdapter): 

67 """ 

68 Custom requests HTTP adapter, to support deprecated callback arguments. 

69 """ 

70 def __init__(self, request_callback=None, response_callback=None): 

71 self.request_callback = request_callback 

72 self.response_callback = response_callback 

73 

74 def send(self, request, **kwargs): 

75 if self.request_callback is not None: 

76 self.request_callback(request) 

77 response = super(CallbackAdapter, self).send(request, **kwargs) 

78 if self.response_callback is not None: 

79 self.response_callback(response) 

80 return response 

81 

82 

83def _get_method(action): 

84 if not action: 

85 return 'GET' 

86 return action.upper() 

87 

88 

89def _get_encoding(encoding): 

90 if not encoding: 

91 return 'application/json' 

92 return encoding 

93 

94 

95def _get_params(method, encoding, fields, params=None): 

96 """ 

97 Separate the params into the various types. 

98 """ 

99 if params is None: 

100 return empty_params 

101 

102 field_map = {field.name: field for field in fields} 

103 

104 path = {} 

105 query = {} 

106 data = {} 

107 files = {} 

108 

109 errors = {} 

110 

111 # Ensure graceful behavior in edge-case where both location='body' and 

112 # location='form' fields are present. 

113 seen_body = False 

114 

115 for key, value in params.items(): 

116 if key not in field_map or not field_map[key].location: 

117 # Default is 'query' for 'GET' and 'DELETE', and 'form' for others. 

118 location = 'query' if method in ('GET', 'DELETE') else 'form' 

119 else: 

120 location = field_map[key].location 

121 

122 if location == 'form' and encoding == 'application/octet-stream': 

123 # Raw uploads should always use 'body', not 'form'. 

124 location = 'body' 

125 

126 try: 

127 if location == 'path': 

128 path[key] = utils.validate_path_param(value) 

129 elif location == 'query': 

130 query[key] = utils.validate_query_param(value) 

131 elif location == 'body': 

132 data = utils.validate_body_param(value, encoding=encoding) 

133 seen_body = True 

134 elif location == 'form': 

135 if not seen_body: 

136 data[key] = utils.validate_form_param(value, encoding=encoding) 

137 except exceptions.ParameterError as exc: 

138 errors[key] = "%s" % exc 

139 

140 if errors: 

141 raise exceptions.ParameterError(errors) 

142 

143 # Move any files from 'data' into 'files'. 

144 if isinstance(data, dict): 

145 for key, value in list(data.items()): 

146 if is_file(data[key]): 

147 files[key] = data.pop(key) 

148 

149 return Params(path, query, data, files) 

150 

151 

152def _get_url(url, path_params): 

153 """ 

154 Given a templated URL and some parameters that have been provided, 

155 expand the URL. 

156 """ 

157 if path_params: 

158 return uritemplate.expand(url, path_params) 

159 return url 

160 

161 

162def _get_headers(url, decoders): 

163 """ 

164 Return a dictionary of HTTP headers to use in the outgoing request. 

165 """ 

166 accept_media_types = decoders[0].get_media_types() 

167 if '*/*' not in accept_media_types: 

168 accept_media_types.append('*/*') 

169 

170 headers = { 

171 'accept': ', '.join(accept_media_types), 

172 'user-agent': 'coreapi' 

173 } 

174 

175 return headers 

176 

177 

178def _get_upload_headers(file_obj): 

179 """ 

180 When a raw file upload is made, determine the Content-Type and 

181 Content-Disposition headers to use with the request. 

182 """ 

183 name = guess_filename(file_obj) 

184 content_type = None 

185 content_disposition = None 

186 

187 # Determine the content type of the upload. 

188 if getattr(file_obj, 'content_type', None): 

189 content_type = file_obj.content_type 

190 elif name: 

191 content_type, encoding = mimetypes.guess_type(name) 

192 

193 # Determine the content disposition of the upload. 

194 if name: 

195 content_disposition = 'attachment; filename="%s"' % name 

196 

197 return { 

198 'Content-Type': content_type or 'application/octet-stream', 

199 'Content-Disposition': content_disposition or 'attachment' 

200 } 

201 

202 

203def _build_http_request(session, url, method, headers=None, encoding=None, params=empty_params): 

204 """ 

205 Make an HTTP request and return an HTTP response. 

206 """ 

207 opts = { 

208 "headers": headers or {} 

209 } 

210 

211 if params.query: 

212 opts['params'] = params.query 

213 

214 if params.data or params.files: 

215 if encoding == 'application/json': 

216 opts['json'] = params.data 

217 elif encoding == 'multipart/form-data': 

218 opts['data'] = params.data 

219 opts['files'] = ForceMultiPartDict(params.files) 

220 elif encoding == 'application/x-www-form-urlencoded': 

221 opts['data'] = params.data 

222 elif encoding == 'application/octet-stream': 

223 if isinstance(params.data, File): 

224 opts['data'] = params.data.content 

225 else: 

226 opts['data'] = params.data 

227 upload_headers = _get_upload_headers(params.data) 

228 opts['headers'].update(upload_headers) 

229 

230 request = requests.Request(method, url, **opts) 

231 return session.prepare_request(request) 

232 

233 

234def _coerce_to_error_content(node): 

235 """ 

236 Errors should not contain nested documents or links. 

237 If we get a 4xx or 5xx response with a Document, then coerce 

238 the document content into plain data. 

239 """ 

240 if isinstance(node, (Document, Object)): 

241 # Strip Links from Documents, treat Documents as plain dicts. 

242 return OrderedDict([ 

243 (key, _coerce_to_error_content(value)) 

244 for key, value in node.data.items() 

245 ]) 

246 elif isinstance(node, Array): 

247 # Strip Links from Arrays. 

248 return [ 

249 _coerce_to_error_content(item) 

250 for item in node 

251 if not isinstance(item, Link) 

252 ] 

253 return node 

254 

255 

256def _coerce_to_error(obj, default_title): 

257 """ 

258 Given an arbitrary return result, coerce it into an Error instance. 

259 """ 

260 if isinstance(obj, Document): 

261 return Error( 

262 title=obj.title or default_title, 

263 content=_coerce_to_error_content(obj) 

264 ) 

265 elif isinstance(obj, dict): 

266 return Error(title=default_title, content=obj) 

267 elif isinstance(obj, list): 

268 return Error(title=default_title, content={'messages': obj}) 

269 elif obj is None: 

270 return Error(title=default_title) 

271 return Error(title=default_title, content={'message': obj}) 

272 

273 

274def _decode_result(response, decoders, force_codec=False): 

275 """ 

276 Given an HTTP response, return the decoded Core API document. 

277 """ 

278 if response.content: 

279 # Content returned in response. We should decode it. 

280 if force_codec: 

281 codec = decoders[0] 

282 else: 

283 content_type = response.headers.get('content-type') 

284 codec = utils.negotiate_decoder(decoders, content_type) 

285 

286 options = { 

287 'base_url': response.url 

288 } 

289 if 'content-type' in response.headers: 

290 options['content_type'] = response.headers['content-type'] 

291 if 'content-disposition' in response.headers: 

292 options['content_disposition'] = response.headers['content-disposition'] 

293 

294 result = codec.load(response.content, **options) 

295 else: 

296 # No content returned in response. 

297 result = None 

298 

299 # Coerce 4xx and 5xx codes into errors. 

300 is_error = response.status_code >= 400 and response.status_code <= 599 

301 if is_error and not isinstance(result, Error): 

302 default_title = '%d %s' % (response.status_code, response.reason) 

303 result = _coerce_to_error(result, default_title=default_title) 

304 

305 return result 

306 

307 

308def _handle_inplace_replacements(document, link, link_ancestors): 

309 """ 

310 Given a new document, and the link/ancestors it was created, 

311 determine if we should: 

312 

313 * Make an inline replacement and then return the modified document tree. 

314 * Return the new document as-is. 

315 """ 

316 if not link.transform: 

317 if link.action.lower() in ('put', 'patch', 'delete'): 

318 transform = 'inplace' 

319 else: 

320 transform = 'new' 

321 else: 

322 transform = link.transform 

323 

324 if transform == 'inplace': 

325 root = link_ancestors[0].document 

326 keys_to_link_parent = link_ancestors[-1].keys 

327 if document is None: 

328 return root.delete_in(keys_to_link_parent) 

329 return root.set_in(keys_to_link_parent, document) 

330 

331 return document 

332 

333 

334class HTTPTransport(BaseTransport): 

335 schemes = ['http', 'https'] 

336 

337 def __init__(self, credentials=None, headers=None, auth=None, session=None, request_callback=None, response_callback=None): 

338 if headers: 

339 headers = {key.lower(): value for key, value in headers.items()} 

340 if session is None: 

341 session = requests.Session() 

342 if auth is not None: 

343 session.auth = auth 

344 if not getattr(session.auth, 'allow_cookies', False): 

345 session.cookies.set_policy(BlockAll()) 

346 

347 if credentials is not None: 

348 warnings.warn( 

349 "The 'credentials' argument is now deprecated in favor of 'auth'.", 

350 DeprecationWarning 

351 ) 

352 auth = DomainCredentials(credentials) 

353 if request_callback is not None or response_callback is not None: 

354 warnings.warn( 

355 "The 'request_callback' and 'response_callback' arguments are now deprecated. " 

356 "Use a custom 'session' instance instead.", 

357 DeprecationWarning 

358 ) 

359 session.mount('https://', CallbackAdapter(request_callback, response_callback)) 

360 session.mount('http://', CallbackAdapter(request_callback, response_callback)) 

361 

362 self._headers = itypes.Dict(headers or {}) 

363 self._session = session 

364 

365 @property 

366 def headers(self): 

367 return self._headers 

368 

369 def transition(self, link, decoders, params=None, link_ancestors=None, force_codec=False): 

370 session = self._session 

371 method = _get_method(link.action) 

372 encoding = _get_encoding(link.encoding) 

373 params = _get_params(method, encoding, link.fields, params) 

374 url = _get_url(link.url, params.path) 

375 headers = _get_headers(url, decoders) 

376 headers.update(self.headers) 

377 

378 request = _build_http_request(session, url, method, headers, encoding, params) 

379 response = session.send(request) 

380 result = _decode_result(response, decoders, force_codec) 

381 

382 if isinstance(result, Document) and link_ancestors: 

383 result = _handle_inplace_replacements(result, link, link_ancestors) 

384 

385 if isinstance(result, Error): 

386 raise exceptions.ErrorMessage(result) 

387 

388 return result