from __future__ import annotations

import json
import warnings
from calendar import timegm
from collections.abc import Iterable, Mapping
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Type, Union

from . import api_jws
from .exceptions import (
    DecodeError,
    ExpiredSignatureError,
    ImmatureSignatureError,
    InvalidAudienceError,
    InvalidIssuedAtError,
    InvalidIssuerError,
    MissingRequiredClaimError,
)
from .warnings import RemovedInPyjwt3Warning



class PyJWT:
    """Encode and decode JSON Web Tokens and validate their claims.

    Signing and signature verification are delegated to ``api_jws``. This
    class serializes/parses the JSON payload and checks the ``iat``, ``nbf``,
    ``exp``, ``iss`` and ``aud`` claims according to its options.
    """

    def __init__(self, options=None):
        """Store the instance options.

        :param options: optional mapping of option names to values; its
            entries override those from :meth:`_get_default_options`.
        """
        if options is None:
            options = {}
        self.options = {**self._get_default_options(), **options}

    @staticmethod
    def _get_default_options() -> Dict[str, Union[bool, List[str]]]:
        """Return a fresh dict of default options (every check enabled)."""
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "require": [],
        }

    def encode(
        self,
        payload: Dict[str, Any],
        key: str,
        algorithm: Optional[str] = "HS256",
        headers: Optional[Dict[str, Any]] = None,
        json_encoder: Optional[Type[json.JSONEncoder]] = None,
    ) -> str:
        """Serialize ``payload`` to compact JSON and hand it to ``api_jws.encode``.

        :param payload: mapping of claims; it is copied, so the caller's
            object is not modified.
        :param key: passed through to ``api_jws.encode``.
        :param algorithm: passed through to ``api_jws.encode``.
        :param headers: passed through to ``api_jws.encode``.
        :param json_encoder: ``json.JSONEncoder`` subclass used to dump the
            payload; also passed through to ``api_jws.encode``.
        :returns: whatever ``api_jws.encode`` returns.
        :raises TypeError: if ``payload`` is not a mapping.
        """
        # Check that we get a mapping
        if not isinstance(payload, Mapping):
            raise TypeError(
                "Expecting a mapping object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Payload
        payload = payload.copy()
        for time_claim in ["exp", "iat", "nbf"]:
            # Convert datetime to an intDate value in known time-format claims
            if isinstance(payload.get(time_claim), datetime):
                payload[time_claim] = timegm(payload[time_claim].utctimetuple())

        json_payload = json.dumps(
            payload, separators=(",", ":"), cls=json_encoder
        ).encode("utf-8")

        return api_jws.encode(json_payload, key, algorithm, headers, json_encoder)

    def decode_complete(
        self,
        jwt: str,
        key: str = "",
        algorithms: Optional[List[str]] = None,
        options: Optional[Dict[str, Any]] = None,
        # deprecated arg, remove in pyjwt3
        verify: Optional[bool] = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: Optional[bytes] = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: Optional[Union[str, Iterable[str]]] = None,
        issuer: Optional[str] = None,
        leeway: Union[int, float, timedelta] = 0,
        # kwargs
        **kwargs,
    ) -> Dict[str, Any]:
        """Decode ``jwt`` via ``api_jws.decode_complete`` and validate its claims.

        :param jwt: the token to decode.
        :param key: passed through to ``api_jws.decode_complete``.
        :param algorithms: passed through to ``api_jws.decode_complete``;
            must be non-empty when ``verify_signature`` is enabled.
        :param options: per-call options. ``verify_signature`` defaults to
            ``True``; when it is false, the ``verify_exp``/``verify_nbf``/
            ``verify_iat``/``verify_aud``/``verify_iss`` entries that were not
            given default to ``False``. The result is overlaid on the
            instance options before claims are validated.
        :param verify: deprecated and ignored; only triggers a warning when
            it disagrees with ``options["verify_signature"]``.
        :param detached_payload: passed through to ``api_jws.decode_complete``.
        :param audience: expected audience(s) for the ``aud`` check.
        :param issuer: expected issuer for the ``iss`` check.
        :param leeway: tolerance, in seconds or as a ``timedelta``, applied
            to the ``iat``, ``nbf`` and ``exp`` checks.
        :param kwargs: unsupported extras; they only trigger a
            ``RemovedInPyjwt3Warning`` and are otherwise ignored.
        :returns: the mapping returned by ``api_jws.decode_complete`` with its
            ``"payload"`` entry replaced by the parsed claims dict.
        :raises DecodeError: if signature verification is enabled but no
            algorithms were given, or the payload is not a JSON object.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
            )
        options = dict(options or {})  # shallow-copy or initialize an empty dict
        options.setdefault("verify_signature", True)

        # If the user has set the legacy `verify` argument, and it doesn't match
        # what the relevant `options` entry for the argument is, inform the user
        # that they're likely making a mistake.
        if verify is not None and verify != options["verify_signature"]:
            warnings.warn(
                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
                "This invocation has a mismatch between the kwarg and the option entry.",
                category=DeprecationWarning,
            )

        if not options["verify_signature"]:
            # Without signature verification the claim checks are off unless
            # the caller explicitly asked for them.
            options.setdefault("verify_exp", False)
            options.setdefault("verify_nbf", False)
            options.setdefault("verify_iat", False)
            options.setdefault("verify_aud", False)
            options.setdefault("verify_iss", False)

        if options["verify_signature"] and not algorithms:
            raise DecodeError(
                'It is required that you pass in a value for the "algorithms" argument when calling decode().'
            )

        decoded = api_jws.decode_complete(
            jwt,
            key=key,
            algorithms=algorithms,
            options=options,
            detached_payload=detached_payload,
        )

        try:
            payload = json.loads(decoded["payload"])
        except ValueError as e:
            raise DecodeError(f"Invalid payload string: {e}") from e
        if not isinstance(payload, dict):
            raise DecodeError("Invalid payload string: must be a json object")

        # Per-call options take precedence over the instance-level ones.
        merged_options = {**self.options, **options}
        self._validate_claims(
            payload, merged_options, audience=audience, issuer=issuer, leeway=leeway
        )

        decoded["payload"] = payload
        return decoded

    def decode(
        self,
        jwt: str,
        key: str = "",
        algorithms: Optional[List[str]] = None,
        options: Optional[Dict[str, Any]] = None,
        # deprecated arg, remove in pyjwt3
        verify: Optional[bool] = None,
        # could be used as passthrough to api_jws, consider removal in pyjwt3
        detached_payload: Optional[bytes] = None,
        # passthrough arguments to _validate_claims
        # consider putting in options
        audience: Optional[Union[str, Iterable[str]]] = None,
        issuer: Optional[str] = None,
        leeway: Union[int, float, timedelta] = 0,
        # kwargs
        **kwargs,
    ) -> Dict[str, Any]:
        """Decode ``jwt`` and return only its validated claims.

        Takes the same arguments as :meth:`decode_complete` and forwards all
        of them except ``kwargs``, which only trigger a
        ``RemovedInPyjwt3Warning``.

        :returns: the ``"payload"`` entry of :meth:`decode_complete`'s result.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
            )
        decoded = self.decode_complete(
            jwt,
            key,
            algorithms,
            options,
            verify=verify,
            detached_payload=detached_payload,
            audience=audience,
            issuer=issuer,
            leeway=leeway,
        )
        return decoded["payload"]

    def _validate_claims(self, payload, options, audience=None, issuer=None, leeway=0):
        """Run the enabled claim checks against ``payload``.

        Required claims are checked first, then ``iat``, ``nbf`` and ``exp``
        (each only if present and enabled), then ``iss`` and ``aud`` (if
        enabled).

        :param payload: parsed claims dict.
        :param options: merged options; must contain the ``verify_*`` keys
            and ``require``.
        :param audience: expected audience(s), or ``None``.
        :param issuer: expected issuer, or ``None``.
        :param leeway: tolerance in seconds, or a ``timedelta``.
        :raises TypeError: if ``audience`` is neither ``None``, a string nor
            an iterable.
        """
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if audience is not None and not isinstance(audience, (str, Iterable)):
            raise TypeError("audience must be a string, iterable or None")

        self._validate_required_claims(payload, options)

        # Current time as integer seconds since the epoch, in UTC.
        now = timegm(datetime.now(tz=timezone.utc).utctimetuple())

        if "iat" in payload and options["verify_iat"]:
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options["verify_nbf"]:
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options["verify_exp"]:
            self._validate_exp(payload, now, leeway)

        if options["verify_iss"]:
            self._validate_iss(payload, issuer)

        if options["verify_aud"]:
            self._validate_aud(payload, audience)

    def _validate_required_claims(self, payload, options):
        """Raise ``MissingRequiredClaimError`` for the first claim listed in
        ``options["require"]`` that is absent from ``payload`` or is ``None``.
        """
        for claim in options["require"]:
            if payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_iat(self, payload, now, leeway):
        """Check that the ``iat`` claim is an integer not later than ``now + leeway``.

        :raises InvalidIssuedAtError: if ``iat`` cannot be converted by ``int()``.
        :raises ImmatureSignatureError: if ``iat`` lies in the future.
        """
        try:
            # Compare the converted value: a numeric string such as "123"
            # converts cleanly but cannot be ordered against a number.
            iat = int(payload["iat"])
        except ValueError:
            raise InvalidIssuedAtError(
                "Issued At claim (iat) must be an integer."
            ) from None
        if iat > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (iat)")

    def _validate_nbf(self, payload, now, leeway):
        """Check that the ``nbf`` claim is an integer not later than ``now + leeway``.

        :raises DecodeError: if ``nbf`` cannot be converted by ``int()``.
        :raises ImmatureSignatureError: if ``nbf`` lies in the future.
        """
        try:
            nbf = int(payload["nbf"])
        except ValueError:
            raise DecodeError("Not Before claim (nbf) must be an integer.") from None

        if nbf > (now + leeway):
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(self, payload, now, leeway):
        """Check that the ``exp`` claim is an integer later than ``now - leeway``.

        :raises DecodeError: if ``exp`` cannot be converted by ``int()``.
        :raises ExpiredSignatureError: if ``exp <= now - leeway``.
        """
        try:
            exp = int(payload["exp"])
        except ValueError:
            raise DecodeError(
                "Expiration Time claim (exp) must be an integer."
            ) from None

        if exp <= (now - leeway):
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(self, payload, audience):
        """Check the ``aud`` claim against the expected ``audience``.

        With ``audience=None`` the token must carry no (or an empty) ``aud``
        claim. Otherwise the claim must be a string or a list of strings that
        shares at least one value with ``audience``.

        :raises InvalidAudienceError: on an unexpected, malformed or
            non-matching ``aud`` claim.
        :raises MissingRequiredClaimError: if an audience is expected but the
            claim is absent or empty.
        """
        if audience is None:
            if "aud" not in payload or not payload["aud"]:
                return
            # Application did not specify an audience, but
            # the token has the 'aud' claim
            raise InvalidAudienceError("Invalid audience")

        if "aud" not in payload or not payload["aud"]:
            # Application specified an audience, but it could not be
            # verified since the token does not contain a claim.
            raise MissingRequiredClaimError("aud")

        audience_claims = payload["aud"]

        if isinstance(audience_claims, str):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, str) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, str):
            audience = [audience]

        if all(aud not in audience_claims for aud in audience):
            raise InvalidAudienceError("Invalid audience")

    def _validate_iss(self, payload, issuer):
        """Check the ``iss`` claim against ``issuer``; a ``None`` issuer skips the check.

        :raises MissingRequiredClaimError: if an issuer is expected but the
            claim is absent.
        :raises InvalidIssuerError: if the claim differs from ``issuer``.
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        if payload["iss"] != issuer:
            raise InvalidIssuerError("Invalid issuer")



# Module-level convenience API: a single shared PyJWT instance created with
# no option overrides, whose bound methods are exposed as plain functions.
_jwt_global_obj = PyJWT()
encode = _jwt_global_obj.encode
decode_complete = _jwt_global_obj.decode_complete
decode = _jwt_global_obj.decode