Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/http/request.py: 51%
367 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import cgi
2import codecs
3import copy
4from io import BytesIO
5from itertools import chain
6from urllib.parse import parse_qsl, quote, urlencode, urljoin, urlsplit
8from django.conf import settings
9from django.core import signing
10from django.core.exceptions import (
11 DisallowedHost,
12 ImproperlyConfigured,
13 RequestDataTooBig,
14 TooManyFieldsSent,
15)
16from django.core.files import uploadhandler
17from django.http.multipartparser import MultiPartParser, MultiPartParserError
18from django.utils.datastructures import (
19 CaseInsensitiveMapping,
20 ImmutableList,
21 MultiValueDict,
22)
23from django.utils.encoding import escape_uri_path, iri_to_uri
24from django.utils.functional import cached_property
25from django.utils.http import is_same_domain
26from django.utils.regex_helper import _lazy_re_compile
28from .multipartparser import parse_header
# Sentinel used as the default for HttpRequest.get_signed_cookie(): when the
# caller supplies no explicit default, lookup/signature errors are re-raised.
RAISE_ERROR = object()

# Matches a lowercase host made of letters, digits, dots and hyphens, or a
# bracketed IPv6-style literal, optionally followed by ":<digits>".
host_validation_re = _lazy_re_compile(
    r"^([a-z0-9.-]+|\[[a-f0-9]*:[a-f0-9\.:]+\])(:\d+)?$"
)
class UnreadablePostError(OSError):
    """An OSError raised when reading the request's data stream fails."""
class RawPostDataException(Exception):
    """
    Raised when the raw request body is requested after the request's data
    stream has already been read (e.g. multipart/* POST data consumed through
    POST or FILES).
    """
