Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/stripe/util.py: 22%
123 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import absolute_import, division, print_function
3import functools
4import hmac
5import io
6import logging
7import sys
8import os
9import re
11import stripe
12from stripe import six
13from stripe.six.moves.urllib.parse import parse_qsl, quote_plus
15STRIPE_LOG = os.environ.get("STRIPE_LOG")
17logger = logging.getLogger("stripe")
19__all__ = [
20 "io",
21 "parse_qsl",
22 "utf8",
23 "log_info",
24 "log_debug",
25 "dashboard_link",
26 "logfmt",
27]
30def utf8(value):
31 if six.PY2 and isinstance(value, six.text_type):
32 return value.encode("utf-8")
33 else:
34 return value
37def is_appengine_dev():
38 return "APPENGINE_RUNTIME" in os.environ and "Dev" in os.environ.get(
39 "SERVER_SOFTWARE", ""
40 )
43def _console_log_level():
44 if stripe.log in ["debug", "info"]:
45 return stripe.log
46 elif STRIPE_LOG in ["debug", "info"]:
47 return STRIPE_LOG
48 else:
49 return None
52def log_debug(message, **params):
53 msg = logfmt(dict(message=message, **params))
54 if _console_log_level() == "debug":
55 print(msg, file=sys.stderr)
56 logger.debug(msg)
59def log_info(message, **params):
60 msg = logfmt(dict(message=message, **params))
61 if _console_log_level() in ["debug", "info"]:
62 print(msg, file=sys.stderr)
63 logger.info(msg)
66def _test_or_live_environment():
67 if stripe.api_key is None:
68 return
69 match = re.match(r"sk_(live|test)_", stripe.api_key)
70 if match is None:
71 return
72 return match.groups()[0]
75def dashboard_link(request_id):
76 return "https://dashboard.stripe.com/{env}/logs/{reqid}".format(
77 env=_test_or_live_environment() or "test", reqid=request_id
78 )
81def logfmt(props):
82 def fmt(key, val):
83 # Handle case where val is a bytes or bytesarray
84 if six.PY3 and hasattr(val, "decode"):
85 val = val.decode("utf-8")
86 # Check if val is already a string to avoid re-encoding into
87 # ascii. Since the code is sent through 2to3, we can't just
88 # use unicode(val, encoding='utf8') since it will be
89 # translated incorrectly.
90 if not isinstance(val, six.string_types):
91 val = six.text_type(val)
92 if re.search(r"\s", val):
93 val = repr(val)
94 # key should already be a string
95 if re.search(r"\s", key):
96 key = repr(key)
97 return u"{key}={val}".format(key=key, val=val)
99 return u" ".join([fmt(key, val) for key, val in sorted(props.items())])
102# Borrowed from Django's source code
103if hasattr(hmac, "compare_digest"): 103 ↛ 110line 103 didn't jump to line 110, because the condition on line 103 was never false
104 # Prefer the stdlib implementation, when available.
105 def secure_compare(val1, val2):
106 return hmac.compare_digest(utf8(val1), utf8(val2))
108else:
110 def secure_compare(val1, val2):
111 """
112 Returns True if the two strings are equal, False otherwise.
113 The time taken is independent of the number of characters that match.
114 For the sake of simplicity, this function executes in constant time
115 only when the two strings have the same length. It short-circuits when
116 they have different lengths.
117 """
118 val1, val2 = utf8(val1), utf8(val2)
119 if len(val1) != len(val2):
120 return False
121 result = 0
122 if six.PY3 and isinstance(val1, bytes) and isinstance(val2, bytes):
123 for x, y in zip(val1, val2):
124 result |= x ^ y
125 else:
126 for x, y in zip(val1, val2):
127 result |= ord(x) ^ ord(y)
128 return result == 0
131def get_object_classes():
132 # This is here to avoid a circular dependency
133 from stripe.object_classes import OBJECT_CLASSES
135 return OBJECT_CLASSES
138def convert_to_stripe_object(
139 resp, api_key=None, stripe_version=None, stripe_account=None
140):
141 # If we get a StripeResponse, we'll want to return a
142 # StripeObject with the last_response field filled out with
143 # the raw API response information
144 stripe_response = None
146 if isinstance(resp, stripe.stripe_response.StripeResponse):
147 stripe_response = resp
148 resp = stripe_response.data
150 if isinstance(resp, list):
151 return [
152 convert_to_stripe_object(
153 i, api_key, stripe_version, stripe_account
154 )
155 for i in resp
156 ]
157 elif isinstance(resp, dict) and not isinstance(
158 resp, stripe.stripe_object.StripeObject
159 ):
160 resp = resp.copy()
161 klass_name = resp.get("object")
162 if isinstance(klass_name, six.string_types):
163 klass = get_object_classes().get(
164 klass_name, stripe.stripe_object.StripeObject
165 )
166 else:
167 klass = stripe.stripe_object.StripeObject
169 return klass.construct_from(
170 resp,
171 api_key,
172 stripe_version=stripe_version,
173 stripe_account=stripe_account,
174 last_response=stripe_response,
175 )
176 else:
177 return resp
180def convert_to_dict(obj):
181 """Converts a StripeObject back to a regular dict.
183 Nested StripeObjects are also converted back to regular dicts.
185 :param obj: The StripeObject to convert.
187 :returns: The StripeObject as a dict.
188 """
189 if isinstance(obj, list):
190 return [convert_to_dict(i) for i in obj]
191 # This works by virtue of the fact that StripeObjects _are_ dicts. The dict
192 # comprehension returns a regular dict and recursively applies the
193 # conversion to each value.
194 elif isinstance(obj, dict):
195 return {k: convert_to_dict(v) for k, v in six.iteritems(obj)}
196 else:
197 return obj
200def populate_headers(idempotency_key):
201 if idempotency_key is not None:
202 return {"Idempotency-Key": idempotency_key}
203 return None
206def merge_dicts(x, y):
207 z = x.copy()
208 z.update(y)
209 return z
212def sanitize_id(id):
213 utf8id = utf8(id)
214 quotedId = quote_plus(utf8id)
215 return quotedId
218class class_method_variant(object):
219 def __init__(self, class_method_name):
220 self.class_method_name = class_method_name
222 def __call__(self, method):
223 self.method = method
224 return self
226 def __get__(self, obj, objtype=None):
227 @functools.wraps(self.method)
228 def _wrapper(*args, **kwargs):
229 if obj is not None:
230 # Method was called as an instance method, e.g.
231 # instance.method(...)
232 return self.method(obj, *args, **kwargs)
233 elif len(args) > 0 and isinstance(args[0], objtype):
234 # Method was called as a class method with the instance as the
235 # first argument, e.g. Class.method(instance, ...) which in
236 # Python is the same thing as calling an instance method
237 return self.method(args[0], *args[1:], **kwargs)
238 else:
239 # Method was called as a class method, e.g. Class.method(...)
240 class_method = getattr(objtype, self.class_method_name)
241 return class_method(*args, **kwargs)
243 return _wrapper