Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/handlers/wsgi.py: 70%
127 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from io import BytesIO
3from django.conf import settings
4from django.core import signals
5from django.core.handlers import base
6from django.http import HttpRequest, QueryDict, parse_cookie
7from django.urls import set_script_prefix
8from django.utils.encoding import repercent_broken_unicode
9from django.utils.functional import cached_property
10from django.utils.regex_helper import _lazy_re_compile
12_slashes_re = _lazy_re_compile(rb"/+")
15class LimitedStream:
16 """Wrap another stream to disallow reading it past a number of bytes."""
18 def __init__(self, stream, limit, buf_size=64 * 1024 * 1024):
19 self.stream = stream
20 self.remaining = limit
21 self.buffer = b""
22 self.buf_size = buf_size
24 def _read_limited(self, size=None):
25 if size is None or size > self.remaining: 25 ↛ 27line 25 didn't jump to line 27, because the condition on line 25 was never false
26 size = self.remaining
27 if size == 0:
28 return b""
29 result = self.stream.read(size)
30 self.remaining -= len(result)
31 return result
33 def read(self, size=None):
34 if size is None:
35 result = self.buffer + self._read_limited()
36 self.buffer = b""
37 elif size < len(self.buffer): 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true
38 result = self.buffer[:size]
39 self.buffer = self.buffer[size:]
40 else: # size >= len(self.buffer)
41 result = self.buffer + self._read_limited(size - len(self.buffer))
42 self.buffer = b""
43 return result
45 def readline(self, size=None):
46 while b"\n" not in self.buffer and (size is None or len(self.buffer) < size): 46 ↛ 55line 46 didn't jump to line 55, because the condition on line 46 was never false
47 if size: 47 ↛ 49line 47 didn't jump to line 49, because the condition on line 47 was never true
48 # since size is not None here, len(self.buffer) < size
49 chunk = self._read_limited(size - len(self.buffer))
50 else:
51 chunk = self._read_limited()
52 if not chunk: 52 ↛ 54line 52 didn't jump to line 54, because the condition on line 52 was never false
53 break
54 self.buffer += chunk
55 sio = BytesIO(self.buffer)
56 if size: 56 ↛ 57line 56 didn't jump to line 57, because the condition on line 56 was never true
57 line = sio.readline(size)
58 else:
59 line = sio.readline()
60 self.buffer = sio.read()
61 return line
64class WSGIRequest(HttpRequest):
65 def __init__(self, environ):
66 script_name = get_script_name(environ)
67 # If PATH_INFO is empty (e.g. accessing the SCRIPT_NAME URL without a
68 # trailing slash), operate as if '/' was requested.
69 path_info = get_path_info(environ) or "/"
70 self.environ = environ
71 self.path_info = path_info
72 # be careful to only replace the first slash in the path because of
73 # http://test/something and http://test//something being different as
74 # stated in https://www.ietf.org/rfc/rfc2396.txt
75 self.path = "%s/%s" % (script_name.rstrip("/"), path_info.replace("/", "", 1))
76 self.META = environ
77 self.META["PATH_INFO"] = path_info
78 self.META["SCRIPT_NAME"] = script_name
79 self.method = environ["REQUEST_METHOD"].upper()
80 # Set content_type, content_params, and encoding.
81 self._set_content_type_params(environ)
82 try:
83 content_length = int(environ.get("CONTENT_LENGTH"))
84 except (ValueError, TypeError):
85 content_length = 0
86 self._stream = LimitedStream(self.environ["wsgi.input"], content_length)
87 self._read_started = False
88 self.resolver_match = None
90 def _get_scheme(self):
91 return self.environ.get("wsgi.url_scheme")
93 @cached_property
94 def GET(self):
95 # The WSGI spec says 'QUERY_STRING' may be absent.
96 raw_query_string = get_bytes_from_wsgi(self.environ, "QUERY_STRING", "")
97 return QueryDict(raw_query_string, encoding=self._encoding)
99 def _get_post(self):
100 if not hasattr(self, "_post"):
101 self._load_post_and_files()
102 return self._post
104 def _set_post(self, post):
105 self._post = post
107 @cached_property
108 def COOKIES(self):
109 raw_cookie = get_str_from_wsgi(self.environ, "HTTP_COOKIE", "")
110 return parse_cookie(raw_cookie)
112 @property
113 def FILES(self):
114 if not hasattr(self, "_files"):
115 self._load_post_and_files()
116 return self._files
118 POST = property(_get_post, _set_post)
121class WSGIHandler(base.BaseHandler):
122 request_class = WSGIRequest
124 def __init__(self, *args, **kwargs):
125 super().__init__(*args, **kwargs)
126 self.load_middleware()
128 def __call__(self, environ, start_response):
129 set_script_prefix(get_script_name(environ))
130 signals.request_started.send(sender=self.__class__, environ=environ)
131 request = self.request_class(environ)
132 response = self.get_response(request)
134 response._handler_class = self.__class__
136 status = "%d %s" % (response.status_code, response.reason_phrase)
137 response_headers = [
138 *response.items(),
139 *(("Set-Cookie", c.output(header="")) for c in response.cookies.values()),
140 ]
141 start_response(status, response_headers)
142 if getattr(response, "file_to_stream", None) is not None and environ.get(
143 "wsgi.file_wrapper"
144 ):
145 # If `wsgi.file_wrapper` is used the WSGI server does not call
146 # .close on the response, but on the file wrapper. Patch it to use
147 # response.close instead which takes care of closing all files.
148 response.file_to_stream.close = response.close
149 response = environ["wsgi.file_wrapper"](
150 response.file_to_stream, response.block_size
151 )
152 return response
155def get_path_info(environ):
156 """Return the HTTP request's PATH_INFO as a string."""
157 path_info = get_bytes_from_wsgi(environ, "PATH_INFO", "/")
159 return repercent_broken_unicode(path_info).decode()
162def get_script_name(environ):
163 """
164 Return the equivalent of the HTTP request's SCRIPT_NAME environment
165 variable. If Apache mod_rewrite is used, return what would have been
166 the script name prior to any rewriting (so it's the script name as seen
167 from the client's perspective), unless the FORCE_SCRIPT_NAME setting is
168 set (to anything).
169 """
170 if settings.FORCE_SCRIPT_NAME is not None: 170 ↛ 171line 170 didn't jump to line 171, because the condition on line 170 was never true
171 return settings.FORCE_SCRIPT_NAME
173 # If Apache's mod_rewrite had a whack at the URL, Apache set either
174 # SCRIPT_URL or REDIRECT_URL to the full resource URL before applying any
175 # rewrites. Unfortunately not every web server (lighttpd!) passes this
176 # information through all the time, so FORCE_SCRIPT_NAME, above, is still
177 # needed.
178 script_url = get_bytes_from_wsgi(environ, "SCRIPT_URL", "") or get_bytes_from_wsgi(
179 environ, "REDIRECT_URL", ""
180 )
182 if script_url: 182 ↛ 183line 182 didn't jump to line 183, because the condition on line 182 was never true
183 if b"//" in script_url:
184 # mod_wsgi squashes multiple successive slashes in PATH_INFO,
185 # do the same with script_url before manipulating paths (#17133).
186 script_url = _slashes_re.sub(b"/", script_url)
187 path_info = get_bytes_from_wsgi(environ, "PATH_INFO", "")
188 script_name = script_url[: -len(path_info)] if path_info else script_url
189 else:
190 script_name = get_bytes_from_wsgi(environ, "SCRIPT_NAME", "")
192 return script_name.decode()
195def get_bytes_from_wsgi(environ, key, default):
196 """
197 Get a value from the WSGI environ dictionary as bytes.
199 key and default should be strings.
200 """
201 value = environ.get(key, default)
202 # Non-ASCII values in the WSGI environ are arbitrarily decoded with
203 # ISO-8859-1. This is wrong for Django websites where UTF-8 is the default.
204 # Re-encode to recover the original bytestring.
205 return value.encode("iso-8859-1")
208def get_str_from_wsgi(environ, key, default):
209 """
210 Get a value from the WSGI environ dictionary as str.
212 key and default should be str objects.
213 """
214 value = get_bytes_from_wsgi(environ, key, default)
215 return value.decode(errors="replace")