Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/urllib3/connection.py: 24%
226 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import absolute_import
3import datetime
4import logging
5import os
6import re
7import socket
8import warnings
9from socket import error as SocketError
10from socket import timeout as SocketTimeout
12from .packages import six
13from .packages.six.moves.http_client import HTTPConnection as _HTTPConnection
14from .packages.six.moves.http_client import HTTPException # noqa: F401
15from .util.proxy import create_proxy_ssl_context
17try: # Compiled with SSL?
18 import ssl
20 BaseSSLError = ssl.SSLError
21except (ImportError, AttributeError): # Platform-specific: No SSL.
22 ssl = None
24 class BaseSSLError(BaseException):
25 pass
28try:
29 # Python 3: not a no-op, we're adding this to the namespace so it can be imported.
30 ConnectionError = ConnectionError
31except NameError:
32 # Python 2
33 class ConnectionError(Exception):
34 pass
37try: # Python 3:
38 # Not a no-op, we're adding this to the namespace so it can be imported.
39 BrokenPipeError = BrokenPipeError
40except NameError: # Python 2:
42 class BrokenPipeError(Exception):
43 pass
46from ._collections import HTTPHeaderDict # noqa (historical, removed in v2)
47from ._version import __version__
48from .exceptions import (
49 ConnectTimeoutError,
50 NewConnectionError,
51 SubjectAltNameWarning,
52 SystemTimeWarning,
53)
54from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection
55from .util.ssl_ import (
56 assert_fingerprint,
57 create_urllib3_context,
58 is_ipaddress,
59 resolve_cert_reqs,
60 resolve_ssl_version,
61 ssl_wrap_socket,
62)
63from .util.ssl_match_hostname import CertificateError, match_hostname
65log = logging.getLogger(__name__)
67port_by_scheme = {"http": 80, "https": 443}
69# When it comes time to update this value as a part of regular maintenance
70# (ie test_recent_date is failing) update it to ~6 months before the current date.
71RECENT_DATE = datetime.date(2022, 1, 1)
73_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
76class HTTPConnection(_HTTPConnection, object):
77 """
78 Based on :class:`http.client.HTTPConnection` but provides an extra constructor
79 backwards-compatibility layer between older and newer Pythons.
81 Additional keyword parameters are used to configure attributes of the connection.
82 Accepted parameters include:
84 - ``strict``: See the documentation on :class:`urllib3.connectionpool.HTTPConnectionPool`
85 - ``source_address``: Set the source address for the current connection.
86 - ``socket_options``: Set specific options on the underlying socket. If not specified, then
87 defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
88 Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.
90 For example, if you wish to enable TCP Keep Alive in addition to the defaults,
91 you might pass:
93 .. code-block:: python
95 HTTPConnection.default_socket_options + [
96 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
97 ]
99 Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
100 """
102 default_port = port_by_scheme["http"]
104 #: Disable Nagle's algorithm by default.
105 #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
106 default_socket_options = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
108 #: Whether this connection verifies the host's certificate.
109 is_verified = False
111 #: Whether this proxy connection (if used) verifies the proxy host's
112 #: certificate.
113 proxy_is_verified = None
115 def __init__(self, *args, **kw):
116 if not six.PY2:
117 kw.pop("strict", None)
119 # Pre-set source_address.
120 self.source_address = kw.get("source_address")
122 #: The socket options provided by the user. If no options are
123 #: provided, we use the default options.
124 self.socket_options = kw.pop("socket_options", self.default_socket_options)
126 # Proxy options provided by the user.
127 self.proxy = kw.pop("proxy", None)
128 self.proxy_config = kw.pop("proxy_config", None)
130 _HTTPConnection.__init__(self, *args, **kw)
132 @property
133 def host(self):
134 """
135 Getter method to remove any trailing dots that indicate the hostname is an FQDN.
137 In general, SSL certificates don't include the trailing dot indicating a
138 fully-qualified domain name, and thus, they don't validate properly when
139 checked against a domain name that includes the dot. In addition, some
140 servers may not expect to receive the trailing dot when provided.
142 However, the hostname with trailing dot is critical to DNS resolution; doing a
143 lookup with the trailing dot will properly only resolve the appropriate FQDN,
144 whereas a lookup without a trailing dot will search the system's search domain
145 list. Thus, it's important to keep the original host around for use only in
146 those cases where it's appropriate (i.e., when doing DNS lookup to establish the
147 actual TCP connection across which we're going to send HTTP requests).
148 """
149 return self._dns_host.rstrip(".")
151 @host.setter
152 def host(self, value):
153 """
154 Setter for the `host` property.
156 We assume that only urllib3 uses the _dns_host attribute; httplib itself
157 only uses `host`, and it seems reasonable that other libraries follow suit.
158 """
159 self._dns_host = value
161 def _new_conn(self):
162 """Establish a socket connection and set nodelay settings on it.
164 :return: New socket connection.
165 """
166 extra_kw = {}
167 if self.source_address:
168 extra_kw["source_address"] = self.source_address
170 if self.socket_options:
171 extra_kw["socket_options"] = self.socket_options
173 try:
174 conn = connection.create_connection(
175 (self._dns_host, self.port), self.timeout, **extra_kw
176 )
178 except SocketTimeout:
179 raise ConnectTimeoutError(
180 self,
181 "Connection to %s timed out. (connect timeout=%s)"
182 % (self.host, self.timeout),
183 )
185 except SocketError as e:
186 raise NewConnectionError(
187 self, "Failed to establish a new connection: %s" % e
188 )
190 return conn
192 def _is_using_tunnel(self):
193 # Google App Engine's httplib does not define _tunnel_host
194 return getattr(self, "_tunnel_host", None)
196 def _prepare_conn(self, conn):
197 self.sock = conn
198 if self._is_using_tunnel():
199 # TODO: Fix tunnel so it doesn't depend on self.sock state.
200 self._tunnel()
201 # Mark this connection as not reusable
202 self.auto_open = 0
204 def connect(self):
205 conn = self._new_conn()
206 self._prepare_conn(conn)
208 def putrequest(self, method, url, *args, **kwargs):
209 """ """
210 # Empty docstring because the indentation of CPython's implementation
211 # is broken but we don't want this method in our documentation.
212 match = _CONTAINS_CONTROL_CHAR_RE.search(method)
213 if match:
214 raise ValueError(
215 "Method cannot contain non-token characters %r (found at least %r)"
216 % (method, match.group())
217 )
219 return _HTTPConnection.putrequest(self, method, url, *args, **kwargs)
221 def putheader(self, header, *values):
222 """ """
223 if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
224 _HTTPConnection.putheader(self, header, *values)
225 elif six.ensure_str(header.lower()) not in SKIPPABLE_HEADERS:
226 raise ValueError(
227 "urllib3.util.SKIP_HEADER only supports '%s'"
228 % ("', '".join(map(str.title, sorted(SKIPPABLE_HEADERS))),)
229 )
231 def request(self, method, url, body=None, headers=None):
232 if headers is None:
233 headers = {}
234 else:
235 # Avoid modifying the headers passed into .request()
236 headers = headers.copy()
237 if "user-agent" not in (six.ensure_str(k.lower()) for k in headers):
238 headers["User-Agent"] = _get_default_user_agent()
239 super(HTTPConnection, self).request(method, url, body=body, headers=headers)
241 def request_chunked(self, method, url, body=None, headers=None):
242 """
243 Alternative to the common request method, which sends the
244 body with chunked encoding and not as one block
245 """
246 headers = headers or {}
247 header_keys = set([six.ensure_str(k.lower()) for k in headers])
248 skip_accept_encoding = "accept-encoding" in header_keys
249 skip_host = "host" in header_keys
250 self.putrequest(
251 method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
252 )
253 if "user-agent" not in header_keys:
254 self.putheader("User-Agent", _get_default_user_agent())
255 for header, value in headers.items():
256 self.putheader(header, value)
257 if "transfer-encoding" not in header_keys:
258 self.putheader("Transfer-Encoding", "chunked")
259 self.endheaders()
261 if body is not None:
262 stringish_types = six.string_types + (bytes,)
263 if isinstance(body, stringish_types):
264 body = (body,)
265 for chunk in body:
266 if not chunk:
267 continue
268 if not isinstance(chunk, bytes):
269 chunk = chunk.encode("utf8")
270 len_str = hex(len(chunk))[2:]
271 to_send = bytearray(len_str.encode())
272 to_send += b"\r\n"
273 to_send += chunk
274 to_send += b"\r\n"
275 self.send(to_send)
277 # After the if clause, to always have a closed body
278 self.send(b"0\r\n\r\n")
281class HTTPSConnection(HTTPConnection):
282 """
283 Many of the parameters to this constructor are passed to the underlying SSL
284 socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
285 """
287 default_port = port_by_scheme["https"]
289 cert_reqs = None
290 ca_certs = None
291 ca_cert_dir = None
292 ca_cert_data = None
293 ssl_version = None
294 assert_fingerprint = None
295 tls_in_tls_required = False
297 def __init__(
298 self,
299 host,
300 port=None,
301 key_file=None,
302 cert_file=None,
303 key_password=None,
304 strict=None,
305 timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
306 ssl_context=None,
307 server_hostname=None,
308 **kw
309 ):
311 HTTPConnection.__init__(self, host, port, strict=strict, timeout=timeout, **kw)
313 self.key_file = key_file
314 self.cert_file = cert_file
315 self.key_password = key_password
316 self.ssl_context = ssl_context
317 self.server_hostname = server_hostname
319 # Required property for Google AppEngine 1.9.0 which otherwise causes
320 # HTTPS requests to go out as HTTP. (See Issue #356)
321 self._protocol = "https"
323 def set_cert(
324 self,
325 key_file=None,
326 cert_file=None,
327 cert_reqs=None,
328 key_password=None,
329 ca_certs=None,
330 assert_hostname=None,
331 assert_fingerprint=None,
332 ca_cert_dir=None,
333 ca_cert_data=None,
334 ):
335 """
336 This method should only be called once, before the connection is used.
337 """
338 # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
339 # have an SSLContext object in which case we'll use its verify_mode.
340 if cert_reqs is None:
341 if self.ssl_context is not None:
342 cert_reqs = self.ssl_context.verify_mode
343 else:
344 cert_reqs = resolve_cert_reqs(None)
346 self.key_file = key_file
347 self.cert_file = cert_file
348 self.cert_reqs = cert_reqs
349 self.key_password = key_password
350 self.assert_hostname = assert_hostname
351 self.assert_fingerprint = assert_fingerprint
352 self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
353 self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
354 self.ca_cert_data = ca_cert_data
356 def connect(self):
357 # Add certificate verification
358 self.sock = conn = self._new_conn()
359 hostname = self.host
360 tls_in_tls = False
362 if self._is_using_tunnel():
363 if self.tls_in_tls_required:
364 self.sock = conn = self._connect_tls_proxy(hostname, conn)
365 tls_in_tls = True
367 # Calls self._set_hostport(), so self.host is
368 # self._tunnel_host below.
369 self._tunnel()
370 # Mark this connection as not reusable
371 self.auto_open = 0
373 # Override the host with the one we're requesting data from.
374 hostname = self._tunnel_host
376 server_hostname = hostname
377 if self.server_hostname is not None:
378 server_hostname = self.server_hostname
380 is_time_off = datetime.date.today() < RECENT_DATE
381 if is_time_off:
382 warnings.warn(
383 (
384 "System time is way off (before {0}). This will probably "
385 "lead to SSL verification errors"
386 ).format(RECENT_DATE),
387 SystemTimeWarning,
388 )
390 # Wrap socket using verification with the root certs in
391 # trusted_root_certs
392 default_ssl_context = False
393 if self.ssl_context is None:
394 default_ssl_context = True
395 self.ssl_context = create_urllib3_context(
396 ssl_version=resolve_ssl_version(self.ssl_version),
397 cert_reqs=resolve_cert_reqs(self.cert_reqs),
398 )
400 context = self.ssl_context
401 context.verify_mode = resolve_cert_reqs(self.cert_reqs)
403 # Try to load OS default certs if none are given.
404 # Works well on Windows (requires Python3.4+)
405 if (
406 not self.ca_certs
407 and not self.ca_cert_dir
408 and not self.ca_cert_data
409 and default_ssl_context
410 and hasattr(context, "load_default_certs")
411 ):
412 context.load_default_certs()
414 self.sock = ssl_wrap_socket(
415 sock=conn,
416 keyfile=self.key_file,
417 certfile=self.cert_file,
418 key_password=self.key_password,
419 ca_certs=self.ca_certs,
420 ca_cert_dir=self.ca_cert_dir,
421 ca_cert_data=self.ca_cert_data,
422 server_hostname=server_hostname,
423 ssl_context=context,
424 tls_in_tls=tls_in_tls,
425 )
427 # If we're using all defaults and the connection
428 # is TLSv1 or TLSv1.1 we throw a DeprecationWarning
429 # for the host.
430 if (
431 default_ssl_context
432 and self.ssl_version is None
433 and hasattr(self.sock, "version")
434 and self.sock.version() in {"TLSv1", "TLSv1.1"}
435 ):
436 warnings.warn(
437 "Negotiating TLSv1/TLSv1.1 by default is deprecated "
438 "and will be disabled in urllib3 v2.0.0. Connecting to "
439 "'%s' with '%s' can be enabled by explicitly opting-in "
440 "with 'ssl_version'" % (self.host, self.sock.version()),
441 DeprecationWarning,
442 )
444 if self.assert_fingerprint:
445 assert_fingerprint(
446 self.sock.getpeercert(binary_form=True), self.assert_fingerprint
447 )
448 elif (
449 context.verify_mode != ssl.CERT_NONE
450 and not getattr(context, "check_hostname", False)
451 and self.assert_hostname is not False
452 ):
453 # While urllib3 attempts to always turn off hostname matching from
454 # the TLS library, this cannot always be done. So we check whether
455 # the TLS Library still thinks it's matching hostnames.
456 cert = self.sock.getpeercert()
457 if not cert.get("subjectAltName", ()):
458 warnings.warn(
459 (
460 "Certificate for {0} has no `subjectAltName`, falling back to check for a "
461 "`commonName` for now. This feature is being removed by major browsers and "
462 "deprecated by RFC 2818. (See https://github.com/urllib3/urllib3/issues/497 "
463 "for details.)".format(hostname)
464 ),
465 SubjectAltNameWarning,
466 )
467 _match_hostname(cert, self.assert_hostname or server_hostname)
469 self.is_verified = (
470 context.verify_mode == ssl.CERT_REQUIRED
471 or self.assert_fingerprint is not None
472 )
474 def _connect_tls_proxy(self, hostname, conn):
475 """
476 Establish a TLS connection to the proxy using the provided SSL context.
477 """
478 proxy_config = self.proxy_config
479 ssl_context = proxy_config.ssl_context
480 if ssl_context:
481 # If the user provided a proxy context, we assume CA and client
482 # certificates have already been set
483 return ssl_wrap_socket(
484 sock=conn,
485 server_hostname=hostname,
486 ssl_context=ssl_context,
487 )
489 ssl_context = create_proxy_ssl_context(
490 self.ssl_version,
491 self.cert_reqs,
492 self.ca_certs,
493 self.ca_cert_dir,
494 self.ca_cert_data,
495 )
497 # If no cert was provided, use only the default options for server
498 # certificate validation
499 socket = ssl_wrap_socket(
500 sock=conn,
501 ca_certs=self.ca_certs,
502 ca_cert_dir=self.ca_cert_dir,
503 ca_cert_data=self.ca_cert_data,
504 server_hostname=hostname,
505 ssl_context=ssl_context,
506 )
508 if ssl_context.verify_mode != ssl.CERT_NONE and not getattr(
509 ssl_context, "check_hostname", False
510 ):
511 # While urllib3 attempts to always turn off hostname matching from
512 # the TLS library, this cannot always be done. So we check whether
513 # the TLS Library still thinks it's matching hostnames.
514 cert = socket.getpeercert()
515 if not cert.get("subjectAltName", ()):
516 warnings.warn(
517 (
518 "Certificate for {0} has no `subjectAltName`, falling back to check for a "
519 "`commonName` for now. This feature is being removed by major browsers and "
520 "deprecated by RFC 2818. (See https://github.com/urllib3/urllib3/issues/497 "
521 "for details.)".format(hostname)
522 ),
523 SubjectAltNameWarning,
524 )
525 _match_hostname(cert, hostname)
527 self.proxy_is_verified = ssl_context.verify_mode == ssl.CERT_REQUIRED
528 return socket
531def _match_hostname(cert, asserted_hostname):
532 # Our upstream implementation of ssl.match_hostname()
533 # only applies this normalization to IP addresses so it doesn't
534 # match DNS SANs so we do the same thing!
535 stripped_hostname = asserted_hostname.strip("u[]")
536 if is_ipaddress(stripped_hostname):
537 asserted_hostname = stripped_hostname
539 try:
540 match_hostname(cert, asserted_hostname)
541 except CertificateError as e:
542 log.warning(
543 "Certificate did not match expected hostname: %s. Certificate: %s",
544 asserted_hostname,
545 cert,
546 )
547 # Add cert to exception and reraise so client code can inspect
548 # the cert when catching the exception, if they want to
549 e._peer_cert = cert
550 raise
553def _get_default_user_agent():
554 return "python-urllib3/%s" % __version__
557class DummyConnection(object):
558 """Used to detect a failed ConnectionCls import."""
560 pass
563if not ssl: 563 ↛ 564line 563 didn't jump to line 564, because the condition on line 563 was never true
564 HTTPSConnection = DummyConnection # noqa: F811
567VerifiedHTTPSConnection = HTTPSConnection