Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/http/request.py: 51%

367 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

import cgi
import codecs
import copy
from io import BytesIO
from itertools import chain
from urllib.parse import parse_qsl, quote, urlencode, urljoin, urlsplit

from django.conf import settings
from django.core import signing
from django.core.exceptions import (
    DisallowedHost,
    ImproperlyConfigured,
    RequestDataTooBig,
    TooManyFieldsSent,
)
from django.core.files import uploadhandler
from django.http.multipartparser import MultiPartParser, MultiPartParserError
from django.utils.datastructures import (
    CaseInsensitiveMapping,
    ImmutableList,
    MultiValueDict,
)
from django.utils.encoding import escape_uri_path, iri_to_uri
from django.utils.functional import cached_property
from django.utils.http import is_same_domain
from django.utils.regex_helper import _lazy_re_compile

from .multipartparser import parse_header

29 

# Sentinel used as the default of HttpRequest.get_signed_cookie(): when the
# caller supplies no `default`, lookup/signature failures are re-raised.
RAISE_ERROR = object()
# Matches a lowercased host: either a name made of [a-z0-9.-] or a bracketed
# IPv6-style literal, optionally followed by ":<digits>" for the port.
host_validation_re = _lazy_re_compile(
    r"^([a-z0-9.-]+|\[[a-f0-9]*:[a-f0-9\.:]+\])(:\d+)?$"
)

34 

35 

class UnreadablePostError(OSError):
    """
    Raised by HttpRequest.read(), readline(), and body when reading the
    underlying request stream fails with an OSError.
    """

38 

39 

class RawPostDataException(Exception):
    """
    You cannot access raw_post_data from a request that has
    multipart/* POST data if it has been accessed via POST,
    FILES, etc.
    """

48 

49 

