Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/jwt/api_jwt.py: 51%

130 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import annotations 

2 

3import json 

4import warnings 

5from calendar import timegm 

6from collections.abc import Iterable, Mapping 

7from datetime import datetime, timedelta, timezone 

8from typing import Any, Dict, List, Optional, Type, Union 

9 

10from . import api_jws 

11from .exceptions import ( 

12 DecodeError, 

13 ExpiredSignatureError, 

14 ImmatureSignatureError, 

15 InvalidAudienceError, 

16 InvalidIssuedAtError, 

17 InvalidIssuerError, 

18 MissingRequiredClaimError, 

19) 

20from .warnings import RemovedInPyjwt3Warning 

21 

22 

class PyJWT:
    """Encode and decode JSON Web Tokens and validate their claims.

    Signing and signature verification are delegated to ``api_jws``; this
    class adds JSON (de)serialisation of the payload and validation of the
    ``exp``, ``nbf``, ``iat``, ``aud`` and ``iss`` claims plus any claims
    listed under the ``"require"`` option.
    """

    def __init__(self, options=None):
        """Create an instance whose ``options`` override the defaults.

        :param options: optional mapping of option name to value; keys that
            are absent fall back to :meth:`_get_default_options`.
        """
        if options is None:
            options = {}
        self.options = {**self._get_default_options(), **options}

    @staticmethod
    def _get_default_options() -> Dict[str, Union[bool, List[str]]]:
        """Return a fresh dict of default options (every check enabled)."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "require": [],
        }

    def encode(
        self,
        payload: Dict[str, Any],
        key: str,
        algorithm: Optional[str] = "HS256",
        headers: Optional[Dict[str, Any]] = None,
        json_encoder: Optional[Type[json.JSONEncoder]] = None,
    ) -> str:
        """Serialise ``payload`` to compact JSON and pass it to ``api_jws.encode``.

        ``datetime`` values found under ``exp``, ``iat`` or ``nbf`` are
        replaced by integer timestamps first. The caller's mapping is not
        modified; a copy is used.

        :param payload: mapping of claims; must be a ``Mapping``.
        :param key: forwarded unchanged to ``api_jws.encode``.
        :param algorithm: forwarded unchanged to ``api_jws.encode``.
        :param headers: forwarded unchanged to ``api_jws.encode``.
        :param json_encoder: encoder class used for the payload and also
            forwarded to ``api_jws.encode``.
        :returns: whatever ``api_jws.encode`` returns (annotated as ``str``).
        :raises TypeError: if ``payload`` is not a mapping.
        """
        # Check that we get a mapping
        if not isinstance(payload, Mapping):
            raise TypeError(
                "Expecting a mapping object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Work on a copy so the caller's payload keeps its datetime objects.
        payload = payload.copy()
        for time_claim in ("exp", "iat", "nbf"):
            # Convert datetime to an intDate value in known time-format claims
            claim_value = payload.get(time_claim)
            if isinstance(claim_value, datetime):
                payload[time_claim] = timegm(claim_value.utctimetuple())

        json_payload = json.dumps(
            payload, separators=(",", ":"), cls=json_encoder
        ).encode("utf-8")

        return api_jws.encode(json_payload, key, algorithm, headers, json_encoder)

    def decode_complete(
        self,
        jwt: str,
        key: str = "",
        algorithms: Optional[List[str]] = None,
        options: Optional[Dict[str, Any]] = None,
        # deprecated arg, remove in pyjwt3
        verify: Optional[bool] = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: Optional[bytes] = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: Optional[Union[str, Iterable[str]]] = None,
        issuer: Optional[str] = None,
        leeway: Union[int, float, timedelta] = 0,
        # kwargs
        **kwargs,
    ) -> Dict[str, Any]:
        """Decode ``jwt`` via ``api_jws.decode_complete`` and validate its claims.

        :param jwt: the encoded token.
        :param key: forwarded to ``api_jws.decode_complete``.
        :param algorithms: accepted algorithms; required (non-empty) whenever
            ``verify_signature`` is enabled.
        :param options: per-call options layered over ``self.options``. When
            ``verify_signature`` is false, the other ``verify_*`` options
            default to ``False`` unless given explicitly.
        :param verify: deprecated and ignored; only triggers a warning when
            it disagrees with ``options["verify_signature"]``.
        :param detached_payload: forwarded to ``api_jws.decode_complete``.
        :param audience: expected audience(s) for the ``aud`` check.
        :param issuer: expected issuer for the ``iss`` check.
        :param leeway: tolerance, in seconds or as a ``timedelta``, applied
            to the time-based checks.
        :param kwargs: deprecated; any extra keyword triggers a warning.
        :returns: the dict returned by ``api_jws.decode_complete`` with its
            ``"payload"`` entry replaced by the parsed JSON object.
        :raises DecodeError: if ``algorithms`` is missing while the signature
            is verified, or if the payload is not a JSON object.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        options = dict(options or {})  # shallow-copy or initialize an empty dict
        options.setdefault("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != options["verify_signature"]:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
                stacklevel=2,
            )

        if not options["verify_signature"]:
            # Without a verified signature the claims are not trustworthy, so
            # claim checks are off unless the caller explicitly asked for them.
            options.setdefault("verify_exp", False)
            options.setdefault("verify_nbf", False)
            options.setdefault("verify_iat", False)
            options.setdefault("verify_aud", False)
            options.setdefault("verify_iss", False)

        if options["verify_signature"] and not algorithms:
            raise DecodeError(
                'It is required that you pass in a value for the "algorithms" argument when calling decode().'
            )

        decoded = api_jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=options,
            detached_payload=detached_payload,
        )

        try:
            payload = json.loads(decoded["payload"])
        except ValueError as e:
            # Keep the JSON error as the explicit cause for easier debugging.
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")

        # Per-call options take precedence over the instance-wide ones.
        merged_options = {**self.options, **options}
        self._validate_claims(
            payload, merged_options, audience=audience, issuer=issuer, leeway=leeway
        )

        decoded["payload"] = payload
        return decoded

    def decode(
        self,
        jwt: str,
        key: str = "",
        algorithms: Optional[List[str]] = None,
        options: Optional[Dict[str, Any]] = None,
        # deprecated arg, remove in pyjwt3
        verify: Optional[bool] = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: Optional[bytes] = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: Optional[Union[str, Iterable[str]]] = None,
        issuer: Optional[str] = None,
        leeway: Union[int, float, timedelta] = 0,
        # kwargs
        **kwargs,
    ) -> Dict[str, Any]:
        """Decode ``jwt`` and return only its validated payload.

        Takes the same arguments as :meth:`decode_complete` and returns the
        ``"payload"`` entry of that method's result. Extra ``kwargs`` are
        deprecated: they trigger a warning here and are not forwarded.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                stacklevel=2,
            )
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
        )
        return decoded["payload"]

    def _validate_claims(self, payload, options, audience=None, issuer=None, leeway=0):
        """Run every enabled claim check against ``payload``.

        :param payload: decoded claims dict.
        :param options: fully merged options; must contain ``"require"`` and
            all ``verify_iat/nbf/exp/iss/aud`` keys.
        :param audience: expected audience(s), or ``None``.
        :param issuer: expected issuer, or ``None``.
        :param leeway: tolerance in seconds, or a ``timedelta``.
        :raises TypeError: if ``audience`` is neither a string, an iterable
            nor ``None``.
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options)

        # Current time as whole seconds since the epoch, in UTC.
        now = timegm(datetime.now(tz=timezone.utc).utctimetuple())

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(payload, audience)

    def _validate_required_claims(self, payload, options):
        """Raise ``MissingRequiredClaimError`` for the first claim listed in
        ``options["require"]`` that is absent from ``payload`` or ``None``."""
        for claim in options["require"]:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_iat(self, payload, now, leeway):
        """Check that the ``iat`` claim is an integer not in the future.

        :raises InvalidIssuedAtError: if ``iat`` cannot be converted to int.
        :raises ImmatureSignatureError: if ``iat`` is later than
            ``now + leeway``.
        """
        try:
            # Compare the converted value, not the raw claim: a numeric string
            # such as "100" converts fine but cannot be ordered against a
            # number. Fractional values are truncated, as for nbf and exp.
            iat = int(payload["iat"])
        except (TypeError, ValueError, OverflowError):
            # int() raises TypeError for null/list/dict claims and
            # OverflowError for infinity; report them all as a bad claim.
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(self, payload, now, leeway):
        """Check that the ``nbf`` claim is an integer not in the future.

        :raises DecodeError: if ``nbf`` cannot be converted to int.
        :raises ImmatureSignatureError: if ``nbf`` is later than
            ``now + leeway``.
        """
        try:
            nbf = int(payload["nbf"])
        except (TypeError, ValueError, OverflowError):
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(self, payload, now, leeway):
        """Check that the ``exp`` claim is an integer still in the future.

        :raises DecodeError: if ``exp`` cannot be converted to int.
        :raises ExpiredSignatureError: if ``exp`` is at or before
            ``now - leeway``.
        """
        try:
            exp = int(payload["exp"])
        except (TypeError, ValueError, OverflowError):
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(self, payload, audience):
        """Check the ``aud`` claim against the expected ``audience``.

        Passes when neither side names an audience, or when at least one
        expected audience appears in the token's ``aud`` claim.

        :raises InvalidAudienceError: if the token names an audience but none
            is expected, if the claim is not a string or list of strings, or
            if no expected audience matches.
        :raises MissingRequiredClaimError: if an audience is expected but the
            token has no (or an empty) ``aud`` claim.
        """
        token_has_aud = "aud" in payload and bool(payload["aud"])

        if audience is None:
            if not token_has_aud:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if not token_has_aud:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Invalid audience")

    def _validate_iss(self, payload, issuer):
        """Check the ``iss`` claim against the expected ``issuer``.

        Does nothing when ``issuer`` is ``None``.

        :raises MissingRequiredClaimError: if the token has no ``iss`` claim.
        :raises InvalidIssuerError: if the claim differs from ``issuer``.
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        if payload["iss"] != issuer:
            raise InvalidIssuerError("Invalid issuer")

276 

277 

# Shared module-level instance, created with the default options.
_jwt_global_obj = PyJWT()

# Expose the shared instance's bound methods as module-level functions so
# callers can use them without constructing a PyJWT object themselves.
decode = _jwt_global_obj.decode
decode_complete = _jwt_global_obj.decode_complete
encode = _jwt_global_obj.encode