Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/jwt/utils.py: 44%

77 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import base64 

2import binascii 

3import re 

4from typing import Union 

5 

6try: 

7 from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve 

8 from cryptography.hazmat.primitives.asymmetric.utils import ( 

9 decode_dss_signature, 

10 encode_dss_signature, 

11 ) 

12except ModuleNotFoundError: 

13 EllipticCurve = None 

14 

15 

16def force_bytes(value: Union[str, bytes]) -> bytes: 

17 if isinstance(value, str): 17 ↛ 19line 17 didn't jump to line 19, because the condition on line 17 was never false

18 return value.encode("utf-8") 

19 elif isinstance(value, bytes): 

20 return value 

21 else: 

22 raise TypeError("Expected a string value") 

23 

24 

25def base64url_decode(input: Union[str, bytes]) -> bytes: 

26 if isinstance(input, str): 26 ↛ 27line 26 didn't jump to line 27, because the condition on line 26 was never true

27 input = input.encode("ascii") 

28 

29 rem = len(input) % 4 

30 

31 if rem > 0: 

32 input += b"=" * (4 - rem) 

33 

34 return base64.urlsafe_b64decode(input) 

35 

36 

37def base64url_encode(input: bytes) -> bytes: 

38 return base64.urlsafe_b64encode(input).replace(b"=", b"") 

39 

40 

41def to_base64url_uint(val: int) -> bytes: 

42 if val < 0: 

43 raise ValueError("Must be a positive integer") 

44 

45 int_bytes = bytes_from_int(val) 

46 

47 if len(int_bytes) == 0: 

48 int_bytes = b"\x00" 

49 

50 return base64url_encode(int_bytes) 

51 

52 

53def from_base64url_uint(val: Union[str, bytes]) -> int: 

54 if isinstance(val, str): 

55 val = val.encode("ascii") 

56 

57 data = base64url_decode(val) 

58 return int.from_bytes(data, byteorder="big") 

59 

60 

61def number_to_bytes(num: int, num_bytes: int) -> bytes: 

62 padded_hex = "%0*x" % (2 * num_bytes, num) 

63 return binascii.a2b_hex(padded_hex.encode("ascii")) 

64 

65 

66def bytes_to_number(string: bytes) -> int: 

67 return int(binascii.b2a_hex(string), 16) 

68 

69 

70def bytes_from_int(val: int) -> bytes: 

71 remaining = val 

72 byte_length = 0 

73 

74 while remaining != 0: 

75 remaining >>= 8 

76 byte_length += 1 

77 

78 return val.to_bytes(byte_length, "big", signed=False) 

79 

80 

81def der_to_raw_signature(der_sig: bytes, curve: EllipticCurve) -> bytes: 

82 num_bits = curve.key_size 

83 num_bytes = (num_bits + 7) // 8 

84 

85 r, s = decode_dss_signature(der_sig) 

86 

87 return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes) 

88 

89 

90def raw_to_der_signature(raw_sig: bytes, curve: EllipticCurve) -> bytes: 

91 num_bits = curve.key_size 

92 num_bytes = (num_bits + 7) // 8 

93 

94 if len(raw_sig) != 2 * num_bytes: 

95 raise ValueError("Invalid signature") 

96 

97 r = bytes_to_number(raw_sig[:num_bytes]) 

98 s = bytes_to_number(raw_sig[num_bytes:]) 

99 

100 return encode_dss_signature(r, s) 

101 

102 

103# Based on https://github.com/hynek/pem/blob/7ad94db26b0bc21d10953f5dbad3acfdfacf57aa/src/pem/_core.py#L224-L252 

104_PEMS = { 

105 b"CERTIFICATE", 

106 b"TRUSTED CERTIFICATE", 

107 b"PRIVATE KEY", 

108 b"PUBLIC KEY", 

109 b"ENCRYPTED PRIVATE KEY", 

110 b"OPENSSH PRIVATE KEY", 

111 b"DSA PRIVATE KEY", 

112 b"RSA PRIVATE KEY", 

113 b"RSA PUBLIC KEY", 

114 b"EC PRIVATE KEY", 

115 b"DH PARAMETERS", 

116 b"NEW CERTIFICATE REQUEST", 

117 b"CERTIFICATE REQUEST", 

118 b"SSH2 PUBLIC KEY", 

119 b"SSH2 ENCRYPTED PRIVATE KEY", 

120 b"X509 CRL", 

121} 

122 

123_PEM_RE = re.compile( 

124 b"----[- ]BEGIN (" 

125 + b"|".join(_PEMS) 

126 + b""")[- ]----\r? 

127.+?\r? 

128----[- ]END \\1[- ]----\r?\n?""", 

129 re.DOTALL, 

130) 

131 

132 

133def is_pem_format(key: bytes) -> bool: 

134 return bool(_PEM_RE.search(key)) 

135 

136 

137# Based on https://github.com/pyca/cryptography/blob/bcb70852d577b3f490f015378c75cba74986297b/src/cryptography/hazmat/primitives/serialization/ssh.py#L40-L46 

138_CERT_SUFFIX = b"-cert-v01@openssh.com" 

139_SSH_PUBKEY_RC = re.compile(rb"\A(\S+)[ \t]+(\S+)") 

140_SSH_KEY_FORMATS = [ 

141 b"ssh-ed25519", 

142 b"ssh-rsa", 

143 b"ssh-dss", 

144 b"ecdsa-sha2-nistp256", 

145 b"ecdsa-sha2-nistp384", 

146 b"ecdsa-sha2-nistp521", 

147] 

148 

149 

150def is_ssh_key(key: bytes) -> bool: 

151 if any(string_value in key for string_value in _SSH_KEY_FORMATS): 151 ↛ 152line 151 didn't jump to line 152, because the condition on line 151 was never true

152 return True 

153 

154 ssh_pubkey_match = _SSH_PUBKEY_RC.match(key) 

155 if ssh_pubkey_match: 155 ↛ 156line 155 didn't jump to line 156, because the condition on line 155 was never true

156 key_type = ssh_pubkey_match.group(1) 

157 if _CERT_SUFFIX == key_type[-len(_CERT_SUFFIX) :]: 

158 return True 

159 

160 return False