Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework_simplejwt/backends.py: 62%
63 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import json
2from datetime import timedelta
3from typing import Optional, Type, Union
5import jwt
6from django.utils.translation import gettext_lazy as _
7from jwt import InvalidAlgorithmError, InvalidTokenError, algorithms
9from .exceptions import TokenBackendError
10from .utils import format_lazy
12try:
13 from jwt import PyJWKClient
15 JWK_CLIENT_AVAILABLE = True
16except ImportError:
17 JWK_CLIENT_AVAILABLE = False
19ALLOWED_ALGORITHMS = {
20 "HS256",
21 "HS384",
22 "HS512",
23 "RS256",
24 "RS384",
25 "RS512",
26 "ES256",
27 "ES384",
28 "ES512",
29}
32class TokenBackend:
33 def __init__(
34 self,
35 algorithm,
36 signing_key=None,
37 verifying_key="",
38 audience=None,
39 issuer=None,
40 jwk_url: str = None,
41 leeway: Union[float, int, timedelta] = None,
42 json_encoder: Optional[Type[json.JSONEncoder]] = None,
43 ):
44 self._validate_algorithm(algorithm)
46 self.algorithm = algorithm
47 self.signing_key = signing_key
48 self.verifying_key = verifying_key
49 self.audience = audience
50 self.issuer = issuer
52 if JWK_CLIENT_AVAILABLE: 52 ↛ 55line 52 didn't jump to line 55, because the condition on line 52 was never false
53 self.jwks_client = PyJWKClient(jwk_url) if jwk_url else None
54 else:
55 self.jwks_client = None
57 self.leeway = leeway
58 self.json_encoder = json_encoder
60 def _validate_algorithm(self, algorithm):
61 """
62 Ensure that the nominated algorithm is recognized, and that cryptography is installed for those
63 algorithms that require it
64 """
65 if algorithm not in ALLOWED_ALGORITHMS: 65 ↛ 66line 65 didn't jump to line 66, because the condition on line 65 was never true
66 raise TokenBackendError(
67 format_lazy(_("Unrecognized algorithm type '{}'"), algorithm)
68 )
70 if algorithm in algorithms.requires_cryptography and not algorithms.has_crypto: 70 ↛ 71line 70 didn't jump to line 71, because the condition on line 70 was never true
71 raise TokenBackendError(
72 format_lazy(
73 _("You must have cryptography installed to use {}."), algorithm
74 )
75 )
77 def get_leeway(self) -> timedelta:
78 if self.leeway is None: 78 ↛ 79line 78 didn't jump to line 79, because the condition on line 78 was never true
79 return timedelta(seconds=0)
80 elif isinstance(self.leeway, (int, float)): 80 ↛ 82line 80 didn't jump to line 82, because the condition on line 80 was never false
81 return timedelta(seconds=self.leeway)
82 elif isinstance(self.leeway, timedelta):
83 return self.leeway
84 else:
85 raise TokenBackendError(
86 format_lazy(
87 _(
88 "Unrecognized type '{}', 'leeway' must be of type int, float or timedelta."
89 ),
90 type(self.leeway),
91 )
92 )
94 def get_verifying_key(self, token):
95 if self.algorithm.startswith("HS"): 95 ↛ 98line 95 didn't jump to line 98, because the condition on line 95 was never false
96 return self.signing_key
98 if self.jwks_client:
99 return self.jwks_client.get_signing_key_from_jwt(token).key
101 return self.verifying_key
103 def encode(self, payload):
104 """
105 Returns an encoded token for the given payload dictionary.
106 """
107 jwt_payload = payload.copy()
108 if self.audience is not None: 108 ↛ 109line 108 didn't jump to line 109, because the condition on line 108 was never true
109 jwt_payload["aud"] = self.audience
110 if self.issuer is not None: 110 ↛ 111line 110 didn't jump to line 111, because the condition on line 110 was never true
111 jwt_payload["iss"] = self.issuer
113 token = jwt.encode(
114 jwt_payload,
115 self.signing_key,
116 algorithm=self.algorithm,
117 json_encoder=self.json_encoder,
118 )
119 if isinstance(token, bytes): 119 ↛ 121line 119 didn't jump to line 121, because the condition on line 119 was never true
120 # For PyJWT <= 1.7.1
121 return token.decode("utf-8")
122 # For PyJWT >= 2.0.0a1
123 return token
125 def decode(self, token, verify=True):
126 """
127 Performs a validation of the given token and returns its payload
128 dictionary.
130 Raises a `TokenBackendError` if the token is malformed, if its
131 signature check fails, or if its 'exp' claim indicates it has expired.
132 """
133 try:
134 return jwt.decode(
135 token,
136 self.get_verifying_key(token),
137 algorithms=[self.algorithm],
138 audience=self.audience,
139 issuer=self.issuer,
140 leeway=self.get_leeway(),
141 options={
142 "verify_aud": self.audience is not None,
143 "verify_signature": verify,
144 },
145 )
146 except InvalidAlgorithmError as ex:
147 raise TokenBackendError(_("Invalid algorithm specified")) from ex
148 except InvalidTokenError:
149 raise TokenBackendError(_("Token is invalid or expired"))