class HttpRequest:
    """A basic HTTP request."""

    # The encoding used in GET/POST dicts. None means use default setting.
    _encoding = None
    # Class-level default; instances rebind it (never mutate it in place) via
    # _initialize_handlers() or the upload_handlers setter.
    _upload_handlers = []

    def __init__(self):
        # WARNING: The `WSGIRequest` subclass doesn't call `super`.
        # Any variable assignment made here should also happen in
        # `WSGIRequest.__init__()`.

        self.GET = QueryDict(mutable=True)
        self.POST = QueryDict(mutable=True)
        self.COOKIES = {}
        self.META = {}
        self.FILES = MultiValueDict()

        self.path = ""
        self.path_info = ""
        self.method = None
        self.resolver_match = None
        self.content_type = None
        self.content_params = None

    def __repr__(self):
        """Return "<Class: METHOD 'full/path'>", or "<Class>" if unset."""
        class_name = self.__class__.__name__
        # Build the full path once; skip it entirely when there is no method,
        # exactly as the short-circuiting `or` would.
        full_path = None if self.method is None else self.get_full_path()
        if not full_path:
            return "<%s>" % class_name
        return "<%s: %s %r>" % (class_name, self.method, full_path)

    @cached_property
    def headers(self):
        """Return an HttpHeaders mapping built from self.META."""
        return HttpHeaders(self.META)

    @cached_property
    def accepted_types(self):
        """Return a list of MediaType instances."""
        # A missing Accept header is treated as "*/*".
        return parse_accept_header(self.headers.get("Accept", "*/*"))

    def accepts(self, media_type):
        """Return True if any accepted type matches `media_type`."""
        return any(
            accepted_type.match(media_type) for accepted_type in self.accepted_types
        )

    def _set_content_type_params(self, meta):
        """Set content_type, content_params, and encoding."""
        # NOTE(review): the stdlib `cgi` module is deprecated (PEP 594) and
        # removed in Python 3.13; this call needs a replacement before the
        # interpreter is upgraded that far.
        self.content_type, self.content_params = cgi.parse_header(
            meta.get("CONTENT_TYPE", "")
        )
        if "charset" in self.content_params:
            try:
                codecs.lookup(self.content_params["charset"])
            except LookupError:
                # Unknown charset: deliberately keep the current encoding.
                pass
            else:
                self.encoding = self.content_params["charset"]

    def _get_raw_host(self):
        """
        Return the HTTP host using the environment or request headers. Skip
        allowed hosts protection, so may return an insecure host.
        """
        # We try three options, in order of decreasing preference.
        if settings.USE_X_FORWARDED_HOST and ("HTTP_X_FORWARDED_HOST" in self.META):
            host = self.META["HTTP_X_FORWARDED_HOST"]
        elif "HTTP_HOST" in self.META:
            host = self.META["HTTP_HOST"]
        else:
            # Reconstruct the host using the algorithm from PEP 333.
            host = self.META["SERVER_NAME"]
            server_port = self.get_port()
            # Only append the port when it is not the scheme's default.
            if server_port != ("443" if self.is_secure() else "80"):
                host = "%s:%s" % (host, server_port)
        return host

    def get_host(self):
        """
        Return the HTTP host using the environment or request headers.

        Raise DisallowedHost if the host is malformed or does not match
        settings.ALLOWED_HOSTS.
        """
        host = self._get_raw_host()

        # Allow variants of localhost if ALLOWED_HOSTS is empty and DEBUG=True.
        allowed_hosts = settings.ALLOWED_HOSTS
        if settings.DEBUG and not allowed_hosts:
            allowed_hosts = [".localhost", "127.0.0.1", "[::1]"]

        domain, port = split_domain_port(host)
        if domain and validate_host(domain, allowed_hosts):
            return host
        else:
            msg = "Invalid HTTP_HOST header: %r." % host
            if domain:
                msg += " You may need to add %r to ALLOWED_HOSTS." % domain
            else:
                msg += (
                    " The domain name provided is not valid according to RFC 1034/1035."
                )
            raise DisallowedHost(msg)

    def get_port(self):
        """Return the port number for the request as a string."""
        if settings.USE_X_FORWARDED_PORT and "HTTP_X_FORWARDED_PORT" in self.META:
            port = self.META["HTTP_X_FORWARDED_PORT"]
        else:
            port = self.META["SERVER_PORT"]
        return str(port)

    def get_full_path(self, force_append_slash=False):
        """Return self.path (escaped) plus the query string, if any."""
        return self._get_full_path(self.path, force_append_slash)

    def get_full_path_info(self, force_append_slash=False):
        """Return self.path_info (escaped) plus the query string, if any."""
        return self._get_full_path(self.path_info, force_append_slash)

    def _get_full_path(self, path, force_append_slash):
        """
        Return `path` escaped, with "/" appended when `force_append_slash` is
        set and the path lacks one, followed by "?<query string>" when
        META["QUERY_STRING"] is non-empty.
        """
        # RFC 3986 requires query string arguments to be in the ASCII range.
        # Rather than crash if this doesn't happen, we encode defensively.
        query_string = self.META.get("QUERY_STRING", "")
        return "%s%s%s" % (
            escape_uri_path(path),
            "/" if force_append_slash and not path.endswith("/") else "",
            ("?" + iri_to_uri(query_string)) if query_string else "",
        )

    def get_signed_cookie(self, key, default=RAISE_ERROR, salt="", max_age=None):
        """
        Attempt to return a signed cookie. If the signature fails or the
        cookie has expired, raise an exception, unless the `default` argument
        is provided, in which case return that value.
        """
        try:
            cookie_value = self.COOKIES[key]
        except KeyError:
            if default is not RAISE_ERROR:
                return default
            else:
                raise
        try:
            value = signing.get_cookie_signer(salt=key + salt).unsign(
                cookie_value, max_age=max_age
            )
        except signing.BadSignature:
            if default is not RAISE_ERROR:
                return default
            else:
                raise
        return value

    def build_absolute_uri(self, location=None):
        """
        Build an absolute URI from the location and the variables available in
        this request. If no ``location`` is specified, build the absolute URI
        using request.get_full_path(). If the location is absolute, convert it
        to an RFC 3987 compliant URI and return it. If location is relative or
        is scheme-relative (i.e., ``//example.com/``), urljoin() it to a base
        URL constructed from the request variables.
        """
        if location is None:
            # Make it an absolute url (but schemeless and domainless) for the
            # edge case that the path starts with '//'.
            location = "//%s" % self.get_full_path()
        else:
            # Coerce lazy locations.
            location = str(location)
        bits = urlsplit(location)
        if not (bits.scheme and bits.netloc):
            # Handle the simple, most common case. If the location is absolute
            # and a scheme or host (netloc) isn't provided, skip an expensive
            # urljoin() as long as no path segments are '.' or '..'.
            if (
                bits.path.startswith("/")
                and not bits.scheme
                and not bits.netloc
                and "/./" not in bits.path
                and "/../" not in bits.path
            ):
                # If location starts with '//' but has no netloc, reuse the
                # schema and netloc from the current request. Strip the double
                # slashes and continue as if it wasn't specified.
                if location.startswith("//"):
                    location = location[2:]
                location = self._current_scheme_host + location
            else:
                # Join the constructed URL with the provided location, which
                # allows the provided location to apply query strings to the
                # base path.
                location = urljoin(self._current_scheme_host + self.path, location)
        return iri_to_uri(location)

    @cached_property
    def _current_scheme_host(self):
        """Return the "<scheme>://<host>" prefix for this request."""
        return "{}://{}".format(self.scheme, self.get_host())

    def _get_scheme(self):
        """
        Hook for subclasses like WSGIRequest to implement. Return 'http' by
        default.
        """
        return "http"

    @property
    def scheme(self):
        """
        Return "https" or "http".

        When settings.SECURE_PROXY_SSL_HEADER is set and the named header is
        present in META, the comparison with its secure value decides;
        otherwise defer to _get_scheme(). Raise ImproperlyConfigured if the
        setting does not unpack into exactly two values.
        """
        if settings.SECURE_PROXY_SSL_HEADER:
            try:
                header, secure_value = settings.SECURE_PROXY_SSL_HEADER
            except ValueError:
                raise ImproperlyConfigured(
                    "The SECURE_PROXY_SSL_HEADER setting must be a tuple containing "
                    "two values."
                )
            header_value = self.META.get(header)
            if header_value is not None:
                return "https" if header_value == secure_value else "http"
        return self._get_scheme()

    def is_secure(self):
        """Return True if the request scheme is "https"."""
        return self.scheme == "https"

    @property
    def encoding(self):
        """Return the encoding used for GET/POST data (None means default)."""
        return self._encoding

    @encoding.setter
    def encoding(self, val):
        """
        Set the encoding used for GET/POST accesses. If the GET or POST
        dictionary has already been created, remove and recreate it on the
        next access (so that it is decoded correctly).
        """
        self._encoding = val
        if hasattr(self, "GET"):
            del self.GET
        if hasattr(self, "_post"):
            del self._post

    def _initialize_handlers(self):
        """Load one upload handler per entry in settings.FILE_UPLOAD_HANDLERS."""
        self._upload_handlers = [
            uploadhandler.load_handler(handler, self)
            for handler in settings.FILE_UPLOAD_HANDLERS
        ]

    @property
    def upload_handlers(self):
        """Return the upload handlers, loading them from settings if empty."""
        if not self._upload_handlers:
            # If there are no upload handlers defined, initialize them from settings.
            self._initialize_handlers()
        return self._upload_handlers

    @upload_handlers.setter
    def upload_handlers(self, upload_handlers):
        """Replace the upload handlers; refused once `_files` exists."""
        if hasattr(self, "_files"):
            raise AttributeError(
                "You cannot set the upload handlers after the upload has been "
                "processed."
            )
        self._upload_handlers = upload_handlers

    def parse_file_upload(self, META, post_data):
        """Return a tuple of (POST QueryDict, FILES MultiValueDict)."""
        # Freeze the handler list before parsing so it cannot be altered
        # once the upload is being processed.
        self.upload_handlers = ImmutableList(
            self.upload_handlers,
            warning=(
                "You cannot alter upload handlers after the upload has been "
                "processed."
            ),
        )
        parser = MultiPartParser(META, post_data, self.upload_handlers, self.encoding)
        return parser.parse()

    @property
    def body(self):
        """
        Return the request body, reading it from the stream on first access.

        Raise RawPostDataException if the stream was already read from,
        RequestDataTooBig if CONTENT_LENGTH exceeds
        settings.DATA_UPLOAD_MAX_MEMORY_SIZE, and UnreadablePostError if
        reading fails with an OSError.
        """
        if not hasattr(self, "_body"):
            if self._read_started:
                raise RawPostDataException(
                    "You cannot access body after reading from request's data stream"
                )

            # Limit the maximum request data size that will be handled in-memory.
            if (
                settings.DATA_UPLOAD_MAX_MEMORY_SIZE is not None
                and int(self.META.get("CONTENT_LENGTH") or 0)
                > settings.DATA_UPLOAD_MAX_MEMORY_SIZE
            ):
                raise RequestDataTooBig(
                    "Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE."
                )

            try:
                self._body = self.read()
            except OSError as e:
                raise UnreadablePostError(*e.args) from e
            # Re-expose the consumed data so later read() calls still work.
            self._stream = BytesIO(self._body)
        return self._body

    def _mark_post_parse_error(self):
        """Set empty _post and _files so POST data is not parsed again."""
        self._post = QueryDict()
        self._files = MultiValueDict()

    def _load_post_and_files(self):
        """Populate self._post and self._files if the content-type is a form type"""
        if self.method != "POST":
            self._post, self._files = (
                QueryDict(encoding=self._encoding),
                MultiValueDict(),
            )
            return
        # The stream was consumed directly and no body was kept: nothing left
        # to parse.
        if self._read_started and not hasattr(self, "_body"):
            self._mark_post_parse_error()
            return

        if self.content_type == "multipart/form-data":
            if hasattr(self, "_body"):
                # Use already read data
                data = BytesIO(self._body)
            else:
                data = self
            try:
                self._post, self._files = self.parse_file_upload(self.META, data)
            except MultiPartParserError:
                # An error occurred while parsing POST data. Since when
                # formatting the error the request handler might access
                # self.POST, set self._post and self._file to prevent
                # attempts to parse POST data again.
                self._mark_post_parse_error()
                raise
        elif self.content_type == "application/x-www-form-urlencoded":
            self._post, self._files = (
                QueryDict(self.body, encoding=self._encoding),
                MultiValueDict(),
            )
        else:
            self._post, self._files = (
                QueryDict(encoding=self._encoding),
                MultiValueDict(),
            )

    def close(self):
        """Close every uploaded file held in self._files, if any."""
        if hasattr(self, "_files"):
            for f in chain.from_iterable(list_[1] for list_ in self._files.lists()):
                f.close()

    # File-like and iterator interface.
    #
    # Expects self._stream to be set to an appropriate source of bytes by
    # a corresponding request subclass (e.g. WSGIRequest).
    # Also when request data has already been read by request.POST or
    # request.body, self._stream points to a BytesIO instance
    # containing that data.

    def read(self, *args, **kwargs):
        """Read from the request stream; OSError becomes UnreadablePostError."""
        self._read_started = True
        try:
            return self._stream.read(*args, **kwargs)
        except OSError as e:
            raise UnreadablePostError(*e.args) from e

    def readline(self, *args, **kwargs):
        """Read one line from the stream; OSError becomes UnreadablePostError."""
        self._read_started = True
        try:
            return self._stream.readline(*args, **kwargs)
        except OSError as e:
            raise UnreadablePostError(*e.args) from e

    def __iter__(self):
        """Iterate over lines from readline() until it returns b""."""
        return iter(self.readline, b"")

    def readlines(self):
        """Return all remaining lines as a list."""
        return list(self)

