Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/requests/utils.py: 11%
482 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2requests.utils
3~~~~~~~~~~~~~~
5This module provides utility functions that are used within Requests
6that are also useful for external consumption.
7"""
9import codecs
10import contextlib
11import io
12import os
13import re
14import socket
15import struct
16import sys
17import tempfile
18import warnings
19import zipfile
20from collections import OrderedDict
22from urllib3.util import make_headers, parse_url
24from . import certs
25from .__version__ import __version__
27# to_native_string is unused here, but imported here for backwards compatibility
28from ._internal_utils import HEADER_VALIDATORS, to_native_string # noqa: F401
29from .compat import (
30 Mapping,
31 basestring,
32 bytes,
33 getproxies,
34 getproxies_environment,
35 integer_types,
36)
37from .compat import parse_http_list as _parse_list_header
38from .compat import (
39 proxy_bypass,
40 proxy_bypass_environment,
41 quote,
42 str,
43 unquote,
44 urlparse,
45 urlunparse,
46)
47from .cookies import cookiejar_from_dict
48from .exceptions import (
49 FileModeWarning,
50 InvalidHeader,
51 InvalidURL,
52 UnrewindableBodyError,
53)
54from .structures import CaseInsensitiveDict
# Candidate netrc file names looked up in the user's home directory by
# get_netrc_auth; "_netrc" is the Windows naming convention.
NETRC_FILES = (".netrc", "_netrc")

# Path to the default CA bundle, as reported by certs.where().
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Default port per URL scheme, used when a URL omits an explicit port.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)
if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        """Check the Windows registry ProxyOverride list for *host*.

        Returns True when the registry says the proxy should be bypassed
        for this host, False otherwise (including on any registry error).
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                # '<local>' means "bypass for plain hostnames" (no dots).
                if "." not in host:
                    return True
            # Translate the glob-style override entry into a regex.
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)
def dict_to_sequence(d):
    """Return *d* as a sequence of (key, value) pairs when it is a mapping.

    Objects without an ``items`` method are passed through untouched.
    """
    return d.items() if hasattr(d, "items") else d
def super_len(o):
    """Best-effort number of bytes remaining to be read from *o*.

    Handles sized objects (``__len__``), urllib3-style ``len`` attributes,
    real files (via ``fstat``) and seekable streams; falls back to 0 when
    no length can be determined.
    """
    length = None
    position = 0

    if hasattr(o, "__len__"):
        length = len(o)
    elif hasattr(o, "len"):
        length = o.len
    elif hasattr(o, "fileno"):
        try:
            fd = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is surprising given the hasattr() check above,
            # but objects returned by `Tarfile.extractfile()` raise it
            # (see issue 5229).
            pass
        else:
            length = os.fstat(fd).st_size

            # fstat reports the binary size; warn when the file was opened
            # in text mode, where the decoded length may differ.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            position = o.tell()
        except OSError:
            # Special descriptors such as stdin can refuse tell(); treat the
            # stream as fully consumed so requests chunks it instead.
            if length is not None:
                position = length
        else:
            if hasattr(o, "seek") and length is None:
                # StringIO and BytesIO expose seek but no usable fileno.
                try:
                    # Measure by seeking to the end...
                    o.seek(0, 2)
                    length = o.tell()

                    # ...then restore the original position so partially
                    # read file-like objects keep working.
                    o.seek(position or 0)
                except OSError:
                    length = 0

    if length is None:
        length = 0

    return max(0, length - position)
