Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/stripe/util.py: 22%

123 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import absolute_import, division, print_function 

2 

3import functools 

4import hmac 

5import io 

6import logging 

7import sys 

8import os 

9import re 

10 

11import stripe 

12from stripe import six 

13from stripe.six.moves.urllib.parse import parse_qsl, quote_plus 

14 

# Console log level requested through the environment. The value is read
# once, when this module is imported.
STRIPE_LOG = os.getenv("STRIPE_LOG")

# Logger used by the logging helpers in this module.
logger = logging.getLogger("stripe")

# Names exported by a star-import of this module.
__all__ = [
    "io",
    "parse_qsl",
    "utf8",
    "log_info",
    "log_debug",
    "dashboard_link",
    "logfmt",
]

28 

29 

def utf8(value):
    """Encode Python 2 text values as UTF-8; return anything else unchanged.

    :param value: Any object.

    :returns: ``value.encode("utf-8")`` when running on Python 2 and
        ``value`` is a ``six.text_type``; otherwise ``value`` itself.
    """
    needs_encoding = six.PY2 and isinstance(value, six.text_type)
    if not needs_encoding:
        return value
    return value.encode("utf-8")

35 

36 

def is_appengine_dev():
    """Tell whether the environment variables indicate an App Engine dev run.

    :returns: True only when ``APPENGINE_RUNTIME`` is present in the
        environment and ``SERVER_SOFTWARE`` contains the substring "Dev".
    """
    if "APPENGINE_RUNTIME" not in os.environ:
        return False
    # A missing SERVER_SOFTWARE is treated as an empty string.
    server_software = os.environ.get("SERVER_SOFTWARE", "")
    return "Dev" in server_software

41 

42 

def _console_log_level():
    """Return the console log level in effect, if any.

    ``stripe.log`` is consulted first, then the module-level ``STRIPE_LOG``
    value. Only "debug" and "info" are recognised.

    :returns: "debug", "info", or None when neither source holds a
        recognised level.
    """
    recognised = ["debug", "info"]
    # Order matters: the first source holding a recognised level wins.
    for candidate in (stripe.log, STRIPE_LOG):
        if candidate in recognised:
            return candidate
    return None

50 

51 

def log_debug(message, **params):
    """Emit a logfmt-formatted line at debug level.

    The line is built from ``message`` and ``params`` with ``logfmt``. It is
    always sent to the module logger, and additionally printed to stderr
    when the console log level is "debug".

    :param message: Value stored under the ``message`` key.
    :param params: Extra key/value pairs to include in the line.
    """
    fields = dict(message=message, **params)
    line = logfmt(fields)
    echo_to_console = _console_log_level() == "debug"
    if echo_to_console:
        print(line, file=sys.stderr)
    logger.debug(line)

57 

58 

def log_info(message, **params):
    """Emit a logfmt-formatted line at info level.

    The line is built from ``message`` and ``params`` with ``logfmt``. It is
    always sent to the module logger, and additionally printed to stderr
    when the console log level is "debug" or "info".

    :param message: Value stored under the ``message`` key.
    :param params: Extra key/value pairs to include in the line.
    """
    fields = dict(message=message, **params)
    line = logfmt(fields)
    echo_to_console = _console_log_level() in ["debug", "info"]
    if echo_to_console:
        print(line, file=sys.stderr)
    logger.info(line)

64 

65 

def _test_or_live_environment():
    """Derive "live" or "test" from the prefix of ``stripe.api_key``.

    :returns: "live" or "test" when the key starts with ``sk_live_`` or
        ``sk_test_``; None when no key is set or the prefix does not match.
    """
    api_key = stripe.api_key
    if api_key is None:
        return None
    prefix = re.match(r"sk_(live|test)_", api_key)
    return prefix.group(1) if prefix is not None else None

73 

74 

def dashboard_link(request_id):
    """Build the dashboard logs URL for a request id.

    :param request_id: Identifier placed at the end of the URL.

    :returns: The URL string. The environment segment comes from
        ``_test_or_live_environment()`` and falls back to "test" when that
        returns a falsy value.
    """
    environment = _test_or_live_environment() or "test"
    template = "https://dashboard.stripe.com/{env}/logs/{reqid}"
    return template.format(env=environment, reqid=request_id)

79 

80 

def logfmt(props):
    """Render a mapping as one line of space-separated ``key=value`` pairs.

    Pairs appear in the order given by ``sorted(props.items())``.

    :param props: Mapping of keys (expected to be strings) to values.

    :returns: The formatted text line.
    """
    whitespace = re.compile(r"\s")

    def quote_if_spaced(text):
        # Text containing any whitespace is replaced by its repr().
        return repr(text) if whitespace.search(text) else text

    def render(name, value):
        # On Python 3, decode anything exposing ``decode`` (bytes or
        # bytearray) as UTF-8.
        if six.PY3 and hasattr(value, "decode"):
            value = value.decode("utf-8")
        # Values that are already strings are left alone so they are not
        # re-encoded into ascii. The code is sent through 2to3, so
        # unicode(value, encoding='utf8') cannot be used here: it would be
        # translated incorrectly.
        if not isinstance(value, six.string_types):
            value = six.text_type(value)
        value = quote_if_spaced(value)
        # The key is expected to be a string already.
        name = quote_if_spaced(name)
        return u"{key}={val}".format(key=name, val=value)

    return u" ".join(
        render(name, value) for name, value in sorted(props.items())
    )

100 

101 

