Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/requests/utils.py: 11% (482 statements)

1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import HEADER_VALIDATORS, to_native_string # noqa: F401 

29from .compat import ( 

30 Mapping, 

31 basestring, 

32 bytes, 

33 getproxies, 

34 getproxies_environment, 

35 integer_types, 

36) 

37from .compat import parse_http_list as _parse_list_header 

38from .compat import ( 

39 proxy_bypass, 

40 proxy_bypass_environment, 

41 quote, 

42 str, 

43 unquote, 

44 urlparse, 

45 urlunparse, 

46) 

47from .cookies import cookiejar_from_dict 

48from .exceptions import ( 

49 FileModeWarning, 

50 InvalidHeader, 

51 InvalidURL, 

52 UnrewindableBodyError, 

53) 

54from .structures import CaseInsensitiveDict 

55 

56NETRC_FILES = (".netrc", "_netrc") 

57 

58DEFAULT_CA_BUNDLE_PATH = certs.where() 

59 

60DEFAULT_PORTS = {"http": 80, "https": 443} 

61 

62# Ensure that ', ' is used to preserve previous delimiter behavior. 

63DEFAULT_ACCEPT_ENCODING = ", ".join( 

64 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"]) 

65) 

66 

67 

68if sys.platform == "win32": 68 ↛ 71line 68 didn't jump to line 71, because the condition on line 68 was never true

69 # provide a proxy_bypass version on Windows without DNS lookups 

70 

71 def proxy_bypass_registry(host): 

72 try: 

73 import winreg 

74 except ImportError: 

75 return False 

76 

77 try: 

78 internetSettings = winreg.OpenKey( 

79 winreg.HKEY_CURRENT_USER, 

80 r"Software\Microsoft\Windows\CurrentVersion\Internet Settings", 

81 ) 

82 # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it 

83 proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0]) 

84 # ProxyOverride is almost always a string 

85 proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0] 

86 except (OSError, ValueError): 

87 return False 

88 if not proxyEnable or not proxyOverride: 

89 return False 

90 

91 # make a check value list from the registry entry: replace the 

92 # '<local>' string by the localhost entry and the corresponding 

93 # canonical entry. 

94 proxyOverride = proxyOverride.split(";") 

95 # now check if we match one of the registry values. 

96 for test in proxyOverride: 

97 if test == "<local>": 

98 if "." not in host: 

99 return True 

100 test = test.replace(".", r"\.") # mask dots 

101 test = test.replace("*", r".*") # change glob sequence 

102 test = test.replace("?", r".") # change glob char 

103 if re.match(test, host, re.I): 

104 return True 

105 return False 

106 

107 def proxy_bypass(host): # noqa 

108 """Return True, if the host should be bypassed. 

109 

110 Checks proxy settings gathered from the environment, if specified, 

111 or the registry. 

112 """ 

113 if getproxies_environment(): 

114 return proxy_bypass_environment(host) 

115 else: 

116 return proxy_bypass_registry(host) 

117 

118 

def dict_to_sequence(d):
    """Returns the items of a mapping as a key/value sequence; objects
    without an ``items`` method are returned unchanged."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)

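
# Illustrative sketch (hypothetical helper, not part of the module): super_len()
# reports the bytes *remaining* from the current read position, which is what a
# re-send actually needs, rather than the object's total size.
def _example_super_len():  # pragma: no cover
    import io

    buf = io.BytesIO(b"hello world")
    buf.read(6)  # consume six of the eleven bytes
    assert super_len(buf) == 5  # 11 total - 6 already read
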


def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        # Strip port numbers from netloc. This weird `if...encode` dance is
        # used for Python 3.2, which doesn't support unicode literals.
        splitstr = b":"
        if isinstance(url, str):
            splitstr = splitstr.decode("ascii")
        host = ri.netloc.split(splitstr)[0]

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other
            # words, archive remains unchanged after the split), we _can_ end
            # up in an infinite loop on a rare corner case affecting a small
            # number of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write instead of extraction so that only the file itself is
        # created (no nested folders); this also avoids an mkdir race condition
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path


@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise
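
# Illustrative sketch (hypothetical target path): atomic_open() writes to a
# temporary file in the destination directory and only renames it into place
# once the `with` block completes, so readers never observe a partial file.
def _example_atomic_open():  # pragma: no cover
    target = os.path.join(tempfile.gettempdir(), "atomic-demo.txt")
    with atomic_open(target) as fh:
        fh.write(b"all-or-nothing contents")  # handle is opened in binary mode
    assert open(target, "rb").read() == b"all-or-nothing contents"
    os.remove(target)
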


def from_key_val_list(value):
309 """Take an object and test to see if it can be represented as a 

310 dictionary. Unless it can not be represented as such, return an 

311 OrderedDict, e.g., 


    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {}

    for cookie in cj:
        cookie_dict[cookie.name] = cookie.value

    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
        parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"
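
# Illustrative sketch (hypothetical headers): the charset parameter wins when
# present; otherwise text/* falls back to ISO-8859-1 (per RFC 2616) and JSON
# falls back to UTF-8, with None returned when no Content-Type is available.
def _example_get_encoding_from_headers():  # pragma: no cover
    assert get_encoding_from_headers({"content-type": "text/html; charset=UTF-8"}) == "UTF-8"
    assert get_encoding_from_headers({"content-type": "text/plain"}) == "ISO-8859-1"
    assert get_encoding_from_headers({"content-type": "application/json"}) == "utf-8"
    assert get_encoding_from_headers({}) is None
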


def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
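
# Illustrative sketch (hypothetical URLs): requote_uri() leaves existing
# percent-escapes intact while quoting characters that are illegal in a URI,
# and it unescapes percent-encoded unreserved characters back to literals.
def _example_requote_uri():  # pragma: no cover
    assert (
        requote_uri("http://example.com/path with spaces?q=100%25")
        == "http://example.com/path%20with%20spaces?q=100%25"
    )
    assert requote_uri("http://example.com/%7Euser") == "http://example.com/~user"
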


def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))
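
# Illustrative check (values taken from the docstrings above): a /24 prefix
# expands to a 255.255.255.0 mask, and membership is a bitwise AND comparison.
def _example_address_in_network():  # pragma: no cover
    assert dotted_netmask(24) == "255.255.255.0"
    assert address_in_network("192.168.1.1", "192.168.1.0/24")
    assert not address_in_network("192.168.1.1", "192.168.100.0/24")
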


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
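
# Illustrative sketch (hypothetical variable name, assuming it starts unset):
# set_environ() scopes an environment override to the `with` block and then
# restores the prior state, deleting the variable if it did not exist before.
def _example_set_environ():  # pragma: no cover
    name = "REQUESTS_DEMO_VAR"
    with set_environ(name, "1"):
        assert os.environ[name] == "1"
    assert name not in os.environ  # previous (unset) state restored
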


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """
    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If the no_proxy entry was defined in plain IP notation
                    # (instead of CIDR notation) and matches the host's IP
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url of the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy
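
# Illustrative sketch (hypothetical proxy URLs): lookups walk from the most
# specific key (scheme://host) down to the catch-all "all" entry, so a
# host-specific mapping shadows the bare-scheme one.
def _example_select_proxy():  # pragma: no cover
    proxies = {
        "http://example.com": "http://specific:3128",
        "http": "http://generic:3128",
    }
    assert select_proxy("http://example.com/page", proxies) == "http://specific:3128"
    assert select_proxy("http://other.example.org/", proxies) == "http://generic:3128"
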


def resolve_proxies(request, proxies, trust_env=True):
858 """This method takes proxy information from a request and configuration 

859 input to resolve a mapping of target proxies. This will consider settings 

860 such a NO_PROXY to strip proxy configurations. 


    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
908 """Return a list of parsed link headers proxies. 

909 

910 i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg" 


    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
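
# Illustrative sketch (hypothetical Link header): each comma-separated entry
# becomes a dict holding the angle-bracketed URL plus its semicolon-separated
# parameters, with surrounding quotes and whitespace stripped.
def _example_parse_header_links():  # pragma: no cover
    links = parse_header_links('<http://example.com/?page=2>; rel="next"')
    assert links == [{"url": "http://example.com/?page=2", "rel": "next"}]
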


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """

    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and, from their location and count,
    # determining the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
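
# Illustrative sketch (hypothetical payloads): with a BOM the answer comes from
# the BOM itself; without one, the position of the null bytes around the first
# two ASCII characters pins down the UTF variant and its endianness.
def _example_guess_json_utf():  # pragma: no cover
    assert guess_json_utf(b'{"a": 1}') == "utf-8"
    assert guess_json_utf('{"a": 1}'.encode("utf-16")) == "utf-16"  # BOM present
    assert guess_json_utf('{"a": 1}'.encode("utf-16-le")) == "utf-16-le"  # no BOM
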


def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))
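
# Illustrative sketch (hypothetical URLs): a missing scheme is filled in from
# the second argument, while a URL that already carries one is left untouched.
def _example_prepend_scheme_if_needed():  # pragma: no cover
    assert prepend_scheme_if_needed("example.com/path", "https") == "https://example.com/path"
    assert prepend_scheme_if_needed("http://example.com/path", "https") == "http://example.com/path"
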


def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth


def check_header_validity(header):
1028 """Verifies that header parts don't contain leading whitespace 

1029 reserved characters, or return characters. 


    :param header: tuple, in the format (name, value).
    """
    name, value = header

    for part in header:
        if type(part) not in HEADER_VALIDATORS:
            raise InvalidHeader(
                f"Header part ({part!r}) from {{{name!r}: {value!r}}} must be "
                f"of type str or bytes, not {type(part)}"
            )

    _validate_header_part(name, "name", HEADER_VALIDATORS[type(name)][0])
    _validate_header_part(value, "value", HEADER_VALIDATORS[type(value)][1])


def _validate_header_part(header_part, header_kind, validator):
    if not validator.match(header_part):
        raise InvalidHeader(
1049 f"Invalid leading whitespace, reserved character(s), or return" 

1050 f"character(s) in header {header_kind}: {header_part!r}" 

        )


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))


def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")