Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/jwt/api_jws.py: 54%

160 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import annotations 

2 

3import binascii 

4import json 

5import warnings 

6from typing import Any, Type 

7 

8from .algorithms import ( 

9 Algorithm, 

10 get_default_algorithms, 

11 has_crypto, 

12 requires_cryptography, 

13) 

14from .exceptions import ( 

15 DecodeError, 

16 InvalidAlgorithmError, 

17 InvalidSignatureError, 

18 InvalidTokenError, 

19) 

20from .utils import base64url_decode, base64url_encode 

21from .warnings import RemovedInPyjwt3Warning 

22 

23 

class PyJWS:
    """Encode, decode and verify compact-serialized JWS tokens.

    An instance keeps a registry mapping algorithm names to ``Algorithm``
    handler objects (``self._algorithms``), the set of algorithm names it
    reports as supported (``self._valid_algs``), and a dict of default
    decoding options (``self.options``).
    """

    # Default value of the "typ" header written by ``encode``.
    header_typ = "JWT"

    def __init__(self, algorithms=None, options=None) -> None:
        """Build the algorithm registry and the default options.

        :param algorithms: optional iterable of algorithm names to allow.
            When ``None``, every algorithm returned by
            ``get_default_algorithms()`` is allowed.
        :param options: optional dict merged over the defaults from
            ``_get_default_options()``.
        """
        self._algorithms = get_default_algorithms()
        self._valid_algs = (
            set(algorithms) if algorithms is not None else set(self._algorithms)
        )

        # Remove algorithms that aren't on the whitelist. Iterate over a
        # snapshot of the keys because the dict is mutated inside the loop.
        for key in list(self._algorithms):
            if key not in self._valid_algs:
                del self._algorithms[key]

        if options is None:
            options = {}
        self.options = {**self._get_default_options(), **options}

    @staticmethod
    def _get_default_options() -> dict[str, bool]:
        """Return the default decoding options."""
        return {"verify_signature": True}

    def register_algorithm(self, alg_id: str, alg_obj: Algorithm) -> None:
        """
        Registers a new Algorithm for use when creating and verifying tokens.

        :raises ValueError: if ``alg_id`` already has a handler.
        :raises TypeError: if ``alg_obj`` is not an ``Algorithm`` instance.
        """
        if alg_id in self._algorithms:
            raise ValueError("Algorithm already has a handler.")

        if not isinstance(alg_obj, Algorithm):
            raise TypeError("Object is not of type `Algorithm`")

        self._algorithms[alg_id] = alg_obj
        self._valid_algs.add(alg_id)

    def unregister_algorithm(self, alg_id: str) -> None:
        """
        Unregisters an Algorithm for use when creating and verifying tokens.

        :raises KeyError: if the algorithm is not registered.
        """
        if alg_id not in self._algorithms:
            raise KeyError(
                "The specified algorithm could not be removed"
                " because it is not registered."
            )

        del self._algorithms[alg_id]
        self._valid_algs.remove(alg_id)

    def get_algorithms(self) -> list[str]:
        """
        Returns a list of supported values for the 'alg' parameter.

        The list is built from a set, so its order is not defined.
        """
        return list(self._valid_algs)

    def get_algorithm_by_name(self, alg_name: str) -> Algorithm:
        """
        For a given string name, return the matching Algorithm object.

        Example usage:

        >>> jws_obj.get_algorithm_by_name("RS256")

        :raises NotImplementedError: if no handler is registered under
            ``alg_name``. The message hints at the missing ``cryptography``
            package when ``has_crypto`` is false and the name is listed in
            ``requires_cryptography``.
        """
        try:
            return self._algorithms[alg_name]
        except KeyError as e:
            if not has_crypto and alg_name in requires_cryptography:
                raise NotImplementedError(
                    f"Algorithm '{alg_name}' could not be found. Do you have cryptography installed?"
                ) from e
            raise NotImplementedError("Algorithm not supported") from e

    def encode(
        self,
        payload: bytes,
        key: str,
        algorithm: str | None = "HS256",
        headers: dict[str, Any] | None = None,
        json_encoder: Type[json.JSONEncoder] | None = None,
        is_payload_detached: bool = False,
    ) -> str:
        """Sign ``payload`` and return the compact-serialized token.

        :param payload: raw bytes to sign.
        :param key: key material handed to the algorithm's ``prepare_key``.
        :param algorithm: algorithm name; ``None`` selects ``"none"``.
            A truthy ``headers["alg"]`` takes precedence over this argument.
        :param headers: extra header fields merged over the default
            ``typ``/``alg`` header. ``headers["b64"] is False`` switches to
            a detached payload.
        :param json_encoder: passed as ``cls`` to ``json.dumps`` when
            serializing the header.
        :param is_payload_detached: when true the payload is signed as-is
            (not base64url-encoded) and left out of the returned token.
        :returns: ``header.payload.signature`` as text; the payload segment
            is empty when the payload is detached.
        :raises NotImplementedError: if the algorithm has no handler.
        """
        segments = []

        # declare a new var to narrow the type for type checkers
        algorithm_: str = algorithm if algorithm is not None else "none"

        # Prefer headers values if present to function parameters.
        if headers:
            headers_alg = headers.get("alg")
            if headers_alg:
                algorithm_ = headers["alg"]

            headers_b64 = headers.get("b64")
            if headers_b64 is False:
                is_payload_detached = True

        # Header
        header: dict[str, Any] = {"typ": self.header_typ, "alg": algorithm_}

        if headers:
            self._validate_headers(headers)
            header.update(headers)

        # A falsy "typ" (from header_typ or from the caller's headers)
        # means the field is omitted from the token entirely.
        if not header["typ"]:
            del header["typ"]

        if is_payload_detached:
            header["b64"] = False
        elif "b64" in header:
            # True is the standard value for b64, so no need for it
            del header["b64"]

        # Fix for headers misorder - issue #715
        json_header = json.dumps(
            header, separators=(",", ":"), cls=json_encoder, sort_keys=True
        ).encode()

        segments.append(base64url_encode(json_header))

        if is_payload_detached:
            msg_payload = payload
        else:
            msg_payload = base64url_encode(payload)
        segments.append(msg_payload)

        # Segments
        signing_input = b".".join(segments)

        alg_obj = self.get_algorithm_by_name(algorithm_)
        # Keep the prepared key in its own local instead of rebinding the
        # ``str``-annotated parameter.
        prepared_key = alg_obj.prepare_key(key)
        signature = alg_obj.sign(signing_input, prepared_key)

        segments.append(base64url_encode(signature))

        # Don't put the payload content inside the encoded token when detached
        if is_payload_detached:
            segments[1] = b""
        encoded_string = b".".join(segments)

        return encoded_string.decode("utf-8")

    def decode_complete(
        self,
        jwt: str,
        key: str = "",
        algorithms: list[str] | None = None,
        options: dict[str, Any] | None = None,
        detached_payload: bytes | None = None,
        **kwargs,
    ) -> dict[str, Any]:
        """Parse a token and, unless disabled, verify its signature.

        :param jwt: the compact-serialized token.
        :param key: key material handed to the algorithm's ``prepare_key``.
        :param algorithms: allowed ``alg`` values; required (non-empty)
            whenever the signature is verified.
        :param options: per-call options merged over ``self.options``;
            ``verify_signature`` controls verification.
        :param detached_payload: the payload to use when the token header
            has ``"b64": false``.
        :param kwargs: deprecated; any extra keyword triggers a
            ``RemovedInPyjwt3Warning``.
        :returns: dict with the keys ``"payload"``, ``"header"`` and
            ``"signature"``.
        :raises DecodeError: if verification is requested without
            ``algorithms``, if the token is malformed, or if a detached
            payload is required but missing.
        :raises InvalidAlgorithmError: see ``_verify_signature``.
        :raises InvalidSignatureError: see ``_verify_signature``.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode_complete() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                # Attribute the warning to the caller, not to this module.
                stacklevel=2,
            )
        if options is None:
            options = {}
        merged_options = {**self.options, **options}
        verify_signature = merged_options["verify_signature"]

        if verify_signature and not algorithms:
            raise DecodeError(
                'It is required that you pass in a value for the "algorithms" argument when calling decode().'
            )

        payload, signing_input, header, signature = self._load(jwt)

        if header.get("b64", True) is False:
            if detached_payload is None:
                raise DecodeError(
                    'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
                )
            payload = detached_payload
            # Rebuild the signing input as "<header segment>.<raw payload>".
            signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])

        if verify_signature:
            self._verify_signature(signing_input, header, signature, key, algorithms)

        return {
            "payload": payload,
            "header": header,
            "signature": signature,
        }

    def decode(
        self,
        jwt: str,
        key: str = "",
        algorithms: list[str] | None = None,
        options: dict[str, Any] | None = None,
        detached_payload: bytes | None = None,
        **kwargs,
    ) -> bytes:
        """Return only the payload of ``decode_complete``.

        Takes the same arguments and raises the same errors as
        ``decode_complete``. Extra keyword arguments are deprecated: they
        trigger a ``RemovedInPyjwt3Warning`` and are not forwarded.
        """
        if kwargs:
            warnings.warn(
                "passing additional kwargs to decode() is deprecated "
                "and will be removed in pyjwt version 3. "
                f"Unsupported kwargs: {tuple(kwargs.keys())}",
                RemovedInPyjwt3Warning,
                # Attribute the warning to the caller, not to this module.
                stacklevel=2,
            )
        decoded = self.decode_complete(
            jwt, key, algorithms, options, detached_payload=detached_payload
        )
        return decoded["payload"]

    def get_unverified_header(self, jwt: str | bytes) -> dict:
        """Returns back the JWT header parameters as a dict()

        Note: The signature is not verified so the header parameters
        should not be fully trusted until signature verification is complete
        """
        headers = self._load(jwt)[2]
        self._validate_headers(headers)

        return headers

    def _load(self, jwt: str | bytes) -> tuple[bytes, bytes, dict, bytes]:
        """Split a compact token into its decoded parts.

        :returns: ``(payload, signing_input, header, signature)`` where
            ``signing_input`` is everything before the last ``"."``.
        :raises DecodeError: if ``jwt`` is neither ``str`` nor ``bytes``,
            has too few segments, a segment fails base64url decoding, or
            the header is not a JSON object.
        """
        if isinstance(jwt, str):
            jwt = jwt.encode("utf-8")

        if not isinstance(jwt, bytes):
            raise DecodeError(f"Invalid token type. Token must be a {bytes}")

        try:
            # Unpacking raises ValueError when a "." separator is missing.
            signing_input, crypto_segment = jwt.rsplit(b".", 1)
            header_segment, payload_segment = signing_input.split(b".", 1)
        except ValueError as err:
            raise DecodeError("Not enough segments") from err

        try:
            header_data = base64url_decode(header_segment)
        except (TypeError, binascii.Error) as err:
            raise DecodeError("Invalid header padding") from err

        try:
            header = json.loads(header_data)
        except ValueError as e:
            raise DecodeError(f"Invalid header string: {e}") from e

        if not isinstance(header, dict):
            raise DecodeError("Invalid header string: must be a json object")

        try:
            payload = base64url_decode(payload_segment)
        except (TypeError, binascii.Error) as err:
            raise DecodeError("Invalid payload padding") from err

        try:
            signature = base64url_decode(crypto_segment)
        except (TypeError, binascii.Error) as err:
            raise DecodeError("Invalid crypto padding") from err

        return (payload, signing_input, header, signature)

    def _verify_signature(
        self,
        signing_input: bytes,
        header: dict,
        signature: bytes,
        key: str = "",
        algorithms: list[str] | None = None,
    ) -> None:
        """Check ``signature`` over ``signing_input`` with the header's alg.

        :raises InvalidAlgorithmError: if the header's ``alg`` is missing,
            empty, not a string, not in ``algorithms`` (when given), or has
            no registered handler.
        :raises InvalidSignatureError: if the handler's ``verify`` returns
            a falsy value.
        """
        alg = header.get("alg")

        # The header comes from the (untrusted) token. A non-string "alg"
        # such as a JSON list or object is unhashable and would otherwise
        # surface as a bare TypeError from the membership test or the
        # registry lookup, so reject it up front.
        if (
            not isinstance(alg, str)
            or not alg
            or (algorithms is not None and alg not in algorithms)
        ):
            raise InvalidAlgorithmError("The specified alg value is not allowed")

        try:
            alg_obj = self.get_algorithm_by_name(alg)
        except NotImplementedError as e:
            raise InvalidAlgorithmError("Algorithm not supported") from e
        prepared_key = alg_obj.prepare_key(key)

        if not alg_obj.verify(signing_input, prepared_key, signature):
            raise InvalidSignatureError("Signature verification failed")

    def _validate_headers(self, headers: dict[str, Any]) -> None:
        """Validate the header fields this class knows about (only "kid")."""
        if "kid" in headers:
            self._validate_kid(headers["kid"])

    def _validate_kid(self, kid: str) -> None:
        """Raise ``InvalidTokenError`` unless ``kid`` is a string."""
        if not isinstance(kid, str):
            raise InvalidTokenError("Key ID header parameter must be a string")

310 

311 

# Module-level convenience API: a single shared PyJWS instance built with
# the default algorithms and options, with its bound methods re-exported
# so they can be called as plain functions of this module.
_jws_global_obj = PyJWS()

# Token creation and parsing.
encode = _jws_global_obj.encode
decode = _jws_global_obj.decode
decode_complete = _jws_global_obj.decode_complete
get_unverified_header = _jws_global_obj.get_unverified_header

# Algorithm registry management on the shared instance.
register_algorithm = _jws_global_obj.register_algorithm
unregister_algorithm = _jws_global_obj.unregister_algorithm
get_algorithm_by_name = _jws_global_obj.get_algorithm_by_name