Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework_simplejwt/tokens.py: 71%
144 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from datetime import timedelta
2from uuid import uuid4
4from django.conf import settings
5from django.utils.module_loading import import_string
6from django.utils.translation import gettext_lazy as _
8from .exceptions import TokenBackendError, TokenError
9from .settings import api_settings
10from .token_blacklist.models import BlacklistedToken, OutstandingToken
11from .utils import aware_utcnow, datetime_from_epoch, datetime_to_epoch, format_lazy
14class Token:
15 """
16 A class which validates and wraps an existing JWT or can be used to build a
17 new JWT.
18 """
20 token_type = None
21 lifetime = None
23 def __init__(self, token=None, verify=True):
24 """
25 !!!! IMPORTANT !!!! MUST raise a TokenError with a user-facing error
26 message if the given token is invalid, expired, or otherwise not safe
27 to use.
28 """
29 if self.token_type is None or self.lifetime is None: 29 ↛ 30line 29 didn't jump to line 30, because the condition on line 29 was never true
30 raise TokenError(_("Cannot create token with no type or lifetime"))
32 self.token = token
33 self.current_time = aware_utcnow()
35 # Set up token
36 if token is not None:
37 # An encoded token was provided
38 token_backend = self.get_token_backend()
40 # Decode token
41 try:
42 self.payload = token_backend.decode(token, verify=verify)
43 except TokenBackendError:
44 raise TokenError(_("Token is invalid or expired"))
46 if verify: 46 ↛ exitline 46 didn't return from function '__init__', because the condition on line 46 was never false
47 self.verify()
48 else:
49 # New token. Skip all the verification steps.
50 self.payload = {api_settings.TOKEN_TYPE_CLAIM: self.token_type}
52 # Set "exp" and "iat" claims with default value
53 self.set_exp(from_time=self.current_time, lifetime=self.lifetime)
54 self.set_iat(at_time=self.current_time)
56 # Set "jti" claim
57 self.set_jti()
59 def __repr__(self):
60 return repr(self.payload)
62 def __getitem__(self, key):
63 return self.payload[key]
65 def __setitem__(self, key, value):
66 self.payload[key] = value
68 def __delitem__(self, key):
69 del self.payload[key]
71 def __contains__(self, key):
72 return key in self.payload
74 def get(self, key, default=None):
75 return self.payload.get(key, default)
77 def __str__(self):
78 """
79 Signs and returns a token as a base64 encoded string.
80 """
81 return self.get_token_backend().encode(self.payload)
83 def verify(self):
84 """
85 Performs additional validation steps which were not performed when this
86 token was decoded. This method is part of the "public" API to indicate
87 the intention that it may be overridden in subclasses.
88 """
89 # According to RFC 7519, the "exp" claim is OPTIONAL
90 # (https://tools.ietf.org/html/rfc7519#section-4.1.4). As a more
91 # correct behavior for authorization tokens, we require an "exp"
92 # claim. We don't want any zombie tokens walking around.
93 self.check_exp()
95 # If the defaults are not None then we should enforce the
96 # requirement of these settings.As above, the spec labels
97 # these as optional.
98 if ( 98 ↛ 102line 98 didn't jump to line 102
99 api_settings.JTI_CLAIM is not None
100 and api_settings.JTI_CLAIM not in self.payload
101 ):
102 raise TokenError(_("Token has no id"))
104 if api_settings.TOKEN_TYPE_CLAIM is not None: 104 ↛ exitline 104 didn't return from function 'verify', because the condition on line 104 was never false
106 self.verify_token_type()
108 def verify_token_type(self):
109 """
110 Ensures that the token type claim is present and has the correct value.
111 """
112 try:
113 token_type = self.payload[api_settings.TOKEN_TYPE_CLAIM]
114 except KeyError:
115 raise TokenError(_("Token has no type"))
117 if self.token_type != token_type: 117 ↛ 118line 117 didn't jump to line 118, because the condition on line 117 was never true
118 raise TokenError(_("Token has wrong type"))
120 def set_jti(self):
121 """
122 Populates the configured jti claim of a token with a string where there
123 is a negligible probability that the same string will be chosen at a
124 later time.
126 See here:
127 https://tools.ietf.org/html/rfc7519#section-4.1.7
128 """
129 self.payload[api_settings.JTI_CLAIM] = uuid4().hex
131 def set_exp(self, claim="exp", from_time=None, lifetime=None):
132 """
133 Updates the expiration time of a token.
135 See here:
136 https://tools.ietf.org/html/rfc7519#section-4.1.4
137 """
138 if from_time is None: 138 ↛ 139line 138 didn't jump to line 139, because the condition on line 138 was never true
139 from_time = self.current_time
141 if lifetime is None:
142 lifetime = self.lifetime
144 self.payload[claim] = datetime_to_epoch(from_time + lifetime)
146 def set_iat(self, claim="iat", at_time=None):
147 """
148 Updates the time at which the token was issued.
150 See here:
151 https://tools.ietf.org/html/rfc7519#section-4.1.6
152 """
153 if at_time is None: 153 ↛ 154line 153 didn't jump to line 154, because the condition on line 153 was never true
154 at_time = self.current_time
156 self.payload[claim] = datetime_to_epoch(at_time)
158 def check_exp(self, claim="exp", current_time=None):
159 """
160 Checks whether a timestamp value in the given claim has passed (since
161 the given datetime value in `current_time`). Raises a TokenError with
162 a user-facing error message if so.
163 """
164 if current_time is None: 164 ↛ 167line 164 didn't jump to line 167, because the condition on line 164 was never false
165 current_time = self.current_time
167 try:
168 claim_value = self.payload[claim]
169 except KeyError:
170 raise TokenError(format_lazy(_("Token has no '{}' claim"), claim))
172 claim_time = datetime_from_epoch(claim_value)
173 leeway = self.get_token_backend().get_leeway()
174 if claim_time <= current_time - leeway: 174 ↛ 175line 174 didn't jump to line 175, because the condition on line 174 was never true
175 raise TokenError(format_lazy(_("Token '{}' claim has expired"), claim))
177 @classmethod
178 def for_user(cls, user):
179 """
180 Returns an authorization token for the given user that will be provided
181 after authenticating the user's credentials.
182 """
183 user_id = getattr(user, api_settings.USER_ID_FIELD)
184 if not isinstance(user_id, int): 184 ↛ 185line 184 didn't jump to line 185, because the condition on line 184 was never true
185 user_id = str(user_id)
187 token = cls()
188 token[api_settings.USER_ID_CLAIM] = user_id
190 return token
192 _token_backend = None
194 @property
195 def token_backend(self):
196 if self._token_backend is None:
197 self._token_backend = import_string(
198 "rest_framework_simplejwt.state.token_backend"
199 )
200 return self._token_backend
202 def get_token_backend(self):
203 # Backward compatibility.
204 return self.token_backend
207class BlacklistMixin:
208 """
209 If the `rest_framework_simplejwt.token_blacklist` app was configured to be
210 used, tokens created from `BlacklistMixin` subclasses will insert
211 themselves into an outstanding token list and also check for their
212 membership in a token blacklist.
213 """
215 if "rest_framework_simplejwt.token_blacklist" in settings.INSTALLED_APPS: 215 ↛ 217line 215 didn't jump to line 217, because the condition on line 215 was never true
217 def verify(self, *args, **kwargs):
218 self.check_blacklist()
220 super().verify(*args, **kwargs)
222 def check_blacklist(self):
223 """
224 Checks if this token is present in the token blacklist. Raises
225 `TokenError` if so.
226 """
227 jti = self.payload[api_settings.JTI_CLAIM]
229 if BlacklistedToken.objects.filter(token__jti=jti).exists():
230 raise TokenError(_("Token is blacklisted"))
232 def blacklist(self):
233 """
234 Ensures this token is included in the outstanding token list and
235 adds it to the blacklist.
236 """
237 jti = self.payload[api_settings.JTI_CLAIM]
238 exp = self.payload["exp"]
240 # Ensure outstanding token exists with given jti
241 token, _ = OutstandingToken.objects.get_or_create(
242 jti=jti,
243 defaults={
244 "token": str(self),
245 "expires_at": datetime_from_epoch(exp),
246 },
247 )
249 return BlacklistedToken.objects.get_or_create(token=token)
251 @classmethod
252 def for_user(cls, user):
253 """
254 Adds this token to the outstanding token list.
255 """
256 token = super().for_user(user)
258 jti = token[api_settings.JTI_CLAIM]
259 exp = token["exp"]
261 OutstandingToken.objects.create(
262 user=user,
263 jti=jti,
264 token=str(token),
265 created_at=token.current_time,
266 expires_at=datetime_from_epoch(exp),
267 )
269 return token
272class SlidingToken(BlacklistMixin, Token):
273 token_type = "sliding"
274 lifetime = api_settings.SLIDING_TOKEN_LIFETIME
276 def __init__(self, *args, **kwargs):
277 super().__init__(*args, **kwargs)
279 if self.token is None:
280 # Set sliding refresh expiration claim if new token
281 self.set_exp(
282 api_settings.SLIDING_TOKEN_REFRESH_EXP_CLAIM,
283 from_time=self.current_time,
284 lifetime=api_settings.SLIDING_TOKEN_REFRESH_LIFETIME,
285 )
288class AccessToken(Token):
289 token_type = "access"
290 lifetime = api_settings.ACCESS_TOKEN_LIFETIME
293class RefreshToken(BlacklistMixin, Token):
294 token_type = "refresh"
295 lifetime = api_settings.REFRESH_TOKEN_LIFETIME
296 no_copy_claims = (
297 api_settings.TOKEN_TYPE_CLAIM,
298 "exp",
299 # Both of these claims are included even though they may be the same.
300 # It seems possible that a third party token might have a custom or
301 # namespaced JTI claim as well as a default "jti" claim. In that case,
302 # we wouldn't want to copy either one.
303 api_settings.JTI_CLAIM,
304 "jti",
305 )
306 access_token_class = AccessToken
308 @property
309 def access_token(self):
310 """
311 Returns an access token created from this refresh token. Copies all
312 claims present in this refresh token to the new access token except
313 those claims listed in the `no_copy_claims` attribute.
314 """
315 access = self.access_token_class()
317 # Use instantiation time of refresh token as relative timestamp for
318 # access token "exp" claim. This ensures that both a refresh and
319 # access token expire relative to the same time if they are created as
320 # a pair.
321 access.set_exp(from_time=self.current_time)
323 no_copy = self.no_copy_claims
324 for claim, value in self.payload.items():
325 if claim in no_copy:
326 continue
327 access[claim] = value
329 return access
332class UntypedToken(Token):
333 token_type = "untyped"
334 lifetime = timedelta(seconds=0)
336 def verify_token_type(self):
337 """
338 Untyped tokens do not verify the "token_type" claim. This is useful
339 when performing general validation of a token's signature and other
340 properties which do not relate to the token's intended use.
341 """
342 pass