Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/jwt/utils.py: 44%
77 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import base64
2import binascii
3import re
4from typing import Union
6try:
7 from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
8 from cryptography.hazmat.primitives.asymmetric.utils import (
9 decode_dss_signature,
10 encode_dss_signature,
11 )
12except ModuleNotFoundError:
13 EllipticCurve = None
def force_bytes(value: Union[str, bytes]) -> bytes:
    """Return *value* as ``bytes``.

    A ``str`` is encoded as UTF-8; a ``bytes`` object is returned unchanged.

    Raises:
        TypeError: If *value* is neither ``str`` nor ``bytes``.
    """
    if isinstance(value, str):
        return value.encode("utf-8")
    if isinstance(value, bytes):
        return value
    raise TypeError("Expected a string value")
def base64url_decode(input: Union[str, bytes]) -> bytes:
    """Decode URL-safe base64 data whose trailing ``=`` padding may be absent.

    A ``str`` argument is first encoded as ASCII (so non-ASCII text raises
    ``UnicodeEncodeError``). The data is then padded with ``=`` up to a
    multiple of four bytes and passed to ``base64.urlsafe_b64decode``.
    """
    if isinstance(input, str):
        input = input.encode("ascii")

    # Number of "=" needed to reach the next multiple of 4 (0 when aligned).
    missing_padding = -len(input) % 4

    # Build a new object instead of using ``+=`` so that a mutable buffer
    # passed by the caller is never modified in place.
    return base64.urlsafe_b64decode(input + b"=" * missing_padding)
def base64url_encode(input: bytes) -> bytes:
    """Encode *input* as URL-safe base64 with the ``=`` padding removed."""
    # Padding characters only ever appear at the end of the encoded output,
    # so stripping them from the right removes all of them.
    return base64.urlsafe_b64encode(input).rstrip(b"=")
def to_base64url_uint(val: int) -> bytes:
    """Encode a non-negative integer as unpadded URL-safe base64.

    The integer is converted to bytes with ``bytes_from_int`` and then
    encoded with ``base64url_encode``. Zero is accepted (despite the wording
    of the error message) and is encoded as a single zero byte.

    Raises:
        ValueError: If *val* is negative.
    """
    if val < 0:
        raise ValueError("Must be a positive integer")

    # An empty byte string from the helper is replaced by one zero byte so
    # that the encoded result is never empty.
    int_bytes = bytes_from_int(val) or b"\x00"

    return base64url_encode(int_bytes)
def from_base64url_uint(val: Union[str, bytes]) -> int:
    """Decode an unpadded URL-safe base64 value into an unsigned integer.

    A ``str`` argument is encoded as ASCII first. The decoded bytes are
    interpreted as a big-endian integer.
    """
    if isinstance(val, str):
        val = val.encode("ascii")

    return int.from_bytes(base64url_decode(val), byteorder="big")
61def number_to_bytes(num: int, num_bytes: int) -> bytes:
62 padded_hex = "%0*x" % (2 * num_bytes, num)
63 return binascii.a2b_hex(padded_hex.encode("ascii"))
def bytes_to_number(string: bytes) -> int:
    """Interpret *string* as a big-endian unsigned integer.

    The bytes are rendered as hexadecimal text and parsed in base 16.

    Raises:
        ValueError: If *string* is empty (there are no digits to parse).
    """
    hex_digits = binascii.b2a_hex(string)
    return int(hex_digits, 16)
def bytes_from_int(val: int) -> bytes:
    """Return the minimal big-endian unsigned byte representation of *val*.

    Zero yields an empty byte string.

    Raises:
        OverflowError: If *val* is negative (raised by ``int.to_bytes``).
    """
    # ``bit_length`` replaces a manual shift-and-count loop; that loop never
    # terminated for negative values because an arithmetic right shift of a
    # negative integer converges to -1, not 0.
    byte_length = (val.bit_length() + 7) // 8

    return val.to_bytes(byte_length, "big", signed=False)
