Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework_simplejwt/tokens.py: 71%

144 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from datetime import timedelta 

2from uuid import uuid4 

3 

4from django.conf import settings 

5from django.utils.module_loading import import_string 

6from django.utils.translation import gettext_lazy as _ 

7 

8from .exceptions import TokenBackendError, TokenError 

9from .settings import api_settings 

10from .token_blacklist.models import BlacklistedToken, OutstandingToken 

11from .utils import aware_utcnow, datetime_from_epoch, datetime_to_epoch, format_lazy 

12 

13 

class Token:
    """
    A class which validates and wraps an existing JWT or can be used to build a
    new JWT.

    Subclasses are expected to set ``token_type`` and ``lifetime``; creating an
    instance while either is ``None`` raises ``TokenError``. The claims are
    kept in ``self.payload`` and are reachable through a small dict-like
    interface (``token[key]``, ``key in token``, ``token.get(key)``).
    """

    # Value stored under ``api_settings.TOKEN_TYPE_CLAIM`` for new tokens and
    # compared against that claim in ``verify_token_type``.
    token_type = None
    # Duration added to the start time to compute the default "exp" claim.
    lifetime = None

    def __init__(self, token=None, verify=True):
        """
        !!!! IMPORTANT !!!! MUST raise a TokenError with a user-facing error
        message if the given token is invalid, expired, or otherwise not safe
        to use.

        :param token: an encoded token to decode and wrap. When ``None``, a
            new payload is built with type, "exp", "iat" and jti claims.
        :param verify: forwarded to the token backend's ``decode``; when
            truthy, ``self.verify()`` is also run on the decoded payload.
        :raises TokenError: if the class has no type or lifetime, if the
            backend cannot decode the token, or if verification fails.
        """
        if self.token_type is None or self.lifetime is None:
            raise TokenError(_("Cannot create token with no type or lifetime"))

        self.token = token
        self.current_time = aware_utcnow()

        # Set up token
        if token is not None:
            # An encoded token was provided
            token_backend = self.get_token_backend()

            # Decode token
            try:
                self.payload = token_backend.decode(token, verify=verify)
            except TokenBackendError as exc:
                # Chain explicitly so the backend failure is recorded as the
                # direct cause of the user-facing error.
                raise TokenError(_("Token is invalid or expired")) from exc

            if verify:
                self.verify()
        else:
            # New token. Skip all the verification steps.
            self.payload = {api_settings.TOKEN_TYPE_CLAIM: self.token_type}

            # Set "exp" and "iat" claims with default value
            self.set_exp(from_time=self.current_time, lifetime=self.lifetime)
            self.set_iat(at_time=self.current_time)

            # Set "jti" claim
            self.set_jti()

    def __repr__(self):
        return repr(self.payload)

    def __getitem__(self, key):
        return self.payload[key]

    def __setitem__(self, key, value):
        self.payload[key] = value

    def __delitem__(self, key):
        del self.payload[key]

    def __contains__(self, key):
        return key in self.payload

    def get(self, key, default=None):
        """
        Returns the claim stored under `key`, or `default` if it is absent.
        """
        return self.payload.get(key, default)

    def __str__(self):
        """
        Signs and returns a token as a base64 encoded string.
        """
        return self.get_token_backend().encode(self.payload)

    def verify(self):
        """
        Performs additional validation steps which were not performed when this
        token was decoded. This method is part of the "public" API to indicate
        the intention that it may be overridden in subclasses.

        Raises a `TokenError` if the expiration check fails, if the configured
        jti claim is missing, or if the token type check fails.
        """
        # According to RFC 7519, the "exp" claim is OPTIONAL
        # (https://tools.ietf.org/html/rfc7519#section-4.1.4). As a more
        # correct behavior for authorization tokens, we require an "exp"
        # claim. We don't want any zombie tokens walking around.
        self.check_exp()

        # If the defaults are not None then we should enforce the
        # requirement of these settings. As above, the spec labels
        # these as optional.
        if (
            api_settings.JTI_CLAIM is not None
            and api_settings.JTI_CLAIM not in self.payload
        ):
            raise TokenError(_("Token has no id"))

        if api_settings.TOKEN_TYPE_CLAIM is not None:
            self.verify_token_type()

    def verify_token_type(self):
        """
        Ensures that the token type claim is present and has the correct value.

        Raises a `TokenError` if the claim is missing or differs from this
        class's `token_type`.
        """
        try:
            token_type = self.payload[api_settings.TOKEN_TYPE_CLAIM]
        except KeyError as exc:
            raise TokenError(_("Token has no type")) from exc

        if self.token_type != token_type:
            raise TokenError(_("Token has wrong type"))

    def set_jti(self):
        """
        Populates the configured jti claim of a token with a string where there
        is a negligible probability that the same string will be chosen at a
        later time.

        See here:
        https://tools.ietf.org/html/rfc7519#section-4.1.7
        """
        self.payload[api_settings.JTI_CLAIM] = uuid4().hex

    def set_exp(self, claim="exp", from_time=None, lifetime=None):
        """
        Updates the expiration time of a token.

        The value written to `claim` is the epoch form of
        `from_time + lifetime`. `from_time` defaults to the instance's
        `current_time` and `lifetime` to the class `lifetime`.

        See here:
        https://tools.ietf.org/html/rfc7519#section-4.1.4
        """
        if from_time is None:
            from_time = self.current_time

        if lifetime is None:
            lifetime = self.lifetime

        self.payload[claim] = datetime_to_epoch(from_time + lifetime)

    def set_iat(self, claim="iat", at_time=None):
        """
        Updates the time at which the token was issued.

        `at_time` defaults to the instance's `current_time`.

        See here:
        https://tools.ietf.org/html/rfc7519#section-4.1.6
        """
        if at_time is None:
            at_time = self.current_time

        self.payload[claim] = datetime_to_epoch(at_time)

    def check_exp(self, claim="exp", current_time=None):
        """
        Checks whether a timestamp value in the given claim has passed (since
        the given datetime value in `current_time`). Raises a TokenError with
        a user-facing error message if so.

        A `TokenError` is also raised when the claim is missing from the
        payload. `current_time` defaults to the instance's `current_time`.
        """
        if current_time is None:
            current_time = self.current_time

        try:
            claim_value = self.payload[claim]
        except KeyError as exc:
            raise TokenError(
                format_lazy(_("Token has no '{}' claim"), claim)
            ) from exc

        claim_time = datetime_from_epoch(claim_value)
        # The backend's leeway widens the window in which the claim is still
        # accepted after its nominal time.
        leeway = self.get_token_backend().get_leeway()
        if claim_time <= current_time - leeway:
            raise TokenError(format_lazy(_("Token '{}' claim has expired"), claim))

    @classmethod
    def for_user(cls, user):
        """
        Returns an authorization token for the given user that will be provided
        after authenticating the user's credentials.

        The user's `api_settings.USER_ID_FIELD` attribute is stored under
        `api_settings.USER_ID_CLAIM`; non-integer ids are converted to `str`.
        """
        user_id = getattr(user, api_settings.USER_ID_FIELD)
        if not isinstance(user_id, int):
            user_id = str(user_id)

        token = cls()
        token[api_settings.USER_ID_CLAIM] = user_id

        return token

    # Per-instance cache for the lazily imported backend (see `token_backend`).
    _token_backend = None

    @property
    def token_backend(self):
        """
        The token backend object, imported on first access and then cached on
        the instance.
        """
        if self._token_backend is None:
            self._token_backend = import_string(
                "rest_framework_simplejwt.state.token_backend"
            )
        return self._token_backend

    def get_token_backend(self):
        # Backward compatibility.
        return self.token_backend

