# Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/http/response.py: 53%
# 339 statements
# coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
import datetime
import json
import mimetypes
import os
import re
import sys
import time
from collections.abc import Mapping
from email.header import Header
from http.client import responses
from urllib.parse import quote, urlparse

from django.conf import settings
from django.core import signals, signing
from django.core.exceptions import DisallowedRedirect
from django.core.serializers.json import DjangoJSONEncoder
from django.http.cookie import SimpleCookie
from django.utils import timezone
from django.utils.datastructures import (
    CaseInsensitiveMapping,
    _destruct_iterable_mapping_values,
)
from django.utils.encoding import iri_to_uri
from django.utils.http import http_date
from django.utils.regex_helper import _lazy_re_compile
# Case-insensitive pattern capturing the ``charset`` parameter of a
# Content-Type value (e.g. "text/html; charset=utf-8"). It is searched by
# HttpResponseBase.charset when no explicit charset was given.
_charset_from_content_type_re = _lazy_re_compile(
    r";\s*charset=(?P<charset>[^\s;]+)", re.I
)
class ResponseHeaders(CaseInsensitiveMapping):
    """
    Case-insensitive mapping of HTTP response headers.

    Every name/value pair goes through ``__setitem__``, which validates it
    and normalizes it to a native string before storing it in ``_store``
    under the lower-cased header name.
    """

    def __init__(self, data):
        """
        Populate the initial data using __setitem__ to ensure values are
        correctly encoded.
        """
        if not isinstance(data, Mapping):
            data = {k: v for k, v in _destruct_iterable_mapping_values(data)}
        self._store = {}
        for header, value in data.items():
            self[header] = value

    def _convert_to_charset(self, value, charset, mime_encode=False):
        """
        Convert headers key/value to ascii/latin-1 native strings.
        `charset` must be 'ascii' or 'latin-1'. If `mime_encode` is True and
        `value` can't be represented in the given charset, apply MIME-encoding.

        Raise BadHeaderError if the value contains a CR or LF character.
        Re-raise the UnicodeError (with an extended reason) if the value
        can't be represented in `charset` and `mime_encode` is False.
        """
        if not isinstance(value, (bytes, str)):
            value = str(value)
        if (isinstance(value, bytes) and (b"\n" in value or b"\r" in value)) or (
            isinstance(value, str) and ("\n" in value or "\r" in value)
        ):
            raise BadHeaderError(
                "Header values can't contain newlines (got %r)" % value
            )
        try:
            if isinstance(value, str):
                # Ensure string is valid in given charset
                value.encode(charset)
            else:
                # Convert bytestring using given charset
                value = value.decode(charset)
        except UnicodeError as e:
            if mime_encode:
                value = Header(value, "utf-8", maxlinelen=sys.maxsize).encode()
            else:
                e.reason += ", HTTP response headers must be in %s format" % charset
                raise
        return value

    def __delitem__(self, key):
        """Remove ``key``; pop() supplies a default, so a missing key is ignored."""
        self.pop(key)

    def __setitem__(self, key, value):
        """Validate and store a header; the name is ascii, the value latin-1."""
        key = self._convert_to_charset(key, "ascii")
        value = self._convert_to_charset(value, "latin-1", mime_encode=True)
        # Keyed by the lower-cased name; the original spelling is kept
        # alongside the value.
        self._store[key.lower()] = (key, value)

    def pop(self, key, default=None):
        """
        Remove the entry for ``key`` (matched case-insensitively) and return
        the stored ``(name, value)`` pair, or ``default`` if it is absent.
        """
        return self._store.pop(key.lower(), default)

    def setdefault(self, key, value):
        """Set ``key`` to ``value`` only if ``key`` is not already present."""
        if key not in self:
            self[key] = value
class BadHeaderError(ValueError):
    """Raised when a header name or value contains a newline character."""

    pass
