Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/stripe/api_requestor.py: 11%

207 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import absolute_import, division, print_function 

2 

3import calendar 

4import datetime 

5import json 

6import platform 

7import time 

8import uuid 

9import warnings 

10from collections import OrderedDict 

11 

12import stripe 

13from stripe import error, oauth_error, http_client, version, util, six 

14from stripe.multipart_data_generator import MultipartDataGenerator 

15from stripe.six.moves.urllib.parse import urlencode, urlsplit, urlunsplit 

16from stripe.stripe_response import StripeResponse, StripeStreamResponse 

17 

18 

19def _encode_datetime(dttime): 

20 if dttime.tzinfo and dttime.tzinfo.utcoffset(dttime) is not None: 

21 utc_timestamp = calendar.timegm(dttime.utctimetuple()) 

22 else: 

23 utc_timestamp = time.mktime(dttime.timetuple()) 

24 

25 return int(utc_timestamp) 

26 

27 

28def _encode_nested_dict(key, data, fmt="%s[%s]"): 

29 d = OrderedDict() 

30 for subkey, subvalue in six.iteritems(data): 

31 d[fmt % (key, subkey)] = subvalue 

32 return d 

33 

34 

35def _api_encode(data): 

36 for key, value in six.iteritems(data): 

37 key = util.utf8(key) 

38 if value is None: 

39 continue 

40 elif hasattr(value, "stripe_id"): 

41 yield (key, value.stripe_id) 

42 elif isinstance(value, list) or isinstance(value, tuple): 

43 for i, sv in enumerate(value): 

44 if isinstance(sv, dict): 

45 subdict = _encode_nested_dict("%s[%d]" % (key, i), sv) 

46 for k, v in _api_encode(subdict): 

47 yield (k, v) 

48 else: 

49 yield ("%s[%d]" % (key, i), util.utf8(sv)) 

50 elif isinstance(value, dict): 

51 subdict = _encode_nested_dict(key, value) 

52 for subkey, subvalue in _api_encode(subdict): 

53 yield (subkey, subvalue) 

54 elif isinstance(value, datetime.datetime): 

55 yield (key, _encode_datetime(value)) 

56 else: 

57 yield (key, util.utf8(value)) 

58 

59 

60def _build_api_url(url, query): 

61 scheme, netloc, path, base_query, fragment = urlsplit(url) 

62 

63 if base_query: 

64 query = "%s&%s" % (base_query, query) 

65 

66 return urlunsplit((scheme, netloc, path, query, fragment)) 

67 

68 

69class APIRequestor(object): 

70 def __init__( 

71 self, 

72 key=None, 

73 client=None, 

74 api_base=None, 

75 api_version=None, 

76 account=None, 

77 ): 

78 self.api_base = api_base or stripe.api_base 

79 self.api_key = key 

80 self.api_version = api_version or stripe.api_version 

81 self.stripe_account = account 

82 

83 self._default_proxy = None 

84 

85 from stripe import verify_ssl_certs as verify 

86 from stripe import proxy 

87 

88 if client: 

89 self._client = client 

90 elif stripe.default_http_client: 

91 self._client = stripe.default_http_client 

92 if proxy != self._default_proxy: 

93 warnings.warn( 

94 "stripe.proxy was updated after sending a " 

95 "request - this is a no-op. To use a different proxy, " 

96 "set stripe.default_http_client to a new client " 

97 "configured with the proxy." 

98 ) 

99 else: 

100 # If the stripe.default_http_client has not been set by the user 

101 # yet, we'll set it here. This way, we aren't creating a new 

102 # HttpClient for every request. 

103 stripe.default_http_client = http_client.new_default_http_client( 

104 verify_ssl_certs=verify, proxy=proxy 

105 ) 

106 self._client = stripe.default_http_client 

107 self._default_proxy = proxy 

108 

109 @classmethod 

110 def format_app_info(cls, info): 

111 str = info["name"] 

112 if info["version"]: 

113 str += "/%s" % (info["version"],) 

114 if info["url"]: 

115 str += " (%s)" % (info["url"],) 

116 return str 

117 

118 def request(self, method, url, params=None, headers=None): 

119 rbody, rcode, rheaders, my_api_key = self.request_raw( 

120 method.lower(), url, params, headers, is_streaming=False 

121 ) 

122 resp = self.interpret_response(rbody, rcode, rheaders) 

123 return resp, my_api_key 

124 

125 def request_stream(self, method, url, params=None, headers=None): 

126 stream, rcode, rheaders, my_api_key = self.request_raw( 

127 method.lower(), url, params, headers, is_streaming=True 

128 ) 

129 resp = self.interpret_streaming_response(stream, rcode, rheaders) 

130 return resp, my_api_key 

131 

132 def handle_error_response(self, rbody, rcode, resp, rheaders): 

133 try: 

134 error_data = resp["error"] 

135 except (KeyError, TypeError): 