205 

206 

class BlacklistMixin:
    """
    If the `rest_framework_simplejwt.token_blacklist` app was configured to be
    used, tokens created from `BlacklistMixin` subclasses will insert
    themselves into an outstanding token list and also check for their
    membership in a token blacklist.

    The methods below are only defined when the blacklist app is listed in
    `settings.INSTALLED_APPS` at class-definition time; otherwise this mixin
    is empty.
    """

    if "rest_framework_simplejwt.token_blacklist" in settings.INSTALLED_APPS:

        def verify(self, *args, **kwargs):
            """
            Checks the blacklist first, then runs the next `verify` in the
            MRO.
            """
            self.check_blacklist()

            super().verify(*args, **kwargs)

        def check_blacklist(self):
            """
            Checks if this token is present in the token blacklist. Raises
            `TokenError` if so, or if the token carries no jti claim.
            """
            # This runs before the parent `verify` has confirmed that the jti
            # claim exists, so a missing claim must be reported as a
            # TokenError rather than leaking a bare KeyError.
            try:
                jti = self.payload[api_settings.JTI_CLAIM]
            except KeyError as exc:
                raise TokenError(_("Token has no id")) from exc

            if BlacklistedToken.objects.filter(token__jti=jti).exists():
                raise TokenError(_("Token is blacklisted"))

        def blacklist(self):
            """
            Ensures this token is included in the outstanding token list and
            adds it to the blacklist.

            Returns the result of `BlacklistedToken.objects.get_or_create`.
            """
            jti = self.payload[api_settings.JTI_CLAIM]
            exp = self.payload["exp"]

            # Ensure outstanding token exists with given jti. The "created"
            # flag is unused; it is not bound to `_` so that the module-level
            # translation alias is not shadowed inside this method.
            token, _created = OutstandingToken.objects.get_or_create(
                jti=jti,
                defaults={
                    "token": str(self),
                    "expires_at": datetime_from_epoch(exp),
                },
            )

            return BlacklistedToken.objects.get_or_create(token=token)

        @classmethod
        def for_user(cls, user):
            """
            Adds this token to the outstanding token list.

            Builds the token through the next `for_user` in the MRO, records
            it as an `OutstandingToken` row for `user`, and returns the token.
            """
            token = super().for_user(user)

            jti = token[api_settings.JTI_CLAIM]
            exp = token["exp"]

            OutstandingToken.objects.create(
                user=user,
                jti=jti,
                token=str(token),
                created_at=token.current_time,
                expires_at=datetime_from_epoch(exp),
            )

            return token