420 

421 

class HttpHeaders(CaseInsensitiveMapping):
    """
    Case-insensitive mapping of HTTP headers extracted from an environ dict.

    Keys are header names such as "Content-Type"; environ entries that are
    neither HTTP_-prefixed nor listed in UNPREFIXED_HEADERS are dropped.
    """

    HTTP_PREFIX = "HTTP_"
    # PEP 333 gives two headers which aren't prepended with HTTP_.
    UNPREFIXED_HEADERS = {"CONTENT_TYPE", "CONTENT_LENGTH"}

    def __init__(self, environ):
        """Collect the header entries of `environ` under their header names."""
        headers = {}
        for header, value in environ.items():
            name = self.parse_header_name(header)
            if name:
                headers[name] = value
        super().__init__(headers)

    def __getitem__(self, key):
        """Allow header lookup using underscores in place of hyphens."""
        return super().__getitem__(key.replace("_", "-"))

    @classmethod
    def parse_header_name(cls, header):
        """
        Convert an environ key to a header name, e.g. "HTTP_USER_AGENT" to
        "User-Agent". Return None if the key is not a header.
        """
        if header.startswith(cls.HTTP_PREFIX):
            header = header[len(cls.HTTP_PREFIX) :]
        elif header not in cls.UNPREFIXED_HEADERS:
            return None
        return header.replace("_", "-").title()