class HttpResponseBase:
    """
    An HTTP response base class with dictionary-accessed headers.

    This class doesn't handle content. It should not be used directly.
    Use the HttpResponse and StreamingHttpResponse subclasses instead.
    """

    status_code = 200

    def __init__(
        self, content_type=None, status=None, reason=None, charset=None, headers=None
    ):
        """
        Initialize headers, cookies, status code, and reason phrase.

        ``content_type`` and a "Content-Type" entry in ``headers`` are
        mutually exclusive (ValueError). If neither is given, the header
        defaults to "text/html" with the response charset. ``status`` must
        be convertible to int (TypeError otherwise) and lie in 100-599
        (ValueError otherwise).
        """
        self.headers = ResponseHeaders(headers or {})
        self._charset = charset
        if content_type and "Content-Type" in self.headers:
            raise ValueError(
                "'headers' must not contain 'Content-Type' when the "
                "'content_type' parameter is provided."
            )
        if "Content-Type" not in self.headers:
            if content_type is None:
                content_type = "text/html; charset=%s" % self.charset
            self.headers["Content-Type"] = content_type
        self._resource_closers = []
        # This parameter is set by the handler. It's necessary to preserve the
        # historical behavior of request_finished.
        self._handler_class = None
        self.cookies = SimpleCookie()
        self.closed = False
        if status is not None:
            try:
                self.status_code = int(status)
            except (ValueError, TypeError):
                raise TypeError("HTTP status code must be an integer.")

            if not 100 <= self.status_code <= 599:
                raise ValueError("HTTP status code must be an integer from 100 to 599.")
        self._reason_phrase = reason

    @property
    def reason_phrase(self):
        """Return the explicit reason phrase, else the default for status_code."""
        if self._reason_phrase is not None:
            return self._reason_phrase
        # Leave self._reason_phrase unset in order to use the default
        # reason phrase for status code.
        return responses.get(self.status_code, "Unknown Status Code")

    @reason_phrase.setter
    def reason_phrase(self, value):
        self._reason_phrase = value

    @property
    def charset(self):
        """
        Return the explicit charset if one was set, else the charset named in
        the Content-Type header, else ``settings.DEFAULT_CHARSET``.
        """
        if self._charset is not None:
            return self._charset
        content_type = self.get("Content-Type", "")
        matched = _charset_from_content_type_re.search(content_type)
        if matched:
            # Extract the charset and strip its double quotes
            return matched["charset"].replace('"', "")
        return settings.DEFAULT_CHARSET

    @charset.setter
    def charset(self, value):
        self._charset = value

    def serialize_headers(self):
        """HTTP headers as a bytestring."""
        return b"\r\n".join(
            [
                key.encode("ascii") + b": " + value.encode("latin-1")
                for key, value in self.headers.items()
            ]
        )

    __bytes__ = serialize_headers

    @property
    def _content_type_for_repr(self):
        """Return ', "<content type>"' for use in __repr__, or "" if unset."""
        return (
            ', "%s"' % self.headers["Content-Type"]
            if "Content-Type" in self.headers
            else ""
        )

    def __setitem__(self, header, value):
        self.headers[header] = value

    def __delitem__(self, header):
        del self.headers[header]

    def __getitem__(self, header):
        return self.headers[header]

    def has_header(self, header):
        """Case-insensitive check for a header."""
        return header in self.headers

    __contains__ = has_header

    def items(self):
        """Return the items view of the response headers."""
        return self.headers.items()

    def get(self, header, alternate=None):
        """Return the value of ``header``, or ``alternate`` if it is not set."""
        return self.headers.get(header, alternate)

    def set_cookie(
        self,
        key,
        value="",
        max_age=None,
        expires=None,
        path="/",
        domain=None,
        secure=False,
        httponly=False,
        samesite=None,
    ):
        """
        Set a cookie.

        ``expires`` can be:
        - a string in the correct format,
        - a naive ``datetime.datetime`` object in UTC,
        - an aware ``datetime.datetime`` object in any time zone.
        If it is a ``datetime.datetime`` object then calculate ``max_age``.

        ``max_age`` can be a number of seconds or a ``datetime.timedelta``.

        Raise ValueError if ``samesite`` is given and is not "lax", "none",
        or "strict" (compared case-insensitively).
        """
        self.cookies[key] = value
        if expires is not None:
            if isinstance(expires, datetime.datetime):
                if timezone.is_naive(expires):
                    expires = timezone.make_aware(expires, timezone.utc)
                delta = expires - datetime.datetime.now(tz=timezone.utc)
                # Add one second so the date matches exactly (a fraction of
                # time gets lost between converting to a timedelta and
                # then the date string).
                delta = delta + datetime.timedelta(seconds=1)
                # Just set max_age - the max_age logic will set expires.
                expires = None
                max_age = max(0, delta.days * 86400 + delta.seconds)
            else:
                self.cookies[key]["expires"] = expires
        else:
            self.cookies[key]["expires"] = ""
        if max_age is not None:
            # Accept a timedelta as well as a plain number of seconds.
            if isinstance(max_age, datetime.timedelta):
                max_age = max_age.total_seconds()
            self.cookies[key]["max-age"] = int(max_age)
            # IE requires expires, so set it if hasn't been already.
            if not expires:
                self.cookies[key]["expires"] = http_date(time.time() + max_age)
        if path is not None:
            self.cookies[key]["path"] = path
        if domain is not None:
            self.cookies[key]["domain"] = domain
        if secure:
            self.cookies[key]["secure"] = True
        if httponly:
            self.cookies[key]["httponly"] = True
        if samesite:
            if samesite.lower() not in ("lax", "none", "strict"):
                raise ValueError('samesite must be "lax", "none", or "strict".')
            self.cookies[key]["samesite"] = samesite

    def setdefault(self, key, value):
        """Set a header unless it has already been set."""
        self.headers.setdefault(key, value)

    def set_signed_cookie(self, key, value, salt="", **kwargs):
        """Sign ``value`` (salted with ``key + salt``) and set it as a cookie."""
        value = signing.get_cookie_signer(salt=key + salt).sign(value)
        return self.set_cookie(key, value, **kwargs)

    def delete_cookie(self, key, path="/", domain=None, samesite=None):
        """Expire the cookie by setting max_age=0 and an epoch expiry date."""
        # Browsers can ignore the Set-Cookie header if the cookie doesn't use
        # the secure flag and:
        # - the cookie name starts with "__Host-" or "__Secure-", or
        # - the samesite is "none".
        secure = key.startswith(("__Secure-", "__Host-")) or (
            samesite and samesite.lower() == "none"
        )
        self.set_cookie(
            key,
            max_age=0,
            path=path,
            domain=domain,
            secure=secure,
            expires="Thu, 01 Jan 1970 00:00:00 GMT",
            samesite=samesite,
        )

    # Common methods used by subclasses

    def make_bytes(self, value):
        """Turn a value into a bytestring encoded in the output charset."""
        # Per PEP 3333, this response body must be bytes. To avoid returning
        # an instance of a subclass, this function returns `bytes(value)`.
        # This doesn't make a copy when `value` already contains bytes.

        # Handle string types -- we can't rely on force_bytes here because:
        # - Python attempts str conversion first
        # - when self._charset != 'utf-8' it re-encodes the content
        if isinstance(value, (bytes, memoryview)):
            return bytes(value)
        if isinstance(value, str):
            return bytes(value.encode(self.charset))
        # Handle non-string types.
        return str(value).encode(self.charset)

    # These methods partially implement the file-like object interface.
    # See https://docs.python.org/library/io.html#io.IOBase

    # The WSGI server must call this method upon completion of the request.
    # See http://blog.dscpl.com.au/2012/10/obligations-for-calling-close-on.html
    def close(self):
        """Run the registered closers, mark closed, and send request_finished."""
        for closer in self._resource_closers:
            try:
                closer()
            except Exception:
                # Best effort: a failing closer must not stop the remaining
                # closers or the request_finished signal below.
                pass
        # Free resources that were still referenced.
        self._resource_closers.clear()
        self.closed = True
        signals.request_finished.send(sender=self._handler_class)

    def write(self, content):
        raise OSError("This %s instance is not writable" % self.__class__.__name__)

    def flush(self):
        pass

    def tell(self):
        raise OSError(
            "This %s instance cannot tell its position" % self.__class__.__name__
        )

    # These methods partially implement a stream-like object interface.
    # See https://docs.python.org/library/io.html#io.IOBase

    def readable(self):
        return False

    def seekable(self):
        return False

    def writable(self):
        return False

    def writelines(self, lines):
        raise OSError("This %s instance is not writable" % self.__class__.__name__)