class HttpRequest:
    """A basic HTTP request."""

    # Encoding applied to the GET/POST dicts; None selects the default setting.
    _encoding = None
    _upload_handlers = []

    def __init__(self):
        # WARNING: the `WSGIRequest` subclass doesn't call `super`, so every
        # attribute assigned here must also be assigned in
        # `WSGIRequest.__init__()`.

        self.GET = QueryDict(mutable=True)
        self.POST = QueryDict(mutable=True)
        self.COOKIES = {}
        self.META = {}
        self.FILES = MultiValueDict()

        self.path = ""
        self.path_info = ""
        self.method = None
        self.resolver_match = None
        self.content_type = None
        self.content_params = None

    def __repr__(self):
        class_name = self.__class__.__name__
        if self.method is None:
            return "<%s>" % class_name
        full_path = self.get_full_path()
        if not full_path:
            return "<%s>" % class_name
        return "<%s: %s %r>" % (class_name, self.method, full_path)

    @cached_property
    def headers(self):
        """Return an HttpHeaders mapping built from ``self.META``."""
        return HttpHeaders(self.META)

    @cached_property
    def accepted_types(self):
        """Return a list of MediaType instances."""
        accept_header = self.headers.get("Accept", "*/*")
        return parse_accept_header(accept_header)

    def accepts(self, media_type):
        """Return True if any accepted type matches ``media_type``."""
        for accepted in self.accepted_types:
            if accepted.match(media_type):
                return True
        return False

    def _set_content_type_params(self, meta):
        """Set content_type, content_params, and encoding."""
        self.content_type, self.content_params = cgi.parse_header(
            meta.get("CONTENT_TYPE", "")
        )
        if "charset" not in self.content_params:
            return
        charset = self.content_params["charset"]
        try:
            codecs.lookup(charset)
        except LookupError:
            # Unknown codec name: leave the current encoding untouched.
            return
        self.encoding = charset

    def _get_raw_host(self):
        """
        Return the HTTP host using the environment or request headers. Skip
        allowed hosts protection, so may return an insecure host.
        """
        meta = self.META
        # Three sources are tried, in order of decreasing preference.
        if settings.USE_X_FORWARDED_HOST and "HTTP_X_FORWARDED_HOST" in meta:
            return meta["HTTP_X_FORWARDED_HOST"]
        if "HTTP_HOST" in meta:
            return meta["HTTP_HOST"]

        # Reconstruct the host using the algorithm from PEP 333.
        host = meta["SERVER_NAME"]
        server_port = self.get_port()
        default_port = "443" if self.is_secure() else "80"
        if server_port != default_port:
            host = "%s:%s" % (host, server_port)
        return host

    def get_host(self):
        """Return the HTTP host using the environment or request headers."""
        host = self._get_raw_host()

        # Allow variants of localhost if ALLOWED_HOSTS is empty and DEBUG=True.
        allowed_hosts = settings.ALLOWED_HOSTS
        if settings.DEBUG and not allowed_hosts:
            allowed_hosts = [".localhost", "127.0.0.1", "[::1]"]

        domain, _port = split_domain_port(host)
        if domain and validate_host(domain, allowed_hosts):
            return host

        msg = "Invalid HTTP_HOST header: %r." % host
        if domain:
            msg += " You may need to add %r to ALLOWED_HOSTS." % domain
        else:
            msg += (
                " The domain name provided is not valid according to RFC 1034/1035."
            )
        raise DisallowedHost(msg)

    def get_port(self):
        """Return the port number for the request as a string."""
        use_forwarded = (
            settings.USE_X_FORWARDED_PORT and "HTTP_X_FORWARDED_PORT" in self.META
        )
        meta_key = "HTTP_X_FORWARDED_PORT" if use_forwarded else "SERVER_PORT"
        return str(self.META[meta_key])

    def get_full_path(self, force_append_slash=False):
        """Return ``self.path`` plus the query string, if any."""
        return self._get_full_path(self.path, force_append_slash)

    def get_full_path_info(self, force_append_slash=False):
        """Return ``self.path_info`` plus the query string, if any."""
        return self._get_full_path(self.path_info, force_append_slash)

    def _get_full_path(self, path, force_append_slash):
        # RFC 3986 requires query string arguments to be in the ASCII range.
        # Rather than crash if this doesn't happen, we encode defensively.
        escaped_path = escape_uri_path(path)
        slash = "/" if force_append_slash and not path.endswith("/") else ""
        query_string = self.META.get("QUERY_STRING", "")
        query = ("?" + iri_to_uri(query_string)) if query_string else ""
        return "%s%s%s" % (escaped_path, slash, query)

    def get_signed_cookie(self, key, default=RAISE_ERROR, salt="", max_age=None):
        """
        Attempt to return a signed cookie. If the signature fails or the
        cookie has expired, raise an exception, unless the `default` argument
        is provided, in which case return that value.
        """
        try:
            signed_value = self.COOKIES[key]
        except KeyError:
            if default is RAISE_ERROR:
                raise
            return default
        try:
            signer = signing.get_cookie_signer(salt=key + salt)
            return signer.unsign(signed_value, max_age=max_age)
        except signing.BadSignature:
            if default is RAISE_ERROR:
                raise
            return default

    def build_absolute_uri(self, location=None):
        """
        Build an absolute URI from the location and the variables available in
        this request. If no ``location`` is specified, build the absolute URI
        using request.get_full_path(). If the location is absolute, convert it
        to an RFC 3987 compliant URI and return it. If location is relative or
        is scheme-relative (i.e., ``//example.com/``), urljoin() it to a base
        URL constructed from the request variables.
        """
        if location is None:
            # Make it an absolute url (but schemeless and domainless) for the
            # edge case that the path starts with '//'.
            location = "//%s" % self.get_full_path()
        else:
            # Coerce lazy locations.
            location = str(location)

        parts = urlsplit(location)
        if parts.scheme and parts.netloc:
            # Already absolute: nothing to join.
            return iri_to_uri(location)

        # Handle the simple, most common case. If the location is absolute
        # and a scheme or host (netloc) isn't provided, skip an expensive
        # urljoin() as long as no path segments are '.' or '..'.
        path = parts.path
        is_plain_absolute_path = (
            path.startswith("/")
            and not parts.scheme
            and not parts.netloc
            and "/./" not in path
            and "/../" not in path
        )
        if is_plain_absolute_path:
            # If location starts with '//' but has no netloc, reuse the
            # schema and netloc from the current request. Strip the double
            # slashes and continue as if it wasn't specified.
            if location.startswith("//"):
                location = location[2:]
            location = self._current_scheme_host + location
        else:
            # Join the constructed URL with the provided location, which
            # allows the provided location to apply query strings to the
            # base path.
            location = urljoin(self._current_scheme_host + self.path, location)
        return iri_to_uri(location)

    @cached_property
    def _current_scheme_host(self):
        return f"{self.scheme}://{self.get_host()}"

    def _get_scheme(self):
        """
        Hook for subclasses like WSGIRequest to implement. Return 'http' by
        default.
        """
        return "http"

    @property
    def scheme(self):
        """
        Return "https" or "http" from the SECURE_PROXY_SSL_HEADER setting when
        it is configured and its header is present in META; otherwise defer to
        ``_get_scheme()``.
        """
        proxy_ssl_header = settings.SECURE_PROXY_SSL_HEADER
        if not proxy_ssl_header:
            return self._get_scheme()
        try:
            header, secure_value = proxy_ssl_header
        except ValueError:
            raise ImproperlyConfigured(
                "The SECURE_PROXY_SSL_HEADER setting must be a tuple containing "
                "two values."
            )
        header_value = self.META.get(header)
        if header_value is None:
            return self._get_scheme()
        return "https" if header_value == secure_value else "http"

    def is_secure(self):
        """Return True when the request scheme is "https"."""
        return self.scheme == "https"

    @property
    def encoding(self):
        return self._encoding

    @encoding.setter
    def encoding(self, val):
        """
        Set the encoding used for GET/POST accesses. If the GET or POST
        dictionary has already been created, remove and recreate it on the
        next access (so that it is decoded correctly).
        """
        self._encoding = val
        for stale_attr in ("GET", "_post"):
            if hasattr(self, stale_attr):
                delattr(self, stale_attr)

    def _initialize_handlers(self):
        """Load one upload handler per entry in settings.FILE_UPLOAD_HANDLERS."""
        handlers = []
        for handler_path in settings.FILE_UPLOAD_HANDLERS:
            handlers.append(uploadhandler.load_handler(handler_path, self))
        self._upload_handlers = handlers

    @property
    def upload_handlers(self):
        if not self._upload_handlers:
            # If there are no upload handlers defined, initialize them from settings.
            self._initialize_handlers()
        return self._upload_handlers

    @upload_handlers.setter
    def upload_handlers(self, upload_handlers):
        if hasattr(self, "_files"):
            raise AttributeError(
                "You cannot set the upload handlers after the upload has been "
                "processed."
            )
        self._upload_handlers = upload_handlers

    def parse_file_upload(self, META, post_data):
        """Return a tuple of (POST QueryDict, FILES MultiValueDict)."""
        # Freeze the handler list so it cannot be altered once parsing starts.
        frozen_handlers = ImmutableList(
            self.upload_handlers,
            warning=(
                "You cannot alter upload handlers after the upload has been "
                "processed."
            ),
        )
        self.upload_handlers = frozen_handlers
        parser = MultiPartParser(META, post_data, self.upload_handlers, self.encoding)
        return parser.parse()

    @property
    def body(self):
        """
        Return the raw request body, reading and caching it on first access.

        Raise RawPostDataException if the data stream was already read,
        RequestDataTooBig if CONTENT_LENGTH exceeds
        settings.DATA_UPLOAD_MAX_MEMORY_SIZE, and UnreadablePostError if
        reading fails with an OSError.
        """
        if hasattr(self, "_body"):
            return self._body

        if self._read_started:
            raise RawPostDataException(
                "You cannot access body after reading from request's data stream"
            )

        # Limit the maximum request data size that will be handled in-memory.
        max_size = settings.DATA_UPLOAD_MAX_MEMORY_SIZE
        if max_size is not None:
            content_length = int(self.META.get("CONTENT_LENGTH") or 0)
            if content_length > max_size:
                raise RequestDataTooBig(
                    "Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE."
                )

        try:
            self._body = self.read()
        except OSError as exc:
            raise UnreadablePostError(*exc.args) from exc
        # Later reads are served from the cached bytes.
        self._stream = BytesIO(self._body)
        return self._body

    def _mark_post_parse_error(self):
        """Install empty POST/FILES containers after a failed parse."""
        self._post = QueryDict()
        self._files = MultiValueDict()

    def _load_post_and_files(self):
        """Populate self._post and self._files if the content-type is a form type"""
        if self.method != "POST":
            self._post, self._files = (
                QueryDict(encoding=self._encoding),
                MultiValueDict(),
            )
            return
        if self._read_started and not hasattr(self, "_body"):
            self._mark_post_parse_error()
            return

        content_type = self.content_type
        if content_type == "multipart/form-data":
            # Use already read data when available; otherwise read from self.
            data = BytesIO(self._body) if hasattr(self, "_body") else self
            try:
                self._post, self._files = self.parse_file_upload(self.META, data)
            except MultiPartParserError:
                # An error occurred while parsing POST data. Since when
                # formatting the error the request handler might access
                # self.POST, set self._post and self._file to prevent
                # attempts to parse POST data again.
                self._mark_post_parse_error()
                raise
            return

        if content_type == "application/x-www-form-urlencoded":
            post = QueryDict(self.body, encoding=self._encoding)
        else:
            post = QueryDict(encoding=self._encoding)
        self._post, self._files = post, MultiValueDict()

    def close(self):
        """Close every file held in ``self._files``, if it has been populated."""
        if not hasattr(self, "_files"):
            return
        for _name, file_list in self._files.lists():
            for uploaded in file_list:
                uploaded.close()

    # File-like and iterator interface.
    #
    # Expects self._stream to be set to an appropriate source of bytes by
    # a corresponding request subclass (e.g. WSGIRequest).
    # Also when request data has already been read by request.POST or
    # request.body, self._stream points to a BytesIO instance
    # containing that data.

    def read(self, *args, **kwargs):
        self._read_started = True
        try:
            return self._stream.read(*args, **kwargs)
        except OSError as exc:
            raise UnreadablePostError(*exc.args) from exc

    def readline(self, *args, **kwargs):
        self._read_started = True
        try:
            return self._stream.readline(*args, **kwargs)
        except OSError as exc:
            raise UnreadablePostError(*exc.args) from exc

    def __iter__(self):
        # Yield lines until readline() returns the empty-bytes sentinel.
        return iter(self.readline, b"")

    def readlines(self):
        return list(self)