270 

271 

class SlidingToken(BlacklistMixin, Token):
    """
    Token of type "sliding". A newly built one additionally carries a refresh
    expiration claim configured through `api_settings`.
    """

    token_type = "sliding"
    lifetime = api_settings.SLIDING_TOKEN_LIFETIME

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # A wrapped (already encoded) token keeps whatever claims it has.
        if self.token is not None:
            return

        # Set sliding refresh expiration claim if new token
        refresh_claim = api_settings.SLIDING_TOKEN_REFRESH_EXP_CLAIM
        refresh_lifetime = api_settings.SLIDING_TOKEN_REFRESH_LIFETIME
        self.set_exp(
            refresh_claim,
            from_time=self.current_time,
            lifetime=refresh_lifetime,
        )

286 

287 

class AccessToken(Token):
    """
    Token of type "access" whose lifetime comes from
    `api_settings.ACCESS_TOKEN_LIFETIME`.
    """

    token_type = "access"
    lifetime = api_settings.ACCESS_TOKEN_LIFETIME

291 

292 

class RefreshToken(BlacklistMixin, Token):
    """
    Token of type "refresh" from which access tokens can be derived through
    the `access_token` property.
    """

    token_type = "refresh"
    lifetime = api_settings.REFRESH_TOKEN_LIFETIME
    # Claims that are never carried over to a derived access token.
    no_copy_claims = (
        api_settings.TOKEN_TYPE_CLAIM,
        "exp",
        # Both of these claims are included even though they may be the same.
        # It seems possible that a third party token might have a custom or
        # namespaced JTI claim as well as a default "jti" claim. In that case,
        # we wouldn't want to copy either one.
        api_settings.JTI_CLAIM,
        "jti",
    )
    # Class instantiated by `access_token`; may be replaced in subclasses.
    access_token_class = AccessToken

    @property
    def access_token(self):
        """
        Builds and returns a new access token derived from this refresh
        token. Every claim of this token is copied onto it, apart from the
        claims named in `no_copy_claims`.
        """
        new_access = self.access_token_class()

        # Anchor the access token's "exp" to the moment this refresh token
        # was instantiated, so that a pair created together expires relative
        # to the same time.
        new_access.set_exp(from_time=self.current_time)

        excluded = self.no_copy_claims
        for name, claim_value in self.payload.items():
            if name not in excluded:
                new_access[name] = claim_value

        return new_access

330 

331 

class UntypedToken(Token):
    """
    Token of type "untyped" with a zero lifetime whose type check is a no-op.
    """

    token_type = "untyped"
    lifetime = timedelta(seconds=0)

    def verify_token_type(self):
        """
        Deliberately does nothing: untyped tokens skip the "token_type" claim
        check. This is useful when performing general validation of a token's
        signature and other properties which do not relate to the token's
        intended use.
        """