class HttpResponse(HttpResponseBase):
    """
    An HTTP response class with a string as content.

    This content can be read, appended to, or replaced.
    """

    streaming = False

    def __init__(self, content=b"", *args, **kwargs):
        """Initialize the base response, then store ``content`` as the body."""
        super().__init__(*args, **kwargs)
        # Content is a bytestring. See the `content` property methods.
        self.content = content

    def __repr__(self):
        return "<%(cls)s status_code=%(status_code)d%(content_type)s>" % {
            "cls": self.__class__.__name__,
            "status_code": self.status_code,
            "content_type": self._content_type_for_repr,
        }

    def serialize(self):
        """Full HTTP message, including headers, as a bytestring."""
        return self.serialize_headers() + b"\r\n\r\n" + self.content

    __bytes__ = serialize

    @property
    def content(self):
        """Return the body as one bytestring (the joined container chunks)."""
        return b"".join(self._container)

    @content.setter
    def content(self, value):
        """
        Replace the body with ``value``.

        An iterable other than bytes, memoryview, or str is consumed at once,
        with each chunk converted by make_bytes(); its ``close()`` is called
        if it has one. Any other value is converted by make_bytes() directly.
        """
        # Consume iterators upon assignment to allow repeated iteration.
        if hasattr(value, "__iter__") and not isinstance(
            value, (bytes, memoryview, str)
        ):
            content = b"".join(self.make_bytes(chunk) for chunk in value)
            if hasattr(value, "close"):
                try:
                    value.close()
                except Exception:
                    # Best effort: a failing close() must not lose the
                    # content that was already consumed.
                    pass
        else:
            content = self.make_bytes(value)
        # Create a list of properly encoded bytestrings to support write().
        self._container = [content]

    def __iter__(self):
        return iter(self._container)

    def write(self, content):
        """Append ``content`` (converted by make_bytes()) to the body."""
        self._container.append(self.make_bytes(content))

    def tell(self):
        """Return the current length of the body in bytes."""
        return len(self.content)

    def getvalue(self):
        """Return the body as a bytestring."""
        return self.content

    def writable(self):
        return True

    def writelines(self, lines):
        """Write each item of ``lines`` in order."""
        for line in lines:
            self.write(line)