446 

447 

class QueryDict(MultiValueDict):
    """
    A specialized MultiValueDict which represents a query string.

    A QueryDict can be used to represent GET or POST data. It subclasses
    MultiValueDict since keys in such data can be repeated, for instance
    in the data from a form with a <select multiple> field.

    By default QueryDicts are immutable, though the copy() method
    will always return a mutable copy.

    Both keys and values set on this class are converted from the given encoding
    (DEFAULT_CHARSET by default) to str.
    """

    # These are both reset in __init__, but are specified here at the class
    # level so that unpickling will have valid values
    _mutable = True
    _encoding = None

    def __init__(self, query_string=None, mutable=False, encoding=None):
        """
        Parse `query_string` (str or bytes) into this dict.

        `encoding` defaults to settings.DEFAULT_CHARSET. Raise
        TooManyFieldsSent when parse_qsl() rejects the input.
        """
        super().__init__()
        self.encoding = encoding or settings.DEFAULT_CHARSET
        query_string = query_string or ""
        parse_qsl_kwargs = {
            "keep_blank_values": True,
            "encoding": self.encoding,
            "max_num_fields": settings.DATA_UPLOAD_MAX_NUMBER_FIELDS,
        }
        if isinstance(query_string, bytes):
            # query_string normally contains URL-encoded data, a subset of ASCII.
            try:
                query_string = query_string.decode(self.encoding)
            except UnicodeDecodeError:
                # ... but some user agents are misbehaving :-(
                query_string = query_string.decode("iso-8859-1")
        try:
            # _mutable is still the class-level True here, so appendlist()
            # is allowed while the instance is being populated.
            for key, value in parse_qsl(query_string, **parse_qsl_kwargs):
                self.appendlist(key, value)
        except ValueError as e:
            # ValueError can also be raised if the strict_parsing argument to
            # parse_qsl() is True. As that is not used by Django, assume that
            # the exception was raised by exceeding the value of max_num_fields
            # instead of fragile checks of exception message strings.
            raise TooManyFieldsSent(
                "The number of GET/POST parameters exceeded "
                "settings.DATA_UPLOAD_MAX_NUMBER_FIELDS."
            ) from e
        self._mutable = mutable

    @classmethod
    def fromkeys(cls, iterable, value="", mutable=False, encoding=None):
        """
        Return a new QueryDict with keys (may be repeated) from an iterable and
        values from value.
        """
        q = cls("", mutable=True, encoding=encoding)
        for key in iterable:
            q.appendlist(key, value)
        if not mutable:
            q._mutable = False
        return q

    @property
    def encoding(self):
        """Return the encoding, falling back to settings.DEFAULT_CHARSET."""
        if self._encoding is None:
            self._encoding = settings.DEFAULT_CHARSET
        return self._encoding

    @encoding.setter
    def encoding(self, value):
        self._encoding = value

    def _assert_mutable(self):
        """Raise AttributeError if this instance is immutable."""
        if not self._mutable:
            raise AttributeError("This QueryDict instance is immutable")

    def __setitem__(self, key, value):
        self._assert_mutable()
        key = bytes_to_text(key, self.encoding)
        value = bytes_to_text(value, self.encoding)
        super().__setitem__(key, value)

    def __delitem__(self, key):
        self._assert_mutable()
        super().__delitem__(key)

    def __copy__(self):
        """Return a mutable shallow copy with the same encoding."""
        result = self.__class__("", mutable=True, encoding=self.encoding)
        for key, value in self.lists():
            result.setlist(key, value)
        return result

    def __deepcopy__(self, memo):
        """Return a mutable deep copy with the same encoding."""
        result = self.__class__("", mutable=True, encoding=self.encoding)
        memo[id(self)] = result
        for key, value in self.lists():
            result.setlist(copy.deepcopy(key, memo), copy.deepcopy(value, memo))
        return result

    def setlist(self, key, list_):
        self._assert_mutable()
        key = bytes_to_text(key, self.encoding)
        list_ = [bytes_to_text(elt, self.encoding) for elt in list_]
        super().setlist(key, list_)

    def setlistdefault(self, key, default_list=None):
        self._assert_mutable()
        return super().setlistdefault(key, default_list)

    def appendlist(self, key, value):
        self._assert_mutable()
        key = bytes_to_text(key, self.encoding)
        value = bytes_to_text(value, self.encoding)
        super().appendlist(key, value)

    def pop(self, key, *args):
        self._assert_mutable()
        return super().pop(key, *args)

    def popitem(self):
        self._assert_mutable()
        return super().popitem()

    def clear(self):
        self._assert_mutable()
        super().clear()

    def setdefault(self, key, default=None):
        self._assert_mutable()
        key = bytes_to_text(key, self.encoding)
        default = bytes_to_text(default, self.encoding)
        return super().setdefault(key, default)

    def copy(self):
        """Return a mutable copy of this object."""
        return self.__deepcopy__({})

    def urlencode(self, safe=None):
        """
        Return an encoded string of all query string arguments.

        `safe` specifies characters which don't require quoting, for example::

            >>> q = QueryDict(mutable=True)
            >>> q['next'] = '/a&b/'
            >>> q.urlencode()
            'next=%2Fa%26b%2F'
            >>> q.urlencode(safe='/')
            'next=/a%26b/'
        """
        output = []
        if safe:
            safe = safe.encode(self.encoding)

            def encode(k, v):
                return "%s=%s" % (quote(k, safe), quote(v, safe))

        else:

            def encode(k, v):
                return urlencode({k: v})

        # Keys and values are passed to the encoder as bytes in this
        # instance's encoding; one "key=value" pair is emitted per value.
        for k, list_ in self.lists():
            output.extend(
                encode(k.encode(self.encoding), str(v).encode(self.encoding))
                for v in list_
            )
        return "&".join(output)

