Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/jwt/api_jwt.py: 51%
130 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import annotations
3import json
4import warnings
5from calendar import timegm
6from collections.abc import Iterable, Mapping
7from datetime import datetime, timedelta, timezone
8from typing import Any, Dict, List, Optional, Type, Union
10from . import api_jws
11from .exceptions import (
12 DecodeError,
13 ExpiredSignatureError,
14 ImmatureSignatureError,
15 InvalidAudienceError,
16 InvalidIssuedAtError,
17 InvalidIssuerError,
18 MissingRequiredClaimError,
19)
20from .warnings import RemovedInPyjwt3Warning
23class PyJWT:
24 def __init__(self, options=None):
25 if options is None: 25 ↛ 27line 25 didn't jump to line 27, because the condition on line 25 was never false
26 options = {}
27 self.options = {**self._get_default_options(), **options}
29 @staticmethod
30 def _get_default_options() -> Dict[str, Union[bool, List[str]]]:
31 return {
32 "verify_signature": True,
33 "verify_exp": True,
34 "verify_nbf": True,
35 "verify_iat": True,
36 "verify_aud": True,
37 "verify_iss": True,
38 "require": [],
39 }
41 def encode(
42 self,
43 payload: Dict[str, Any],
44 key: str,
45 algorithm: Optional[str] = "HS256",
46 headers: Optional[Dict[str, Any]] = None,
47 json_encoder: Optional[Type[json.JSONEncoder]] = None,
48 ) -> str:
49 # Check that we get a mapping
50 if not isinstance(payload, Mapping): 50 ↛ 51line 50 didn't jump to line 51, because the condition on line 50 was never true
51 raise TypeError(
52 "Expecting a mapping object, as JWT only supports "
53 "JSON objects as payloads."
54 )
56 # Payload
57 payload = payload.copy()
58 for time_claim in ["exp", "iat", "nbf"]:
59 # Convert datetime to a intDate value in known time-format claims
60 if isinstance(payload.get(time_claim), datetime): 60 ↛ 61line 60 didn't jump to line 61, because the condition on line 60 was never true
61 payload[time_claim] = timegm(payload[time_claim].utctimetuple())
63 json_payload = json.dumps(
64 payload, separators=(",", ":"), cls=json_encoder
65 ).encode("utf-8")
67 return api_jws.encode(json_payload, key, algorithm, headers, json_encoder)
69 def decode_complete(
70 self,
71 jwt: str,
72 key: str = "",
73 algorithms: Optional[List[str]] = None,
74 options: Optional[Dict[str, Any]] = None,
75 # deprecated arg, remove in pyjwt3
76 verify: Optional[bool] = None,
77 # could be used as passthrough to api_jws, consider removal in pyjwt3
78 detached_payload: Optional[bytes] = None,
79 # passthrough arguments to _validate_claims
80 # consider putting in options
81 audience: Optional[Union[str, Iterable[str]]] = None,
82 issuer: Optional[str] = None,
83 leeway: Union[int, float, timedelta] = 0,
84 # kwargs
85 **kwargs,
86 ) -> Dict[str, Any]:
87 if kwargs: 87 ↛ 88line 87 didn't jump to line 88, because the condition on line 87 was never true
88 warnings.warn(
89 "passing additional kwargs to decode_complete() is deprecated "
90 "and will be removed in pyjwt version 3. "
91 f"Unsupported kwargs: {tuple(kwargs.keys())}",
92 RemovedInPyjwt3Warning,
93 )
94 options = dict(options or {}) # shallow-copy or initialize an empty dict
95 options.setdefault("verify_signature", True)
97 # If the user has set the legacy `verify` argument, and it doesn't match
98 # what the relevant `options` entry for the argument is, inform the user
99 # that they're likely making a mistake.
100 if verify is not None and verify != options["verify_signature"]: 100 ↛ 101line 100 didn't jump to line 101, because the condition on line 100 was never true
101 warnings.warn(
102 "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
103 "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
104 "This invocation has a mismatch between the kwarg and the option entry.",
105 category=DeprecationWarning,
106 )
108 if not options["verify_signature"]: 108 ↛ 109line 108 didn't jump to line 109, because the condition on line 108 was never true
109 options.setdefault("verify_exp", False)
110 options.setdefault("verify_nbf", False)
111 options.setdefault("verify_iat", False)
112 options.setdefault("verify_aud", False)
113 options.setdefault("verify_iss", False)
115 if options["verify_signature"] and not algorithms: 115 ↛ 116line 115 didn't jump to line 116, because the condition on line 115 was never true
116 raise DecodeError(
117 'It is required that you pass in a value for the "algorithms" argument when calling decode().'
118 )
120 decoded = api_jws.decode_complete(
121 jwt,
122 key=key,
123 algorithms=algorithms,
124 options=options,
125 detached_payload=detached_payload,
126 )
128 try:
129 payload = json.loads(decoded["payload"])
130 except ValueError as e:
131 raise DecodeError(f"Invalid payload string: {e}")
132 if not isinstance(payload, dict): 132 ↛ 133line 132 didn't jump to line 133, because the condition on line 132 was never true
133 raise DecodeError("Invalid payload string: must be a json object")
135 merged_options = {**self.options, **options}
136 self._validate_claims(
137 payload, merged_options, audience=audience, issuer=issuer, leeway=leeway
138 )
140 decoded["payload"] = payload
141 return decoded
143 def decode(
144 self,
145 jwt: str,
146 key: str = "",
147 algorithms: Optional[List[str]] = None,
148 options: Optional[Dict[str, Any]] = None,
149 # deprecated arg, remove in pyjwt3
150 verify: Optional[bool] = None,
151 # could be used as passthrough to api_jws, consider removal in pyjwt3
152 detached_payload: Optional[bytes] = None,
153 # passthrough arguments to _validate_claims
154 # consider putting in options
155 audience: Optional[Union[str, Iterable[str]]] = None,
156 issuer: Optional[str] = None,
157 leeway: Union[int, float, timedelta] = 0,
158 # kwargs
159 **kwargs,
160 ) -> Dict[str, Any]:
161 if kwargs: 161 ↛ 162line 161 didn't jump to line 162, because the condition on line 161 was never true
162 warnings.warn(
163 "passing additional kwargs to decode() is deprecated "
164 "and will be removed in pyjwt version 3. "
165 f"Unsupported kwargs: {tuple(kwargs.keys())}",
166 RemovedInPyjwt3Warning,
167 )
168 decoded = self.decode_complete(
169 jwt,
170 key,
171 algorithms,
172 options,
173 verify=verify,
174 detached_payload=detached_payload,
175 audience=audience,
176 issuer=issuer,
177 leeway=leeway,
178 )
179 return decoded["payload"]
181 def _validate_claims(self, payload, options, audience=None, issuer=None, leeway=0):
182 if isinstance(leeway, timedelta): 182 ↛ 185line 182 didn't jump to line 185, because the condition on line 182 was never false
183 leeway = leeway.total_seconds()
185 if audience is not None and not isinstance(audience, (str, Iterable)): 185 ↛ 186line 185 didn't jump to line 186, because the condition on line 185 was never true
186 raise TypeError("audience must be a string, iterable or None")
188 self._validate_required_claims(payload, options)
190 now = timegm(datetime.now(tz=timezone.utc).utctimetuple())
192 if "iat" in payload and options["verify_iat"]: 192 ↛ 195line 192 didn't jump to line 195, because the condition on line 192 was never false
193 self._validate_iat(payload, now, leeway)
195 if "nbf" in payload and options["verify_nbf"]: 195 ↛ 196line 195 didn't jump to line 196, because the condition on line 195 was never true
196 self._validate_nbf(payload, now, leeway)
198 if "exp" in payload and options["verify_exp"]: 198 ↛ 201line 198 didn't jump to line 201, because the condition on line 198 was never false
199 self._validate_exp(payload, now, leeway)
201 if options["verify_iss"]: 201 ↛ 204line 201 didn't jump to line 204, because the condition on line 201 was never false
202 self._validate_iss(payload, issuer)
204 if options["verify_aud"]: 204 ↛ 205line 204 didn't jump to line 205, because the condition on line 204 was never true
205 self._validate_aud(payload, audience)
207 def _validate_required_claims(self, payload, options):
208 for claim in options["require"]: 208 ↛ 209line 208 didn't jump to line 209, because the loop on line 208 never started
209 if payload.get(claim) is None:
210 raise MissingRequiredClaimError(claim)
212 def _validate_iat(self, payload, now, leeway):
213 iat = payload["iat"]
214 try:
215 int(iat)
216 except ValueError:
217 raise InvalidIssuedAtError("Issued At claim (iat) must be an integer.")
218 if iat > (now + leeway): 218 ↛ 219line 218 didn't jump to line 219, because the condition on line 218 was never true
219 raise ImmatureSignatureError("The token is not yet valid (iat)")
221 def _validate_nbf(self, payload, now, leeway):
222 try:
223 nbf = int(payload["nbf"])
224 except ValueError:
225 raise DecodeError("Not Before claim (nbf) must be an integer.")
227 if nbf > (now + leeway):
228 raise ImmatureSignatureError("The token is not yet valid (nbf)")
230 def _validate_exp(self, payload, now, leeway):
231 try:
232 exp = int(payload["exp"])
233 except ValueError:
234 raise DecodeError("Expiration Time claim (exp) must be an" " integer.")
236 if exp <= (now - leeway): 236 ↛ 237line 236 didn't jump to line 237, because the condition on line 236 was never true
237 raise ExpiredSignatureError("Signature has expired")
239 def _validate_aud(self, payload, audience):
240 if audience is None:
241 if "aud" not in payload or not payload["aud"]:
242 return
243 # Application did not specify an audience, but
244 # the token has the 'aud' claim
245 raise InvalidAudienceError("Invalid audience")
247 if "aud" not in payload or not payload["aud"]:
248 # Application specified an audience, but it could not be
249 # verified since the token does not contain a claim.
250 raise MissingRequiredClaimError("aud")
252 audience_claims = payload["aud"]
254 if isinstance(audience_claims, str):
255 audience_claims = [audience_claims]
256 if not isinstance(audience_claims, list):
257 raise InvalidAudienceError("Invalid claim format in token")
258 if any(not isinstance(c, str) for c in audience_claims):
259 raise InvalidAudienceError("Invalid claim format in token")
261 if isinstance(audience, str):
262 audience = [audience]
264 if all(aud not in audience_claims for aud in audience):
265 raise InvalidAudienceError("Invalid audience")
267 def _validate_iss(self, payload, issuer):
268 if issuer is None: 268 ↛ 271line 268 didn't jump to line 271, because the condition on line 268 was never false
269 return
271 if "iss" not in payload:
272 raise MissingRequiredClaimError("iss")
274 if payload["iss"] != issuer:
275 raise InvalidIssuerError("Invalid issuer")
278_jwt_global_obj = PyJWT()
279encode = _jwt_global_obj.encode
280decode_complete = _jwt_global_obj.decode_complete
281decode = _jwt_global_obj.decode