class StreamingHttpResponse(HttpResponseBase):
    """
    A streaming HTTP response class with an iterator as content.

    This should only be iterated once, when the response is streamed to the
    client. However, it can be appended to or replaced with a new iterator
    that wraps the original content (or yields entirely new content).
    """

    streaming = True

    def __init__(self, streaming_content=(), *args, **kwargs):
        """Initialize the base response, then store the content iterator."""
        super().__init__(*args, **kwargs)
        # `streaming_content` should be an iterable of bytestrings.
        # See the `streaming_content` property methods.
        self.streaming_content = streaming_content

    def __repr__(self):
        return "<%(cls)s status_code=%(status_code)d%(content_type)s>" % {
            "cls": self.__class__.__qualname__,
            "status_code": self.status_code,
            "content_type": self._content_type_for_repr,
        }

    @property
    def content(self):
        """Always raise AttributeError; use ``streaming_content`` instead."""
        raise AttributeError(
            "This %s instance has no `content` attribute. Use "
            "`streaming_content` instead." % self.__class__.__name__
        )

    @property
    def streaming_content(self):
        """Return a lazy iterator of the chunks, each passed through make_bytes()."""
        return map(self.make_bytes, self._iterator)

    @streaming_content.setter
    def streaming_content(self, value):
        self._set_streaming_content(value)

    def _set_streaming_content(self, value):
        """Store an iterator over ``value`` and register its ``close()``, if any."""
        # Ensure we can never iterate on "value" more than once.
        self._iterator = iter(value)
        if hasattr(value, "close"):
            self._resource_closers.append(value.close)

    def __iter__(self):
        return self.streaming_content

    def getvalue(self):
        """Consume the remaining stream and return it as one bytestring."""
        return b"".join(self.streaming_content)