def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        candidates = (netrc_file,)
    else:
        candidates = (f"~/{name}" for name in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for candidate in candidates:
            try:
                expanded = os.path.expanduser(candidate)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(expanded):
                netrc_path = expanded
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        parsed = urlparse(url)

        # Strip any port number from the netloc, keeping the separator the
        # same type (str vs bytes) as the URL itself.
        sep = b":"
        if isinstance(url, str):
            sep = sep.decode("ascii")
        host = parsed.netloc.split(sep)[0]

        try:
            entry = netrc(netrc_path).authenticators(host)
            if entry:
                # Prefer the login field; fall back to the account field.
                login_idx = 0 if entry[0] else 1
                return (entry[login_idx], entry[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the
            # file, skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
def guess_filename(obj):
    """Return the basename of ``obj.name`` when it looks like a real path."""
    name = getattr(obj, "name", None)
    if not (name and isinstance(name, basestring)):
        return None
    # Names such as "<stdin>" are placeholders, not filesystem paths.
    if name[0] == "<" or name[-1] == ">":
        return None
    return os.path.basename(name)
def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # Already a real filesystem path; nothing to do.
        return path

    # Walk up the path until an existing prefix is found; treat that prefix
    # as the archive and the remainder as the member name inside it.
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # Guard against an infinite loop when splitting no longer
            # shortens the archive path (rare corner case).
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    archive_file = zipfile.ZipFile(archive)
    if member not in archive_file.namelist():
        return path

    # We have a valid archive and member: extract next to other temp files.
    extracted_path = os.path.join(tempfile.gettempdir(), member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # read + write (rather than extract) so no nested folders are
        # created; atomic_open also avoids a mkdir/write race condition.
        with atomic_open(extracted_path) as handle:
            handle.write(archive_file.read(member))
    return extracted_path
@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to disk atomically.

    Data is written to a temporary file in the same directory and renamed
    over *filename* only after the writer block finishes cleanly; on any
    error the temporary file is removed and the exception re-raised.
    """
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(fd, "wb") as handle:
            yield handle
        os.replace(tmp_path, filename)
    except BaseException:
        os.remove(tmp_path)
        raise
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars cannot be interpreted as sequences of key/value pairs.
    scalar_types = (str, bytes, bool, int)
    if isinstance(value, scalar_types):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)
def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    # Scalars cannot be interpreted as sequences of key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return list(value.items() if isinstance(value, Mapping) else value)
# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    return [
        # Strip surrounding quotes and undo browser-style escaping.
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]
# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        name, sep, raw = item.partition("=")
        if not sep:
            # Bare token with no value at all.
            result[item] = None
            continue
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        result[name] = raw
    return result
# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if not (value and value[0] == value[-1] == '"'):
        # Not a quoted-string; return unchanged.
        return value

    # This is not the real unquoting, but fixing this so that the RFC is
    # met will result in bugs with internet explorer and probably some
    # other browsers as well. IE for example is uploading files with
    # "C:\foo\bar.txt" as filename.
    value = value[1:-1]

    if is_filename and value[:2] == "\\\\":
        # The value looks like a UNC path; collapsing the leading double
        # backslash would corrupt it, so only drop the quotes. See #458.
        return value
    return value.replace("\\\\", "\\").replace('\\"', '"')
def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    # Later cookies win on name collisions, matching iteration order.
    return {cookie.name: cookie.value for cookie in cj}
def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    # Delegates entirely to cookiejar_from_dict, which merges cookie_dict
    # into the provided jar and returns it.
    return cookiejar_from_dict(cookie_dict, cj)
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    Looks for HTML meta charset declarations, http-equiv pragma charsets
    and XML encoding declarations, in that order.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found
508def _parse_content_type_header(header):
509 """Returns content type and parameters from given header
511 :param header: string
512 :return: tuple containing content type and dictionary of
513 parameters
514 """
516 tokens = header.split(";")
517 content_type, params = tokens[0].strip(), tokens[1:]
518 params_dict = {}
519 items_to_strip = "\"' "
521 for param in params:
522 param = param.strip()
523 if param:
524 key, value = param, True
525 index_of_equals = param.find("=")
526 if index_of_equals != -1:
527 key = param[:index_of_equals].strip(items_to_strip)
528 value = param[index_of_equals + 1 :].strip(items_to_strip)
529 params_dict[key.lower()] = value
530 return content_type, params_dict
def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    content_type = headers.get("content-type")
    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")
    if "text" in content_type:
        return "ISO-8859-1"
    if "application/json" in content_type:
        # RFC 4627 defines UTF-8 as the default charset for JSON:
        # https://www.ietf.org/rfc/rfc4627.txt
        return "utf-8"
    return None
def stream_decode_response_unicode(iterator, r):
    """Incrementally decode byte chunks from *iterator* using ``r.encoding``.

    When the response declares no encoding, the raw chunks are passed
    through unchanged.
    """
    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        decoded = decoder.decode(chunk)
        if decoded:
            yield decoded
    # Flush any bytes buffered by a partial multi-byte sequence.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail
def iter_slices(string, slice_length):
    """Iterate over *string* in chunks of *slice_length* characters.

    A ``None`` or non-positive length yields the whole string at once.
    """
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    offset = 0
    total = len(string)
    while offset < total:
        yield string[offset : offset + slice_length]
        offset += slice_length
def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    attempted = []

    # First attempt: the charset declared in the content-type header.
    encoding = get_encoding_from_headers(r.headers)
    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            attempted.append(encoding)

    # Fall back: decode with replacement characters; a None encoding makes
    # str() raise TypeError, in which case return the raw content.
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content
# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        hex_pair = parts[i][0:2]
        if len(hex_pair) != 2 or not hex_pair.isalnum():
            # Too short or clearly not hex; keep the escape untouched.
            parts[i] = f"%{parts[i]}"
            continue
        try:
            char = chr(int(hex_pair, 16))
        except ValueError:
            raise InvalidURL(f"Invalid percent-escape sequence: '{hex_pair}'")
        if char in UNRESERVED_SET:
            parts[i] = char + parts[i][2:]
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)
def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    quotable = "!#$%&'()*+,/:;=?@[]~"
    quotable_no_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters, then quote only illegal
        # characters (reserved characters, unreserved characters and '%'
        # are left untouched).
        return quote(unquote_unreserved(uri), safe=quotable)
    except InvalidURL:
        # The URI contains a malformed percent-escape, so it cannot be
        # unquoted; quote it directly, escaping any stray '%'s so they do
        # not cause issues elsewhere.
        return quote(uri, safe=quotable_no_percent)
def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    net_address, bits = net.split("/")
    # Convert everything to host-order 32-bit integers for bitwise masking.
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    ip_value = struct.unpack("=L", socket.inet_aton(ip))[0]
    network = struct.unpack("=L", socket.inet_aton(net_address))[0] & netmask
    return (ip_value & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))
def is_ipv4_address(string_ip):
    """Return True when *string_ip* parses as a dotted IPv4 address.

    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    else:
        return True
def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") != 1:
        return False

    address, _, mask_text = string_network.partition("/")
    try:
        mask = int(mask_text)
    except ValueError:
        return False
    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True
@contextlib.contextmanager
def set_environ(env_name, value):
    """Temporarily set the environment variable *env_name* to *value*.

    The previous value (or absence) is restored on exit. A ``None`` value
    means "do nothing": the environment is left untouched.
    """
    should_set = value is not None
    if should_set:
        previous = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if should_set:
            if previous is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = previous
def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: the URL being requested.
    :param no_proxy: explicit comma-separated no_proxy list, or None to
        read it from the environment.
    :rtype: bool
    """
    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            # Compare against CIDR networks and plain IP entries.
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    # Finally, defer to the platform's proxy_bypass with no_proxy restored
    # to the caller's original value for the duration of the call.
    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    An empty dict is returned when the URL should bypass proxies entirely.

    :rtype: dict
    """
    return {} if should_bypass_proxies(url, no_proxy=no_proxy) else getproxies()
def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)

    if parts.hostname is None:
        # Hostname-less URLs (e.g. file://) match on scheme alone.
        return proxies.get(parts.scheme, proxies.get("all"))

    # Most-specific key first: scheme+host, scheme, any-scheme+host, any.
    candidate_keys = [
        parts.scheme + "://" + parts.hostname,
        parts.scheme,
        "all://" + parts.hostname,
        "all",
    ]
    for key in candidate_keys:
        if key in proxies:
            return proxies[key]
    return None
def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such a NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = {} if proxies is None else proxies
    resolved = proxies.copy()
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
        env_proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
        if env_proxy:
            # Explicit per-scheme entries always win over the environment.
            resolved.setdefault(scheme, env_proxy)
    return resolved
def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return "/".join((name, __version__))
def default_headers():
    """Build the default header set sent with every request.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    headers = CaseInsensitiveDict()
    headers["User-Agent"] = default_user_agent()
    headers["Accept-Encoding"] = DEFAULT_ACCEPT_ENCODING
    headers["Accept"] = "*/*"
    headers["Connection"] = "keep-alive"
    return headers
def parse_header_links(value):
    """Return a list of parsed link headers proxies.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """
    links = []
    strip_chars = " '\""

    value = value.strip(strip_chars)
    if not value:
        return links

    for chunk in re.split(", *<", value):
        url, _, params = chunk.partition(";")
        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            # Only well-formed single "key=value" params are kept; anything
            # else (no '=' or multiple '=') stops param parsing, matching
            # the historical split("=") unpacking behavior.
            pieces = param.split("=")
            if len(pieces) != 2:
                break
            key, val = pieces
            link[key.strip(strip_chars)] = val.strip(strip_chars)

        links.append(link)

    return links
# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """Guess the Unicode encoding of a JSON byte string.

    :param data: raw bytes of a JSON document.
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        # No nulls among the first four bytes: a pure ASCII leader, UTF-8.
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)

    # A defect in urlparse determines that there isn't a netloc present in
    # some urls. We previously assumed parsing was overly cautious, and
    # swapped the netloc and path. Due to a lack of tests on the original
    # defect, this is maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    path = parsed.path
    if not netloc:
        netloc, path = path, netloc

    if parsed.auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([parsed.auth, netloc])

    scheme = parsed.scheme
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", parsed.query, parsed.fragment))
def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    Returns ("", "") when the URL carries no usable auth component.

    :rtype: (str,str)
    """
    parsed = urlparse(url)
    try:
        return (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        # No netloc at all, or username/password missing (None).
        return ("", "")
def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header

    for part in (name, value):
        if type(part) not in HEADER_VALIDATORS:
            raise InvalidHeader(
                f"Header part ({part!r}) from {{{name!r}: {value!r}}} must be "
                f"of type str or bytes, not {type(part)}"
            )

    # Each type maps to a (name_validator, value_validator) pair.
    name_validator, _ = HEADER_VALIDATORS[type(name)]
    _, value_validator = HEADER_VALIDATORS[type(value)]
    _validate_header_part(name, "name", name_validator)
    _validate_header_part(value, "value", value_validator)
1046def _validate_header_part(header_part, header_kind, validator):
1047 if not validator.match(header_part):
1048 raise InvalidHeader(
1049 f"Invalid leading whitespace, reserved character(s), or return"
1050 f"character(s) in header {header_kind}: {header_part!r}"
1051 )
def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    # Drop everything before the last '@' (the userinfo component).
    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))
def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.

    :raises UnrewindableBodyError: when the body is not seekable, no start
        position was recorded, or seeking itself fails.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is None or not isinstance(
        prepared_request._body_position, integer_types
    ):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")
    try:
        body_seek(prepared_request._body_position)
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )