Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/jwt/algorithms.py: 11%
326 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import hashlib
2import hmac
3import json
5from .exceptions import InvalidKeyError
6from .utils import (
7 base64url_decode,
8 base64url_encode,
9 der_to_raw_signature,
10 force_bytes,
11 from_base64url_uint,
12 is_pem_format,
13 is_ssh_key,
14 raw_to_der_signature,
15 to_base64url_uint,
16)
18try:
19 import cryptography.exceptions
20 from cryptography.exceptions import InvalidSignature
21 from cryptography.hazmat.primitives import hashes
22 from cryptography.hazmat.primitives.asymmetric import ec, padding
23 from cryptography.hazmat.primitives.asymmetric.ec import (
24 EllipticCurvePrivateKey,
25 EllipticCurvePublicKey,
26 )
27 from cryptography.hazmat.primitives.asymmetric.ed448 import (
28 Ed448PrivateKey,
29 Ed448PublicKey,
30 )
31 from cryptography.hazmat.primitives.asymmetric.ed25519 import (
32 Ed25519PrivateKey,
33 Ed25519PublicKey,
34 )
35 from cryptography.hazmat.primitives.asymmetric.rsa import (
36 RSAPrivateKey,
37 RSAPrivateNumbers,
38 RSAPublicKey,
39 RSAPublicNumbers,
40 rsa_crt_dmp1,
41 rsa_crt_dmq1,
42 rsa_crt_iqmp,
43 rsa_recover_prime_factors,
44 )
45 from cryptography.hazmat.primitives.serialization import (
46 Encoding,
47 NoEncryption,
48 PrivateFormat,
49 PublicFormat,
50 load_pem_private_key,
51 load_pem_public_key,
52 load_ssh_public_key,
53 )
55 has_crypto = True
56except ModuleNotFoundError:
57 has_crypto = False
59requires_cryptography = {
60 "RS256",
61 "RS384",
62 "RS512",
63 "ES256",
64 "ES256K",
65 "ES384",
66 "ES521",
67 "ES512",
68 "PS256",
69 "PS384",
70 "PS512",
71 "EdDSA",
72}
75def get_default_algorithms():
76 """
77 Returns the algorithms that are implemented by the library.
78 """
79 default_algorithms = {
80 "none": NoneAlgorithm(),
81 "HS256": HMACAlgorithm(HMACAlgorithm.SHA256),
82 "HS384": HMACAlgorithm(HMACAlgorithm.SHA384),
83 "HS512": HMACAlgorithm(HMACAlgorithm.SHA512),
84 }
86 if has_crypto: 86 ↛ 87line 86 didn't jump to line 87, because the condition on line 86 was never true
87 default_algorithms.update(
88 {
89 "RS256": RSAAlgorithm(RSAAlgorithm.SHA256),
90 "RS384": RSAAlgorithm(RSAAlgorithm.SHA384),
91 "RS512": RSAAlgorithm(RSAAlgorithm.SHA512),
92 "ES256": ECAlgorithm(ECAlgorithm.SHA256),
93 "ES256K": ECAlgorithm(ECAlgorithm.SHA256),
94 "ES384": ECAlgorithm(ECAlgorithm.SHA384),
95 "ES521": ECAlgorithm(ECAlgorithm.SHA512),
96 "ES512": ECAlgorithm(
97 ECAlgorithm.SHA512
98 ), # Backward compat for #219 fix
99 "PS256": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256),
100 "PS384": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384),
101 "PS512": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512),
102 "EdDSA": OKPAlgorithm(),
103 }
104 )
106 return default_algorithms
109class Algorithm:
110 """
111 The interface for an algorithm used to sign and verify tokens.
112 """
114 def prepare_key(self, key):
115 """
116 Performs necessary validation and conversions on the key and returns
117 the key value in the proper format for sign() and verify().
118 """
119 raise NotImplementedError
121 def sign(self, msg, key):
122 """
123 Returns a digital signature for the specified message
124 using the specified key value.
125 """
126 raise NotImplementedError
128 def verify(self, msg, key, sig):
129 """
130 Verifies that the specified digital signature is valid
131 for the specified message and key values.
132 """
133 raise NotImplementedError
135 @staticmethod
136 def to_jwk(key_obj):
137 """
138 Serializes a given RSA key into a JWK
139 """
140 raise NotImplementedError
142 @staticmethod
143 def from_jwk(jwk):
144 """
145 Deserializes a given RSA key from JWK back into a PublicKey or PrivateKey object
146 """
147 raise NotImplementedError
150class NoneAlgorithm(Algorithm):
151 """
152 Placeholder for use when no signing or verification
153 operations are required.
154 """
156 def prepare_key(self, key):
157 if key == "":
158 key = None
160 if key is not None:
161 raise InvalidKeyError('When alg = "none", key value must be None.')
163 return key
165 def sign(self, msg, key):
166 return b""
168 def verify(self, msg, key, sig):
169 return False
172class HMACAlgorithm(Algorithm):
173 """
174 Performs signing and verification operations using HMAC
175 and the specified hash function.
176 """
178 SHA256 = hashlib.sha256
179 SHA384 = hashlib.sha384
180 SHA512 = hashlib.sha512
182 def __init__(self, hash_alg):
183 self.hash_alg = hash_alg
185 def prepare_key(self, key):
186 key = force_bytes(key)
188 if is_pem_format(key) or is_ssh_key(key): 188 ↛ 189line 188 didn't jump to line 189, because the condition on line 188 was never true
189 raise InvalidKeyError(
190 "The specified key is an asymmetric key or x509 certificate and"
191 " should not be used as an HMAC secret."
192 )
194 return key
196 @staticmethod
197 def to_jwk(key_obj):
198 return json.dumps(
199 {
200 "k": base64url_encode(force_bytes(key_obj)).decode(),
201 "kty": "oct",
202 }
203 )
205 @staticmethod
206 def from_jwk(jwk):
207 try:
208 if isinstance(jwk, str):
209 obj = json.loads(jwk)
210 elif isinstance(jwk, dict):
211 obj = jwk
212 else:
213 raise ValueError
214 except ValueError:
215 raise InvalidKeyError("Key is not valid JSON")
217 if obj.get("kty") != "oct":
218 raise InvalidKeyError("Not an HMAC key")
220 return base64url_decode(obj["k"])
222 def sign(self, msg, key):
223 return hmac.new(key, msg, self.hash_alg).digest()
225 def verify(self, msg, key, sig):
226 return hmac.compare_digest(sig, self.sign(msg, key))
229if has_crypto: 229 ↛ 231line 229 didn't jump to line 231, because the condition on line 229 was never true
231 class RSAAlgorithm(Algorithm):
232 """
233 Performs signing and verification operations using
234 RSASSA-PKCS-v1_5 and the specified hash function.
235 """
237 SHA256 = hashes.SHA256
238 SHA384 = hashes.SHA384
239 SHA512 = hashes.SHA512
241 def __init__(self, hash_alg):
242 self.hash_alg = hash_alg
244 def prepare_key(self, key):
245 if isinstance(key, (RSAPrivateKey, RSAPublicKey)):
246 return key
248 if not isinstance(key, (bytes, str)):
249 raise TypeError("Expecting a PEM-formatted key.")
251 key = force_bytes(key)
253 try:
254 if key.startswith(b"ssh-rsa"):
255 key = load_ssh_public_key(key)
256 else:
257 key = load_pem_private_key(key, password=None)
258 except ValueError:
259 key = load_pem_public_key(key)
260 return key
262 @staticmethod
263 def to_jwk(key_obj):
264 obj = None
266 if getattr(key_obj, "private_numbers", None):
267 # Private key
268 numbers = key_obj.private_numbers()
270 obj = {
271 "kty": "RSA",
272 "key_ops": ["sign"],
273 "n": to_base64url_uint(numbers.public_numbers.n).decode(),
274 "e": to_base64url_uint(numbers.public_numbers.e).decode(),
275 "d": to_base64url_uint(numbers.d).decode(),
276 "p": to_base64url_uint(numbers.p).decode(),
277 "q": to_base64url_uint(numbers.q).decode(),
278 "dp": to_base64url_uint(numbers.dmp1).decode(),
279 "dq": to_base64url_uint(numbers.dmq1).decode(),
280 "qi": to_base64url_uint(numbers.iqmp).decode(),
281 }
283 elif getattr(key_obj, "verify", None):
284 # Public key
285 numbers = key_obj.public_numbers()
287 obj = {
288 "kty": "RSA",
289 "key_ops": ["verify"],
290 "n": to_base64url_uint(numbers.n).decode(),
291 "e": to_base64url_uint(numbers.e).decode(),
292 }
293 else:
294 raise InvalidKeyError("Not a public or private key")
296 return json.dumps(obj)
298 @staticmethod
299 def from_jwk(jwk):
300 try:
301 if isinstance(jwk, str):
302 obj = json.loads(jwk)
303 elif isinstance(jwk, dict):
304 obj = jwk
305 else:
306 raise ValueError
307 except ValueError:
308 raise InvalidKeyError("Key is not valid JSON")
310 if obj.get("kty") != "RSA":
311 raise InvalidKeyError("Not an RSA key")
313 if "d" in obj and "e" in obj and "n" in obj:
314 # Private key
315 if "oth" in obj:
316 raise InvalidKeyError(
317 "Unsupported RSA private key: > 2 primes not supported"
318 )
320 other_props = ["p", "q", "dp", "dq", "qi"]
321 props_found = [prop in obj for prop in other_props]
322 any_props_found = any(props_found)
324 if any_props_found and not all(props_found):
325 raise InvalidKeyError(
326 "RSA key must include all parameters if any are present besides d"
327 )
329 public_numbers = RSAPublicNumbers(
330 from_base64url_uint(obj["e"]),
331 from_base64url_uint(obj["n"]),
332 )
334 if any_props_found:
335 numbers = RSAPrivateNumbers(
336 d=from_base64url_uint(obj["d"]),
337 p=from_base64url_uint(obj["p"]),
338 q=from_base64url_uint(obj["q"]),
339 dmp1=from_base64url_uint(obj["dp"]),
340 dmq1=from_base64url_uint(obj["dq"]),
341 iqmp=from_base64url_uint(obj["qi"]),
342 public_numbers=public_numbers,
343 )
344 else:
345 d = from_base64url_uint(obj["d"])
346 p, q = rsa_recover_prime_factors(
347 public_numbers.n, d, public_numbers.e
348 )
350 numbers = RSAPrivateNumbers(
351 d=d,
352 p=p,
353 q=q,
354 dmp1=rsa_crt_dmp1(d, p),
355 dmq1=rsa_crt_dmq1(d, q),
356 iqmp=rsa_crt_iqmp(p, q),
357 public_numbers=public_numbers,
358 )
360 return numbers.private_key()
361 elif "n" in obj and "e" in obj:
362 # Public key
363 numbers = RSAPublicNumbers(
364 from_base64url_uint(obj["e"]),
365 from_base64url_uint(obj["n"]),
366 )
368 return numbers.public_key()
369 else:
370 raise InvalidKeyError("Not a public or private key")
372 def sign(self, msg, key):
373 return key.sign(msg, padding.PKCS1v15(), self.hash_alg())
375 def verify(self, msg, key, sig):
376 try:
377 key.verify(sig, msg, padding.PKCS1v15(), self.hash_alg())
378 return True
379 except InvalidSignature:
380 return False
382 class ECAlgorithm(Algorithm):
383 """
384 Performs signing and verification operations using
385 ECDSA and the specified hash function
386 """
388 SHA256 = hashes.SHA256
389 SHA384 = hashes.SHA384
390 SHA512 = hashes.SHA512
392 def __init__(self, hash_alg):
393 self.hash_alg = hash_alg
395 def prepare_key(self, key):
396 if isinstance(key, (EllipticCurvePrivateKey, EllipticCurvePublicKey)):
397 return key
399 if not isinstance(key, (bytes, str)):
400 raise TypeError("Expecting a PEM-formatted key.")
402 key = force_bytes(key)
404 # Attempt to load key. We don't know if it's
405 # a Signing Key or a Verifying Key, so we try
406 # the Verifying Key first.
407 try:
408 if key.startswith(b"ecdsa-sha2-"):
409 key = load_ssh_public_key(key)
410 else:
411 key = load_pem_public_key(key)
412 except ValueError:
413 key = load_pem_private_key(key, password=None)
415 # Explicit check the key to prevent confusing errors from cryptography
416 if not isinstance(key, (EllipticCurvePrivateKey, EllipticCurvePublicKey)):
417 raise InvalidKeyError(
418 "Expecting a EllipticCurvePrivateKey/EllipticCurvePublicKey. Wrong key provided for ECDSA algorithms"
419 )
421 return key
423 def sign(self, msg, key):
424 der_sig = key.sign(msg, ec.ECDSA(self.hash_alg()))
426 return der_to_raw_signature(der_sig, key.curve)
428 def verify(self, msg, key, sig):
429 try:
430 der_sig = raw_to_der_signature(sig, key.curve)
431 except ValueError:
432 return False
434 try:
435 if isinstance(key, EllipticCurvePrivateKey):
436 key = key.public_key()
437 key.verify(der_sig, msg, ec.ECDSA(self.hash_alg()))
438 return True
439 except InvalidSignature:
440 return False
442 @staticmethod
443 def to_jwk(key_obj):
445 if isinstance(key_obj, EllipticCurvePrivateKey):
446 public_numbers = key_obj.public_key().public_numbers()
447 elif isinstance(key_obj, EllipticCurvePublicKey):
448 public_numbers = key_obj.public_numbers()
449 else:
450 raise InvalidKeyError("Not a public or private key")
452 if isinstance(key_obj.curve, ec.SECP256R1):
453 crv = "P-256"
454 elif isinstance(key_obj.curve, ec.SECP384R1):
455 crv = "P-384"
456 elif isinstance(key_obj.curve, ec.SECP521R1):
457 crv = "P-521"
458 elif isinstance(key_obj.curve, ec.SECP256K1):
459 crv = "secp256k1"
460 else:
461 raise InvalidKeyError(f"Invalid curve: {key_obj.curve}")
463 obj = {
464 "kty": "EC",
465 "crv": crv,
466 "x": to_base64url_uint(public_numbers.x).decode(),
467 "y": to_base64url_uint(public_numbers.y).decode(),
468 }
470 if isinstance(key_obj, EllipticCurvePrivateKey):
471 obj["d"] = to_base64url_uint(
472 key_obj.private_numbers().private_value
473 ).decode()
475 return json.dumps(obj)
477 @staticmethod
478 def from_jwk(jwk):
479 try:
480 if isinstance(jwk, str):
481 obj = json.loads(jwk)
482 elif isinstance(jwk, dict):
483 obj = jwk
484 else:
485 raise ValueError
486 except ValueError:
487 raise InvalidKeyError("Key is not valid JSON")
489 if obj.get("kty") != "EC":
490 raise InvalidKeyError("Not an Elliptic curve key")
492 if "x" not in obj or "y" not in obj:
493 raise InvalidKeyError("Not an Elliptic curve key")
495 x = base64url_decode(obj.get("x"))
496 y = base64url_decode(obj.get("y"))
498 curve = obj.get("crv")
499 if curve == "P-256":
500 if len(x) == len(y) == 32:
501 curve_obj = ec.SECP256R1()
502 else:
503 raise InvalidKeyError("Coords should be 32 bytes for curve P-256")
504 elif curve == "P-384":
505 if len(x) == len(y) == 48:
506 curve_obj = ec.SECP384R1()
507 else:
508 raise InvalidKeyError("Coords should be 48 bytes for curve P-384")
509 elif curve == "P-521":
510 if len(x) == len(y) == 66:
511 curve_obj = ec.SECP521R1()
512 else:
513 raise InvalidKeyError("Coords should be 66 bytes for curve P-521")
514 elif curve == "secp256k1":
515 if len(x) == len(y) == 32:
516 curve_obj = ec.SECP256K1()
517 else:
518 raise InvalidKeyError(
519 "Coords should be 32 bytes for curve secp256k1"
520 )
521 else:
522 raise InvalidKeyError(f"Invalid curve: {curve}")
524 public_numbers = ec.EllipticCurvePublicNumbers(
525 x=int.from_bytes(x, byteorder="big"),
526 y=int.from_bytes(y, byteorder="big"),
527 curve=curve_obj,
528 )
530 if "d" not in obj:
531 return public_numbers.public_key()
533 d = base64url_decode(obj.get("d"))
534 if len(d) != len(x):
535 raise InvalidKeyError(
536 "D should be {} bytes for curve {}", len(x), curve
537 )
539 return ec.EllipticCurvePrivateNumbers(
540 int.from_bytes(d, byteorder="big"), public_numbers
541 ).private_key()
543 class RSAPSSAlgorithm(RSAAlgorithm):
544 """
545 Performs a signature using RSASSA-PSS with MGF1
546 """
548 def sign(self, msg, key):
549 return key.sign(
550 msg,
551 padding.PSS(
552 mgf=padding.MGF1(self.hash_alg()),
553 salt_length=self.hash_alg.digest_size,
554 ),
555 self.hash_alg(),
556 )
558 def verify(self, msg, key, sig):
559 try:
560 key.verify(
561 sig,
562 msg,
563 padding.PSS(
564 mgf=padding.MGF1(self.hash_alg()),
565 salt_length=self.hash_alg.digest_size,
566 ),
567 self.hash_alg(),
568 )
569 return True
570 except InvalidSignature:
571 return False
573 class OKPAlgorithm(Algorithm):
574 """
575 Performs signing and verification operations using EdDSA
577 This class requires ``cryptography>=2.6`` to be installed.
578 """
580 def __init__(self, **kwargs):
581 pass
583 def prepare_key(self, key):
584 if isinstance(key, (bytes, str)):
585 if isinstance(key, str):
586 key = key.encode("utf-8")
587 str_key = key.decode("utf-8")
589 if "-----BEGIN PUBLIC" in str_key:
590 key = load_pem_public_key(key)
591 elif "-----BEGIN PRIVATE" in str_key:
592 key = load_pem_private_key(key, password=None)
593 elif str_key[0:4] == "ssh-":
594 key = load_ssh_public_key(key)
596 # Explicit check the key to prevent confusing errors from cryptography
597 if not isinstance(
598 key,
599 (Ed25519PrivateKey, Ed25519PublicKey, Ed448PrivateKey, Ed448PublicKey),
600 ):
601 raise InvalidKeyError(
602 "Expecting a EllipticCurvePrivateKey/EllipticCurvePublicKey. Wrong key provided for EdDSA algorithms"
603 )
605 return key
607 def sign(self, msg, key):
608 """
609 Sign a message ``msg`` using the EdDSA private key ``key``
610 :param str|bytes msg: Message to sign
611 :param Ed25519PrivateKey}Ed448PrivateKey key: A :class:`.Ed25519PrivateKey`
612 or :class:`.Ed448PrivateKey` isinstance
613 :return bytes signature: The signature, as bytes
614 """
615 msg = bytes(msg, "utf-8") if type(msg) is not bytes else msg
616 return key.sign(msg)
618 def verify(self, msg, key, sig):
619 """
620 Verify a given ``msg`` against a signature ``sig`` using the EdDSA key ``key``
622 :param str|bytes sig: EdDSA signature to check ``msg`` against
623 :param str|bytes msg: Message to sign
624 :param Ed25519PrivateKey|Ed25519PublicKey|Ed448PrivateKey|Ed448PublicKey key:
625 A private or public EdDSA key instance
626 :return bool verified: True if signature is valid, False if not.
627 """
628 try:
629 msg = bytes(msg, "utf-8") if type(msg) is not bytes else msg
630 sig = bytes(sig, "utf-8") if type(sig) is not bytes else sig
632 if isinstance(key, (Ed25519PrivateKey, Ed448PrivateKey)):
633 key = key.public_key()
634 key.verify(sig, msg)
635 return True # If no exception was raised, the signature is valid.
636 except cryptography.exceptions.InvalidSignature:
637 return False
639 @staticmethod
640 def to_jwk(key):
641 if isinstance(key, (Ed25519PublicKey, Ed448PublicKey)):
642 x = key.public_bytes(
643 encoding=Encoding.Raw,
644 format=PublicFormat.Raw,
645 )
646 crv = "Ed25519" if isinstance(key, Ed25519PublicKey) else "Ed448"
647 return json.dumps(
648 {
649 "x": base64url_encode(force_bytes(x)).decode(),
650 "kty": "OKP",
651 "crv": crv,
652 }
653 )
655 if isinstance(key, (Ed25519PrivateKey, Ed448PrivateKey)):
656 d = key.private_bytes(
657 encoding=Encoding.Raw,
658 format=PrivateFormat.Raw,
659 encryption_algorithm=NoEncryption(),
660 )
662 x = key.public_key().public_bytes(
663 encoding=Encoding.Raw,
664 format=PublicFormat.Raw,
665 )
667 crv = "Ed25519" if isinstance(key, Ed25519PrivateKey) else "Ed448"
668 return json.dumps(
669 {
670 "x": base64url_encode(force_bytes(x)).decode(),
671 "d": base64url_encode(force_bytes(d)).decode(),
672 "kty": "OKP",
673 "crv": crv,
674 }
675 )
677 raise InvalidKeyError("Not a public or private key")
679 @staticmethod
680 def from_jwk(jwk):
681 try:
682 if isinstance(jwk, str):
683 obj = json.loads(jwk)
684 elif isinstance(jwk, dict):
685 obj = jwk
686 else:
687 raise ValueError
688 except ValueError:
689 raise InvalidKeyError("Key is not valid JSON")
691 if obj.get("kty") != "OKP":
692 raise InvalidKeyError("Not an Octet Key Pair")
694 curve = obj.get("crv")
695 if curve != "Ed25519" and curve != "Ed448":
696 raise InvalidKeyError(f"Invalid curve: {curve}")
698 if "x" not in obj:
699 raise InvalidKeyError('OKP should have "x" parameter')
700 x = base64url_decode(obj.get("x"))
702 try:
703 if "d" not in obj:
704 if curve == "Ed25519":
705 return Ed25519PublicKey.from_public_bytes(x)
706 return Ed448PublicKey.from_public_bytes(x)
707 d = base64url_decode(obj.get("d"))
708 if curve == "Ed25519":
709 return Ed25519PrivateKey.from_private_bytes(d)
710 return Ed448PrivateKey.from_private_bytes(d)
711 except ValueError as err:
712 raise InvalidKeyError("Invalid key parameter") from err