class FileResponse(StreamingHttpResponse):
    """
    A streaming HTTP response class optimized for files.
    """

    block_size = 4096

    def __init__(self, *args, as_attachment=False, filename="", **kwargs):
        """
        ``as_attachment`` selects the "attachment" Content-Disposition instead
        of "inline"; ``filename`` overrides the name taken from the file
        object in that header.
        """
        # Set before super().__init__(), which assigns the streaming content
        # and thereby calls set_headers(), which reads these attributes.
        self.as_attachment = as_attachment
        self.filename = filename
        super().__init__(*args, **kwargs)

    def _set_streaming_content(self, value):
        """
        Stream ``value`` in ``block_size`` chunks if it has a ``read()``
        method, setting file-related headers first; otherwise treat it as a
        plain iterable.
        """
        if not hasattr(value, "read"):
            self.file_to_stream = None
            return super()._set_streaming_content(value)

        self.file_to_stream = filelike = value
        if hasattr(filelike, "close"):
            self._resource_closers.append(filelike.close)
        # Read fixed-size blocks until read() returns b"".
        value = iter(lambda: filelike.read(self.block_size), b"")
        self.set_headers(filelike)
        super()._set_streaming_content(value)

    def set_headers(self, filelike):
        """
        Set some common response headers (Content-Length, Content-Type, and
        Content-Disposition) based on the `filelike` response content.
        """
        encoding_map = {
            "bzip2": "application/x-bzip",
            "gzip": "application/gzip",
            "xz": "application/x-xz",
        }
        filename = getattr(filelike, "name", None)
        filename = (
            filename if (isinstance(filename, str) and filename) else self.filename
        )
        if os.path.isabs(filename):
            self.headers["Content-Length"] = os.path.getsize(filelike.name)
        elif hasattr(filelike, "getbuffer"):
            self.headers["Content-Length"] = filelike.getbuffer().nbytes

        # Only replace the Content-Type while it is still a "text/html" value.
        if self.headers.get("Content-Type", "").startswith("text/html"):
            if filename:
                content_type, encoding = mimetypes.guess_type(filename)
                # Encoding isn't set to prevent browsers from automatically
                # uncompressing files.
                content_type = encoding_map.get(encoding, content_type)
                self.headers["Content-Type"] = (
                    content_type or "application/octet-stream"
                )
            else:
                self.headers["Content-Type"] = "application/octet-stream"

        filename = self.filename or os.path.basename(filename)
        if filename:
            disposition = "attachment" if self.as_attachment else "inline"
            try:
                filename.encode("ascii")
                # Escape backslashes and double quotes so the name cannot
                # end the quoted-string early and inject extra parameters.
                file_expr = 'filename="{}"'.format(
                    filename.replace("\\", "\\\\").replace('"', r"\"")
                )
            except UnicodeEncodeError:
                # Non-ASCII names use the percent-encoded filename* form.
                file_expr = "filename*=utf-8''{}".format(quote(filename))
            self.headers["Content-Disposition"] = "{}; {}".format(
                disposition, file_expr
            )
        elif self.as_attachment:
            self.headers["Content-Disposition"] = "attachment"