136 raise error.APIError( 

137 "Invalid response object from API: %r (HTTP response code " 

138 "was %d)" % (rbody, rcode), 

139 rbody, 

140 rcode, 

141 resp, 

142 ) 

143 

144 err = None 

145 

146 # OAuth errors are a JSON object where `error` is a string. In 

147 # contrast, in API errors, `error` is a hash with sub-keys. We use 

148 # this property to distinguish between OAuth and API errors. 

149 if isinstance(error_data, six.string_types): 

150 err = self.specific_oauth_error( 

151 rbody, rcode, resp, rheaders, error_data 

152 ) 

153 

154 if err is None: 

155 err = self.specific_api_error( 

156 rbody, rcode, resp, rheaders, error_data 

157 ) 

158 

159 raise err 

160 

161 def specific_api_error(self, rbody, rcode, resp, rheaders, error_data): 

162 util.log_info( 

163 "Stripe API error received", 

164 error_code=error_data.get("code"), 

165 error_type=error_data.get("type"), 

166 error_message=error_data.get("message"), 

167 error_param=error_data.get("param"), 

168 ) 

169 

170 # Rate limits were previously coded as 400's with code 'rate_limit' 

171 if rcode == 429 or ( 

172 rcode == 400 and error_data.get("code") == "rate_limit" 

173 ): 

174 return error.RateLimitError( 

175 error_data.get("message"), rbody, rcode, resp, rheaders 

176 ) 

177 elif rcode in [400, 404]: 

178 if error_data.get("type") == "idempotency_error": 

179 return error.IdempotencyError( 

180 error_data.get("message"), rbody, rcode, resp, rheaders 

181 ) 

182 else: 

183 return error.InvalidRequestError( 

184 error_data.get("message"), 

185 error_data.get("param"), 

186 error_data.get("code"), 

187 rbody, 

188 rcode, 

189 resp, 

190 rheaders, 

191 ) 

192 elif rcode == 401: 

193 return error.AuthenticationError( 

194 error_data.get("message"), rbody, rcode, resp, rheaders 

195 ) 

196 elif rcode == 402: 

197 return error.CardError( 

198 error_data.get("message"), 

199 error_data.get("param"), 

200 error_data.get("code"), 

201 rbody, 

202 rcode, 

203 resp, 

204 rheaders, 

205 ) 

206 elif rcode == 403: 

207 return error.PermissionError( 

208 error_data.get("message"), rbody, rcode, resp, rheaders 

209 ) 

210 else: 

211 return error.APIError( 

212 error_data.get("message"), rbody, rcode, resp, rheaders 

213 ) 

214 

215 def specific_oauth_error(self, rbody, rcode, resp, rheaders, error_code): 

216 description = resp.get("error_description", error_code) 

217 

218 util.log_info( 

219 "Stripe OAuth error received", 

220 error_code=error_code, 

221 error_description=description, 

222 ) 

223 

224 args = [error_code, description, rbody, rcode, resp, rheaders] 

225 

226 if error_code == "invalid_client": 

227 return oauth_error.InvalidClientError(*args) 

228 elif error_code == "invalid_grant": 

229 return oauth_error.InvalidGrantError(*args) 

230 elif error_code == "invalid_request": 

231 return oauth_error.InvalidRequestError(*args) 

232 elif error_code == "invalid_scope": 

233 return oauth_error.InvalidScopeError(*args) 

234 elif error_code == "unsupported_grant_type": 

235 return oauth_error.UnsupportedGrantTypError(*args) 

236 elif error_code == "unsupported_response_type": 

237 return oauth_error.UnsupportedResponseTypError(*args) 

238 

239 return None 

240 

241 def request_headers(self, api_key, method): 

242 user_agent = "Stripe/v1 PythonBindings/%s" % (version.VERSION,) 

243 if stripe.app_info: 

244 user_agent += " " + self.format_app_info(stripe.app_info) 

245 

246 ua = { 

247 "bindings_version": version.VERSION, 

248 "lang": "python", 

249 "publisher": "stripe", 

250 "httplib": self._client.name, 

251 } 

252 for attr, func in [ 

253 ["lang_version", platform.python_version], 

254 ["platform", platform.platform], 

255 ["uname", lambda: " ".join(platform.uname())], 

256 ]: 

257 try: 

258 val = func() 

259 except Exception: 

260 val = "(disabled)" 

261 ua[attr] = val 

262 if stripe.app_info: 

263 ua["application"] = stripe.app_info 

264 

265 headers = { 

266 "X-Stripe-Client-User-Agent": json.dumps(ua), 

267 "User-Agent": user_agent, 

268 "Authorization": "Bearer %s" % (api_key,), 

269 } 

270 

271 if self.stripe_account: 

272 headers["Stripe-Account"] = self.stripe_account 

273 

274 if method == "post": 

275 headers["Content-Type"] = "application/x-www-form-urlencoded" 

276 headers.setdefault("Idempotency-Key", str(uuid.uuid4())) 

277 

278 if self.api_version is not None: 

