Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework_simplejwt/backends.py: 62%

63 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import json 

2from datetime import timedelta 

3from typing import Optional, Type, Union 

4 

5import jwt 

6from django.utils.translation import gettext_lazy as _ 

7from jwt import InvalidAlgorithmError, InvalidTokenError, algorithms 

8 

9from .exceptions import TokenBackendError 

10from .utils import format_lazy 

11 

# PyJWKClient is an optional import: when the installed PyJWT does not expose
# it, record that fact in a flag instead of failing at import time.
try:
    from jwt import PyJWKClient
except ImportError:
    JWK_CLIENT_AVAILABLE = False
else:
    JWK_CLIENT_AVAILABLE = True

18 

# Signing algorithms accepted by TokenBackend; any other value is rejected by
# TokenBackend._validate_algorithm.
ALLOWED_ALGORITHMS = {
    # HMAC family
    "HS256", "HS384", "HS512",
    # RSA family
    "RS256", "RS384", "RS512",
    # ECDSA family
    "ES256", "ES384", "ES512",
}

30 

31 

class TokenBackend:
    """
    Encodes and decodes JSON Web Tokens using one fixed algorithm.

    Thin wrapper around ``jwt.encode`` / ``jwt.decode`` that injects the
    configured audience and issuer and re-raises PyJWT failures as
    ``TokenBackendError``.
    """

    def __init__(
        self,
        algorithm,
        signing_key=None,
        verifying_key="",
        audience=None,
        issuer=None,
        jwk_url: Optional[str] = None,
        leeway: Optional[Union[float, int, timedelta]] = None,
        json_encoder: Optional[Type[json.JSONEncoder]] = None,
    ):
        """
        Store the signing configuration.

        Raises a `TokenBackendError` if `algorithm` is not in
        `ALLOWED_ALGORITHMS` or needs the `cryptography` package and it is
        not installed.
        """
        self._validate_algorithm(algorithm)

        self.algorithm = algorithm
        self.signing_key = signing_key
        self.verifying_key = verifying_key
        self.audience = audience
        self.issuer = issuer

        # A JWKS client is only built when PyJWT provides one AND a URL was
        # configured; otherwise key lookup falls back to the static keys.
        if JWK_CLIENT_AVAILABLE:
            self.jwks_client = PyJWKClient(jwk_url) if jwk_url else None
        else:
            self.jwks_client = None

        self.leeway = leeway
        self.json_encoder = json_encoder

    def _validate_algorithm(self, algorithm):
        """
        Ensure that the nominated algorithm is recognized, and that cryptography is installed for those
        algorithms that require it
        """
        if algorithm not in ALLOWED_ALGORITHMS:
            raise TokenBackendError(
                format_lazy(_("Unrecognized algorithm type '{}'"), algorithm)
            )

        if algorithm in algorithms.requires_cryptography and not algorithms.has_crypto:
            raise TokenBackendError(
                format_lazy(
                    _("You must have cryptography installed to use {}."), algorithm
                )
            )

    def get_leeway(self) -> timedelta:
        """
        Returns the configured leeway as a `timedelta`.

        `None` maps to zero, an int or float is interpreted as seconds, and a
        `timedelta` is returned unchanged. Raises a `TokenBackendError` for
        any other type.
        """
        if self.leeway is None:
            return timedelta(seconds=0)
        elif isinstance(self.leeway, (int, float)):
            return timedelta(seconds=self.leeway)
        elif isinstance(self.leeway, timedelta):
            return self.leeway
        else:
            raise TokenBackendError(
                format_lazy(
                    _(
                        "Unrecognized type '{}', 'leeway' must be of type int, float or timedelta."
                    ),
                    type(self.leeway),
                )
            )

    def get_verifying_key(self, token):
        """
        Returns the key that should be used to verify `token`.

        "HS*" algorithms verify with the signing key. Otherwise the key is
        looked up through the JWKS client when one is configured, falling
        back to the static verifying key.

        Raises a `TokenBackendError` if the JWKS lookup fails.
        """
        if self.algorithm.startswith("HS"):
            return self.signing_key

        if self.jwks_client:
            try:
                return self.jwks_client.get_signing_key_from_jwt(token).key
            except jwt.PyJWKClientError as ex:
                # PyJWKClientError is not an InvalidTokenError, so decode()
                # would not translate it; convert it here so callers only
                # ever have to handle TokenBackendError.
                raise TokenBackendError(_("Token is invalid or expired")) from ex

        return self.verifying_key

    def encode(self, payload):
        """
        Returns an encoded token for the given payload dictionary.

        The payload is copied before the configured 'aud' and 'iss' claims
        are added, so the caller's dictionary is not modified.
        """
        jwt_payload = payload.copy()
        if self.audience is not None:
            jwt_payload["aud"] = self.audience
        if self.issuer is not None:
            jwt_payload["iss"] = self.issuer

        token = jwt.encode(
            jwt_payload,
            self.signing_key,
            algorithm=self.algorithm,
            json_encoder=self.json_encoder,
        )
        if isinstance(token, bytes):
            # For PyJWT <= 1.7.1
            return token.decode("utf-8")
        # For PyJWT >= 2.0.0a1
        return token

    def decode(self, token, verify=True):
        """
        Performs a validation of the given token and returns its payload
        dictionary.

        Raises a `TokenBackendError` if the token is malformed, if its
        signature check fails, or if its 'exp' claim indicates it has expired.
        """
        try:
            return jwt.decode(
                token,
                self.get_verifying_key(token),
                algorithms=[self.algorithm],
                audience=self.audience,
                issuer=self.issuer,
                leeway=self.get_leeway(),
                options={
                    # Only check 'aud' when an audience was configured.
                    "verify_aud": self.audience is not None,
                    "verify_signature": verify,
                },
            )
        except InvalidAlgorithmError as ex:
            raise TokenBackendError(_("Invalid algorithm specified")) from ex
        except InvalidTokenError as ex:
            raise TokenBackendError(_("Token is invalid or expired")) from ex