class HttpHeaders(CaseInsensitiveMapping):
    """Case-insensitive mapping of HTTP headers extracted from an environ dict."""

    HTTP_PREFIX = "HTTP_"
    # PEP 333 gives two headers which aren't prepended with HTTP_.
    UNPREFIXED_HEADERS = {"CONTENT_TYPE", "CONTENT_LENGTH"}

    def __init__(self, environ):
        # Keep only the entries whose key parses to a header name.
        parsed = (
            (self.parse_header_name(key), value) for key, value in environ.items()
        )
        super().__init__({name: value for name, value in parsed if name})

    def __getitem__(self, key):
        """Allow header lookup using underscores in place of hyphens."""
        hyphenated = key.replace("_", "-")
        return super().__getitem__(hyphenated)

    @classmethod
    def parse_header_name(cls, header):
        """
        Convert an environ key to a header name (e.g. ``HTTP_X_FOO`` becomes
        ``X-Foo``), or return None if the key is not an HTTP header.
        """
        prefix = cls.HTTP_PREFIX
        if header.startswith(prefix):
            name = header[len(prefix) :]
        elif header in cls.UNPREFIXED_HEADERS:
            name = header
        else:
            return None
        return name.replace("_", "-").title()
class QueryDict(MultiValueDict):
    """
    A specialized MultiValueDict which represents a query string.

    A QueryDict can be used to represent GET or POST data. It subclasses
    MultiValueDict since keys in such data can be repeated, for instance
    in the data from a form with a <select multiple> field.

    By default QueryDicts are immutable, though the copy() method
    will always return a mutable copy.

    Both keys and values set on this class are converted from the given encoding
    (DEFAULT_CHARSET by default) to str.
    """

    # Both are reset in __init__; they are declared at class level so that
    # unpickling will have valid values.
    _mutable = True
    _encoding = None

    def __init__(self, query_string=None, mutable=False, encoding=None):
        super().__init__()
        self.encoding = encoding or settings.DEFAULT_CHARSET
        query_string = query_string or ""
        max_fields = settings.DATA_UPLOAD_MAX_NUMBER_FIELDS
        if isinstance(query_string, bytes):
            # query_string normally contains URL-encoded data, a subset of ASCII.
            try:
                query_string = query_string.decode(self.encoding)
            except UnicodeDecodeError:
                # ... but some user agents are misbehaving :-(
                query_string = query_string.decode("iso-8859-1")
        try:
            pairs = parse_qsl(
                query_string,
                keep_blank_values=True,
                encoding=self.encoding,
                max_num_fields=max_fields,
            )
            for name, item in pairs:
                self.appendlist(name, item)
        except ValueError as exc:
            # ValueError can also be raised if the strict_parsing argument to
            # parse_qsl() is True. As that is not used by Django, assume that
            # the exception was raised by exceeding the value of max_num_fields
            # instead of fragile checks of exception message strings.
            raise TooManyFieldsSent(
                "The number of GET/POST parameters exceeded "
                "settings.DATA_UPLOAD_MAX_NUMBER_FIELDS."
            ) from exc
        # Applied last so the appendlist() calls above are allowed.
        self._mutable = mutable

    @classmethod
    def fromkeys(cls, iterable, value="", mutable=False, encoding=None):
        """
        Return a new QueryDict with keys (may be repeated) from an iterable and
        values from value.
        """
        query_dict = cls("", mutable=True, encoding=encoding)
        for name in iterable:
            query_dict.appendlist(name, value)
        if not mutable:
            query_dict._mutable = False
        return query_dict

    @property
    def encoding(self):
        if self._encoding is None:
            self._encoding = settings.DEFAULT_CHARSET
        return self._encoding

    @encoding.setter
    def encoding(self, value):
        self._encoding = value

    def _assert_mutable(self):
        """Raise AttributeError unless this instance is mutable."""
        if self._mutable:
            return
        raise AttributeError("This QueryDict instance is immutable")

    def __setitem__(self, key, value):
        self._assert_mutable()
        text_key = bytes_to_text(key, self.encoding)
        text_value = bytes_to_text(value, self.encoding)
        super().__setitem__(text_key, text_value)

    def __delitem__(self, key):
        self._assert_mutable()
        super().__delitem__(key)

    def __copy__(self):
        duplicate = self.__class__("", mutable=True, encoding=self.encoding)
        for name, values in self.lists():
            duplicate.setlist(name, values)
        return duplicate

    def __deepcopy__(self, memo):
        duplicate = self.__class__("", mutable=True, encoding=self.encoding)
        # Register the copy before recursing into the stored keys and values.
        memo[id(self)] = duplicate
        for name, values in self.lists():
            duplicate.setlist(copy.deepcopy(name, memo), copy.deepcopy(values, memo))
        return duplicate

    def setlist(self, key, list_):
        self._assert_mutable()
        charset = self.encoding
        text_key = bytes_to_text(key, charset)
        text_values = [bytes_to_text(item, charset) for item in list_]
        super().setlist(text_key, text_values)

    def setlistdefault(self, key, default_list=None):
        self._assert_mutable()
        return super().setlistdefault(key, default_list)

    def appendlist(self, key, value):
        self._assert_mutable()
        text_key = bytes_to_text(key, self.encoding)
        text_value = bytes_to_text(value, self.encoding)
        super().appendlist(text_key, text_value)

    def pop(self, key, *args):
        self._assert_mutable()
        return super().pop(key, *args)

    def popitem(self):
        self._assert_mutable()
        return super().popitem()

    def clear(self):
        self._assert_mutable()
        super().clear()

    def setdefault(self, key, default=None):
        self._assert_mutable()
        text_key = bytes_to_text(key, self.encoding)
        text_default = bytes_to_text(default, self.encoding)
        return super().setdefault(text_key, text_default)

    def copy(self):
        """Return a mutable copy of this object."""
        return self.__deepcopy__({})

    def urlencode(self, safe=None):
        """
        Return an encoded string of all query string arguments.

        `safe` specifies characters which don't require quoting, for example::

            >>> q = QueryDict(mutable=True)
            >>> q['next'] = '/a&b/'
            >>> q.urlencode()
            'next=%2Fa%26b%2F'
            >>> q.urlencode(safe='/')
            'next=/a%26b/'
        """
        charset = self.encoding
        if safe:
            safe_bytes = safe.encode(charset)

            def encode_pair(name, item):
                return "%s=%s" % (quote(name, safe_bytes), quote(item, safe_bytes))

        else:

            def encode_pair(name, item):
                return urlencode({name: item})

        return "&".join(
            encode_pair(name.encode(charset), str(item).encode(charset))
            for name, items in self.lists()
            for item in items
        )