279 headers["Stripe-Version"] = self.api_version 

280 

281 return headers 

282 

283 def request_raw( 

284 self, 

285 method, 

286 url, 

287 params=None, 

288 supplied_headers=None, 

289 is_streaming=False, 

290 ): 

291 """ 

292 Mechanism for issuing an API call 

293 """ 

294 

295 if self.api_key: 

296 my_api_key = self.api_key 

297 else: 

298 from stripe import api_key 

299 

300 my_api_key = api_key 

301 

302 if my_api_key is None: 

303 raise error.AuthenticationError( 

304 "No API key provided. (HINT: set your API key using " 

305 '"stripe.api_key = <API-KEY>"). You can generate API keys ' 

306 "from the Stripe web interface. See https://stripe.com/api " 

307 "for details, or email support@stripe.com if you have any " 

308 "questions." 

309 ) 

310 

311 abs_url = "%s%s" % (self.api_base, url) 

312 

313 encoded_params = urlencode(list(_api_encode(params or {}))) 

314 

315 # Don't use strict form encoding by changing the square bracket control 

316 # characters back to their literals. This is fine by the server, and 

317 # makes these parameter strings easier to read. 

318 encoded_params = encoded_params.replace("%5B", "[").replace("%5D", "]") 

319 

320 if method == "get" or method == "delete": 

321 if params: 

322 abs_url = _build_api_url(abs_url, encoded_params) 

323 post_data = None 

324 elif method == "post": 

325 if ( 

326 supplied_headers is not None 

327 and supplied_headers.get("Content-Type") 

328 == "multipart/form-data" 

329 ): 

330 generator = MultipartDataGenerator() 

331 generator.add_params(params or {}) 

332 post_data = generator.get_post_data() 

333 supplied_headers[ 

334 "Content-Type" 

335 ] = "multipart/form-data; boundary=%s" % (generator.boundary,) 

336 else: 

337 post_data = encoded_params 

338 else: 

339 raise error.APIConnectionError( 

340 "Unrecognized HTTP method %r. This may indicate a bug in the " 

341 "Stripe bindings. Please contact support@stripe.com for " 

342 "assistance." % (method,) 

343 ) 

344 

345 headers = self.request_headers(my_api_key, method) 

346 if supplied_headers is not None: 

347 for key, value in six.iteritems(supplied_headers): 

348 headers[key] = value 

349 

350 util.log_info("Request to Stripe api", method=method, path=abs_url) 

351 util.log_debug( 

352 "Post details", 

353 post_data=encoded_params, 

354 api_version=self.api_version, 

355 ) 

356 

357 if is_streaming: 

358 ( 

359 rcontent, 

360 rcode, 

361 rheaders, 

362 ) = self._client.request_stream_with_retries( 

363 method, abs_url, headers, post_data 

364 ) 

365 else: 

366 rcontent, rcode, rheaders = self._client.request_with_retries( 

367 method, abs_url, headers, post_data 

368 ) 

369 

370 util.log_info("Stripe API response", path=abs_url, response_code=rcode) 

371 util.log_debug("API response body", body=rcontent) 

372 

373 if "Request-Id" in rheaders: 

374 request_id = rheaders["Request-Id"] 

375 util.log_debug( 

376 "Dashboard link for request", 

377 link=util.dashboard_link(request_id), 

378 ) 

379 

380 return rcontent, rcode, rheaders, my_api_key 

381 

382 def _should_handle_code_as_error(self, rcode): 

383 return not 200 <= rcode < 300 

384 

385 def interpret_response(self, rbody, rcode, rheaders): 

386 try: 

387 if hasattr(rbody, "decode"): 

388 rbody = rbody.decode("utf-8") 

389 resp = StripeResponse(rbody, rcode, rheaders) 

390 except Exception: 

391 raise error.APIError( 

392 "Invalid response body from API: %s " 

393 "(HTTP response code was %d)" % (rbody, rcode), 

394 rbody, 

395 rcode, 

396 rheaders, 

397 ) 

398 if self._should_handle_code_as_error(rcode): 

399 self.handle_error_response(rbody, rcode, resp.data, rheaders) 

400 return resp 

401 

402 def interpret_streaming_response(self, stream, rcode, rheaders): 

403 # Streaming response are handled with minimal processing for the success 

404 # case (ie. we don't want to read the content). When an error is 

405 # received, we need to read from the stream and parse the received JSON, 

406 # treating it like a standard JSON response. 

407 if self._should_handle_code_as_error(rcode): 

408 if hasattr(stream, "getvalue"): 

409 json_content = stream.getvalue() 

410 elif hasattr(stream, "read"): 

411 json_content = stream.read() 

412 else: 

413 raise NotImplementedError( 

414 "HTTP client %s does not return an IOBase object which " 

415 "can be consumed when streaming a response." 

416 ) 

417 

418 return self.interpret_response(json_content, rcode, rheaders) 

419 else: 

420 return StripeStreamResponse(stream, rcode, rheaders)