617 

618 

class MediaType:
    """
    A single media type, e.g. one entry of an Accept header.

    Attributes set by __init__: `main_type` and `sub_type` (the text before
    and after the first "/") and `params` (the parameters mapping returned
    by parse_header()).
    """

    def __init__(self, media_type_raw_line):
        """Parse `media_type_raw_line`; a falsy value is parsed as b""."""
        full_type, self.params = parse_header(
            media_type_raw_line.encode("ascii") if media_type_raw_line else b""
        )
        self.main_type, _, self.sub_type = full_type.partition("/")

    def __str__(self):
        """Return "main/sub" followed by "; key=value" for each parameter."""
        # Parameter values are decoded from ASCII bytes for display.
        params_str = "".join(
            "; %s=%s" % (k, v.decode("ascii")) for k, v in self.params.items()
        )
        return "%s%s%s" % (
            self.main_type,
            ("/%s" % self.sub_type) if self.sub_type else "",
            params_str,
        )

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__qualname__, self)

    @property
    def is_all_types(self):
        """Return True for the "*/*" wildcard."""
        return self.main_type == "*" and self.sub_type == "*"

    def match(self, other):
        """
        Return True if the media type string `other` is covered by this one.

        "*/*" matches everything; otherwise the main types must be equal and
        this sub type must be "*" or equal to the other's sub type.
        """
        if self.is_all_types:
            return True
        other = MediaType(other)
        return self.main_type == other.main_type and self.sub_type in {
            "*",
            other.sub_type,
        }