class HttpResponseRedirectBase(HttpResponse):
    """
    Base class for redirect responses: sets the Location header and rejects
    target URLs whose scheme is not in ``allowed_schemes``.
    """

    allowed_schemes = ["http", "https", "ftp"]

    def __init__(self, redirect_to, *args, **kwargs):
        """
        Set the Location header from ``redirect_to``.

        Raise DisallowedRedirect if the URL has a scheme that is not in
        ``allowed_schemes``; URLs without a scheme are accepted.
        """
        super().__init__(*args, **kwargs)
        self["Location"] = iri_to_uri(redirect_to)
        parsed = urlparse(str(redirect_to))
        if parsed.scheme and parsed.scheme not in self.allowed_schemes:
            raise DisallowedRedirect(
                "Unsafe redirect to URL with protocol '%s'" % parsed.scheme
            )

    # Read-only access to the value of the Location header.
    url = property(lambda self: self["Location"])

    def __repr__(self):
        return (
            '<%(cls)s status_code=%(status_code)d%(content_type)s, url="%(url)s">'
            % {
                "cls": self.__class__.__name__,
                "status_code": self.status_code,
                "content_type": self._content_type_for_repr,
                "url": self.url,
            }
        )
class HttpResponseRedirect(HttpResponseRedirectBase):
    """Redirect response with status code 302."""

    status_code = 302
class HttpResponsePermanentRedirect(HttpResponseRedirectBase):
    """Redirect response with status code 301."""

    status_code = 301
class HttpResponseNotModified(HttpResponse):
    """Response with status code 304, no Content-Type header, and no body."""

    status_code = 304

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Drop the Content-Type header that the base initializer always sets.
        del self["content-type"]

    @HttpResponse.content.setter
    def content(self, value):
        """Reject any truthy content with AttributeError; keep the body empty."""
        if value:
            raise AttributeError(
                "You cannot set content to a 304 (Not Modified) response"
            )
        self._container = []
class HttpResponseBadRequest(HttpResponse):
    """Response with status code 400."""

    status_code = 400
class HttpResponseNotFound(HttpResponse):
    """Response with status code 404."""

    status_code = 404
class HttpResponseForbidden(HttpResponse):
    """Response with status code 403."""

    status_code = 403
class HttpResponseNotAllowed(HttpResponse):
    """Response with status code 405 that lists the permitted methods."""

    status_code = 405

    def __init__(self, permitted_methods, *args, **kwargs):
        """Set the Allow header to the comma-separated ``permitted_methods``."""
        super().__init__(*args, **kwargs)
        self["Allow"] = ", ".join(permitted_methods)

    def __repr__(self):
        return "<%(cls)s [%(methods)s] status_code=%(status_code)d%(content_type)s>" % {
            "cls": self.__class__.__name__,
            "status_code": self.status_code,
            "content_type": self._content_type_for_repr,
            "methods": self["Allow"],
        }
class HttpResponseGone(HttpResponse):
    """Response with status code 410."""

    status_code = 410
class HttpResponseServerError(HttpResponse):
    """Response with status code 500."""

    status_code = 500
class Http404(Exception):
    # NOTE(review): presumably raised by views to signal a missing resource;
    # nothing in this module raises or catches it.
    pass
class JsonResponse(HttpResponse):
    """
    An HTTP response class that consumes data to be serialized to JSON.

    :param data: Data to be dumped into json. By default only ``dict`` objects
      are allowed to be passed due to a security flaw before ECMAScript 5. See
      the ``safe`` parameter for more information.
    :param encoder: Should be a json encoder class. Defaults to
      ``django.core.serializers.json.DjangoJSONEncoder``.
    :param safe: Controls if only ``dict`` objects may be serialized. Defaults
      to ``True``.
    :param json_dumps_params: A dictionary of kwargs passed to json.dumps().
    """

    def __init__(
        self,
        data,
        encoder=DjangoJSONEncoder,
        safe=True,
        json_dumps_params=None,
        **kwargs,
    ):
        """
        Serialize ``data`` with json.dumps() and use the result as content.

        Raise TypeError if ``safe`` is true and ``data`` is not a dict. The
        content type defaults to "application/json" unless the caller passes
        ``content_type``.
        """
        if safe and not isinstance(data, dict):
            raise TypeError(
                "In order to allow non-dict objects to be serialized set the "
                "safe parameter to False."
            )
        if json_dumps_params is None:
            json_dumps_params = {}
        kwargs.setdefault("content_type", "application/json")
        data = json.dumps(data, cls=encoder, **json_dumps_params)
        super().__init__(content=data, **kwargs)