class MediaType:
    """A media type string split into main type, sub type and parameters."""

    def __init__(self, media_type_raw_line):
        raw_line = media_type_raw_line.encode("ascii") if media_type_raw_line else b""
        full_type, self.params = parse_header(raw_line)
        main_type, _sep, sub_type = full_type.partition("/")
        self.main_type = main_type
        self.sub_type = sub_type

    def __str__(self):
        params_part = "".join(
            "; %s=%s" % (name, value.decode("ascii"))
            for name, value in self.params.items()
        )
        sub_type_part = ("/%s" % self.sub_type) if self.sub_type else ""
        return "%s%s%s" % (self.main_type, sub_type_part, params_part)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__qualname__, self)

    @property
    def is_all_types(self):
        """True when both the main type and the sub type are ``*``."""
        return self.main_type == "*" and self.sub_type == "*"

    def match(self, other):
        """
        Return True if this media type covers the media type string ``other``:
        either this is ``*/*``, or the main types are equal and this sub type
        is ``*`` or equal to the other sub type.
        """
        if self.is_all_types:
            return True
        candidate = MediaType(other)
        return self.main_type == candidate.main_type and self.sub_type in {
            "*",
            candidate.sub_type,
        }
# django.utils.encoding.force_str() is neither necessary nor appropriate for
# parsing URLs and form inputs, hence this slightly more restricted function,
# used by QueryDict.
def bytes_to_text(s, encoding):
    """
    Decode a bytes object to str with the given encoding; input that is not
    validly encoded is replaced by the Unicode "unknown" codepoint (\ufffd).

    Anything that is not a bytes object is returned unchanged.
    """
    if not isinstance(s, bytes):
        return s
    return str(s, encoding, "replace")