def der_to_raw_signature(der_sig: bytes, curve: EllipticCurve) -> bytes:
    """Convert a DER-encoded signature into fixed-width raw ``r || s`` bytes.

    The two integers returned by ``decode_dss_signature`` are each written
    with ``number_to_bytes`` using the curve's key size rounded up to whole
    bytes, and the two results are concatenated.
    """
    # Round the key size in bits up to a whole number of bytes.
    num_bytes = (curve.key_size + 7) // 8

    r, s = decode_dss_signature(der_sig)

    return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes)
def raw_to_der_signature(raw_sig: bytes, curve: EllipticCurve) -> bytes:
    """Convert a fixed-width raw ``r || s`` signature into DER encoding.

    *raw_sig* must be exactly twice the curve's key size in bytes (key size
    in bits rounded up). The first half is read as ``r`` and the second half
    as ``s``, and both are passed to ``encode_dss_signature``.

    Raises:
        ValueError: If *raw_sig* does not have the expected length.
    """
    # Round the key size in bits up to a whole number of bytes.
    num_bytes = (curve.key_size + 7) // 8

    if len(raw_sig) != 2 * num_bytes:
        raise ValueError("Invalid signature")

    r = bytes_to_number(raw_sig[:num_bytes])
    s = bytes_to_number(raw_sig[num_bytes:])

    return encode_dss_signature(r, s)
# Based on https://github.com/hynek/pem/blob/7ad94db26b0bc21d10953f5dbad3acfdfacf57aa/src/pem/_core.py#L224-L252
# Labels accepted between the BEGIN/END markers of a PEM block.
_PEMS = {
    b"CERTIFICATE",
    b"TRUSTED CERTIFICATE",
    b"PRIVATE KEY",
    b"PUBLIC KEY",
    b"ENCRYPTED PRIVATE KEY",
    b"OPENSSH PRIVATE KEY",
    b"DSA PRIVATE KEY",
    b"RSA PRIVATE KEY",
    b"RSA PUBLIC KEY",
    b"EC PRIVATE KEY",
    b"DH PARAMETERS",
    b"NEW CERTIFICATE REQUEST",
    b"CERTIFICATE REQUEST",
    b"SSH2 PUBLIC KEY",
    b"SSH2 ENCRYPTED PRIVATE KEY",
    b"X509 CRL",
}

# Matches a BEGIN line, a non-empty body, and an END line carrying the same
# label (back-reference \1). The markers may use "-----" or "---- " framing,
# and line endings may be LF or CRLF. The labels are joined in sorted order
# so the compiled pattern does not depend on set iteration order.
_PEM_RE = re.compile(
    b"----[- ]BEGIN ("
    + b"|".join(sorted(_PEMS))
    + b")[- ]----\r?\n"
    b".+?\r?\n"
    b"----[- ]END \\1[- ]----\r?\n?",
    re.DOTALL,
)


def is_pem_format(key: bytes) -> bool:
    """Return ``True`` if *key* contains a PEM block with a known label."""
    return _PEM_RE.search(key) is not None
# Based on https://github.com/pyca/cryptography/blob/bcb70852d577b3f490f015378c75cba74986297b/src/cryptography/hazmat/primitives/serialization/ssh.py#L40-L46
_CERT_SUFFIX = b"-cert-v01@openssh.com"
# Captures the first two whitespace-separated tokens at the start of the
# data; the first token is used as the key type.
_SSH_PUBKEY_RC = re.compile(rb"\A(\S+)[ \t]+(\S+)")
_SSH_KEY_FORMATS = [
    b"ssh-ed25519",
    b"ssh-rsa",
    b"ssh-dss",
    b"ecdsa-sha2-nistp256",
    b"ecdsa-sha2-nistp384",
    b"ecdsa-sha2-nistp521",
]


def is_ssh_key(key: bytes) -> bool:
    """Return ``True`` if *key* looks like an SSH public key or certificate.

    The check succeeds when any known key-format name occurs anywhere in
    *key*, or when *key* starts with two whitespace-separated tokens and the
    first token ends with the OpenSSH certificate suffix.
    """
    if any(key_format in key for key_format in _SSH_KEY_FORMATS):
        return True

    pubkey_match = _SSH_PUBKEY_RC.match(key)
    if pubkey_match is None:
        return False

    return pubkey_match.group(1).endswith(_CERT_SUFFIX)