# Borrowed from Django's source code
if not hasattr(hmac, "compare_digest"):

    def secure_compare(val1, val2):
        """
        Returns True if the two strings are equal, False otherwise.

        The time taken does not depend on how many characters match. To keep
        the implementation simple, that only holds when both strings have
        the same length; inputs of different lengths return False
        immediately.
        """
        left, right = utf8(val1), utf8(val2)
        if len(left) != len(right):
            return False
        if six.PY3 and isinstance(left, bytes) and isinstance(right, bytes):
            # Iterating Python 3 bytes yields ints, which can be XOR-ed
            # directly.
            deltas = (a ^ b for a, b in zip(left, right))
        else:
            deltas = (ord(a) ^ ord(b) for a, b in zip(left, right))
        mismatch = 0
        # OR every difference together so all positions are always visited.
        for delta in deltas:
            mismatch |= delta
        return mismatch == 0

else:
    # Prefer the stdlib implementation, when available.
    def secure_compare(val1, val2):
        """Compare two values with ``hmac.compare_digest`` after ``utf8``."""
        return hmac.compare_digest(utf8(val1), utf8(val2))

129 

130 

def get_object_classes():
    """Return ``OBJECT_CLASSES`` from ``stripe.object_classes``."""
    # Imported inside the function: a module-level import would create a
    # circular dependency.
    from stripe import object_classes as registry

    return registry.OBJECT_CLASSES

136 

137 

def convert_to_stripe_object(
    resp, api_key=None, stripe_version=None, stripe_account=None
):
    """Turn decoded response data into StripeObject instances.

    - A ``StripeResponse`` is unwrapped to its ``data``; the response itself
      is passed on as ``last_response`` when an object is constructed.
    - A list is converted element by element.
    - A dict that is not already a ``StripeObject`` is copied and built with
      the class registered for its "object" value, or with ``StripeObject``
      when that value is not a string or has no registered class.
    - Anything else is returned unchanged.

    :param resp: The response or data to convert.
    :param api_key: Passed through to ``construct_from``.
    :param stripe_version: Passed through to ``construct_from``.
    :param stripe_account: Passed through to ``construct_from``.

    :returns: The converted object, a list of converted objects, or ``resp``.
    """
    # Keep the raw response so it can be attached as last_response.
    raw_response = None
    if isinstance(resp, stripe.stripe_response.StripeResponse):
        raw_response = resp
        resp = raw_response.data

    if isinstance(resp, list):
        return [
            convert_to_stripe_object(
                item, api_key, stripe_version, stripe_account
            )
            for item in resp
        ]

    if not isinstance(resp, dict):
        return resp

    base_class = stripe.stripe_object.StripeObject
    if isinstance(resp, base_class):
        # Already converted; hand it back untouched.
        return resp

    payload = resp.copy()
    type_name = payload.get("object")
    if isinstance(type_name, six.string_types):
        klass = get_object_classes().get(type_name, base_class)
    else:
        klass = base_class

    return klass.construct_from(
        payload,
        api_key,
        stripe_version=stripe_version,
        stripe_account=stripe_account,
        last_response=raw_response,
    )

178 

179 

def convert_to_dict(obj):
    """Recursively convert dict-like objects into plain dicts.

    Lists are rebuilt with every element converted, and dicts (StripeObjects
    are dicts, so they are included) are rebuilt as regular dicts with every
    value converted. Any other value is returned as-is.

    :param obj: The StripeObject (or list / dict / plain value) to convert.

    :returns: The converted structure.
    """
    if isinstance(obj, list):
        return [convert_to_dict(item) for item in obj]
    if isinstance(obj, dict):
        # Building a fresh dict is what drops the StripeObject type.
        plain = {}
        for key, value in six.iteritems(obj):
            plain[key] = convert_to_dict(value)
        return plain
    return obj

198 

199 

def populate_headers(idempotency_key):
    """Build the headers dict carrying an idempotency key.

    :param idempotency_key: The key to send, or None.

    :returns: ``{"Idempotency-Key": idempotency_key}``, or None when the key
        is None.
    """
    if idempotency_key is None:
        return None
    return {"Idempotency-Key": idempotency_key}

204 

205 

def merge_dicts(x, y):
    """Return a copy of ``x`` updated with the entries of ``y``.

    ``x`` is copied with its own ``copy()`` method, so it is not modified;
    entries from ``y`` replace entries of ``x`` that share a key.

    :param x: The base mapping.
    :param y: The entries to apply on top of the copy.

    :returns: The merged copy.
    """
    merged = x.copy()
    merged.update(y)
    return merged

210 

211 

def sanitize_id(id):
    """Return ``id`` passed through ``utf8`` and then ``quote_plus``.

    :param id: The identifier to escape.

    :returns: The escaped identifier.
    """
    return quote_plus(utf8(id))

216 

217 

class class_method_variant(object):
    """Decorator that lets one name work as an instance and a class method.

    Created with the name of a class method and then applied to an instance
    method. Attribute access returns a wrapper that picks the target at call
    time:

    - accessed on an instance: the decorated method, bound to that instance;
    - accessed on the class with an instance as first argument: the
      decorated method, with that argument as the instance;
    - accessed on the class otherwise: the class attribute named
      ``class_method_name``.
    """

    def __init__(self, class_method_name):
        # Name of the class-level method used for Class.method(...) calls.
        self.class_method_name = class_method_name

    def __call__(self, method):
        # Second step of the decoration: remember the instance method and
        # put this descriptor in its place.
        self.method = method
        return self

    def __get__(self, obj, objtype=None):
        @functools.wraps(self.method)
        def dispatch(*args, **kwargs):
            if obj is not None:
                # instance.method(...)
                return self.method(obj, *args, **kwargs)
            if args and isinstance(args[0], objtype):
                # Class.method(instance, ...), which in Python is the same
                # thing as calling an instance method.
                receiver, remaining = args[0], args[1:]
                return self.method(receiver, *remaining, **kwargs)
            # Class.method(...)
            target = getattr(objtype, self.class_method_name)
            return target(*args, **kwargs)

        return dispatch