def split_domain_port(host):
    """
    Split a host string into a (domain, port) tuple.

    The domain is returned lowercased. An invalid host yields an empty
    domain (and an empty port).
    """
    host = host.lower()

    if not host_validation_re.match(host):
        return "", ""

    if host.endswith("]"):
        # It's an IPv6 address without a port.
        return host, ""

    domain, separator, port = host.rpartition(":")
    if not separator:
        # No colon at all: the whole string is the domain.
        domain, port = host, ""
    # Remove a trailing dot (if present) from the domain.
    if domain.endswith("."):
        domain = domain[:-1]
    return domain, port
def validate_host(host, allowed_hosts):
    """
    Validate the given host for this site.

    Check the host against each host or host pattern in ``allowed_hosts``.
    A pattern beginning with a period matches a domain and all its
    subdomains (e.g. ``.example.com`` matches ``example.com`` and any
    subdomain), ``*`` matches anything, and anything else must match exactly.

    Note: This function assumes that the given host is lowercased and has
    already had the port, if any, stripped off.

    Return ``True`` for a valid host, ``False`` otherwise.
    """
    for pattern in allowed_hosts:
        if pattern == "*" or is_same_domain(host, pattern):
            return True
    return False
def parse_accept_header(header):
    """Return a MediaType for each non-blank comma-separated token in ``header``."""
    media_types = []
    for token in header.split(","):
        if token.strip():
            media_types.append(MediaType(token))
    return media_types