650 

651 

# It's neither necessary nor appropriate to use
# django.utils.encoding.force_str() for parsing URLs and form inputs. Thus,
# this slightly more restricted function, used by QueryDict.
def bytes_to_text(s, encoding):
    """
    Convert bytes objects to strings, using the given encoding. Illegally
    encoded input characters are replaced with Unicode "unknown" codepoint
    (\ufffd).

    Return any non-bytes objects without change.
    """
    if isinstance(s, bytes):
        return str(s, encoding, "replace")
    else:
        return s

667 

668 

def split_domain_port(host):
    """
    Return a (domain, port) tuple from a given host.

    Returned domain is lowercased. If the host is invalid, the domain will be
    empty.
    """
    host = host.lower()

    # Anything host_validation_re rejects yields an empty domain and port.
    if not host_validation_re.match(host):
        return "", ""

    if host[-1] == "]":
        # It's an IPv6 address without a port.
        return host, ""
    # Split on the last colon only, so a bracketed address keeps its colons.
    bits = host.rsplit(":", 1)
    domain, port = bits if len(bits) == 2 else (bits[0], "")
    # Remove a trailing dot (if present) from the domain.
    domain = domain[:-1] if domain.endswith(".") else domain
    return domain, port

689 

690 

def validate_host(host, allowed_hosts):
    """
    Validate the given host for this site.

    Check that the host looks valid and matches a host or host pattern in the
    given list of ``allowed_hosts``. Any pattern beginning with a period
    matches a domain and all its subdomains (e.g. ``.example.com`` matches
    ``example.com`` and any subdomain), ``*`` matches anything, and anything
    else must match exactly.

    Note: This function assumes that the given host is lowercased and has
    already had the port, if any, stripped off.

    Return ``True`` for a valid host, ``False`` otherwise.
    """
    # An empty allowed_hosts therefore rejects every host.
    return any(
        pattern == "*" or is_same_domain(host, pattern) for pattern in allowed_hosts
    )

709 

710 

def parse_accept_header(header):
    """
    Return a list of MediaType instances, one per non-blank comma-separated
    entry of ``header``.
    """
    return [MediaType(token) for token in header.split(",") if token.strip()]