Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/handlers/wsgi.py: 70%

127 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from io import BytesIO 

2 

3from django.conf import settings 

4from django.core import signals 

5from django.core.handlers import base 

6from django.http import HttpRequest, QueryDict, parse_cookie 

7from django.urls import set_script_prefix 

8from django.utils.encoding import repercent_broken_unicode 

9from django.utils.functional import cached_property 

10from django.utils.regex_helper import _lazy_re_compile 

11 

12_slashes_re = _lazy_re_compile(rb"/+") 

13 

14 

15class LimitedStream: 

16 """Wrap another stream to disallow reading it past a number of bytes.""" 

17 

18 def __init__(self, stream, limit, buf_size=64 * 1024 * 1024): 

19 self.stream = stream 

20 self.remaining = limit 

21 self.buffer = b"" 

22 self.buf_size = buf_size 

23 

24 def _read_limited(self, size=None): 

25 if size is None or size > self.remaining: 25 ↛ 27line 25 didn't jump to line 27, because the condition on line 25 was never false

26 size = self.remaining 

27 if size == 0: 

28 return b"" 

29 result = self.stream.read(size) 

30 self.remaining -= len(result) 

31 return result 

32 

33 def read(self, size=None): 

34 if size is None: 

35 result = self.buffer + self._read_limited() 

36 self.buffer = b"" 

37 elif size < len(self.buffer): 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true

38 result = self.buffer[:size] 

39 self.buffer = self.buffer[size:] 

40 else: # size >= len(self.buffer) 

41 result = self.buffer + self._read_limited(size - len(self.buffer)) 

42 self.buffer = b"" 

43 return result 

44 

45 def readline(self, size=None): 

46 while b"\n" not in self.buffer and (size is None or len(self.buffer) < size): 46 ↛ 55line 46 didn't jump to line 55, because the condition on line 46 was never false

47 if size: 47 ↛ 49line 47 didn't jump to line 49, because the condition on line 47 was never true

48 # since size is not None here, len(self.buffer) < size 

49 chunk = self._read_limited(size - len(self.buffer)) 

50 else: 

51 chunk = self._read_limited() 

52 if not chunk: 52 ↛ 54line 52 didn't jump to line 54, because the condition on line 52 was never false

53 break 

54 self.buffer += chunk 

55 sio = BytesIO(self.buffer) 

56 if size: 56 ↛ 57line 56 didn't jump to line 57, because the condition on line 56 was never true

57 line = sio.readline(size) 

58 else: 

59 line = sio.readline() 

60 self.buffer = sio.read() 

61 return line 

62 

63 

64class WSGIRequest(HttpRequest): 

65 def __init__(self, environ): 

66 script_name = get_script_name(environ) 

67 # If PATH_INFO is empty (e.g. accessing the SCRIPT_NAME URL without a 

68 # trailing slash), operate as if '/' was requested. 

69 path_info = get_path_info(environ) or "/" 

70 self.environ = environ 

71 self.path_info = path_info 

72 # be careful to only replace the first slash in the path because of 

73 # http://test/something and http://test//something being different as 

74 # stated in https://www.ietf.org/rfc/rfc2396.txt 

75 self.path = "%s/%s" % (script_name.rstrip("/"), path_info.replace("/", "", 1)) 

76 self.META = environ 

77 self.META["PATH_INFO"] = path_info 

78 self.META["SCRIPT_NAME"] = script_name 

79 self.method = environ["REQUEST_METHOD"].upper() 

80 # Set content_type, content_params, and encoding. 

81 self._set_content_type_params(environ) 

82 try: 

83 content_length = int(environ.get("CONTENT_LENGTH")) 

84 except (ValueError, TypeError): 

85 content_length = 0 

86 self._stream = LimitedStream(self.environ["wsgi.input"], content_length) 

87 self._read_started = False 

88 self.resolver_match = None 

89 

90 def _get_scheme(self): 

91 return self.environ.get("wsgi.url_scheme") 

92 

93 @cached_property 

94 def GET(self): 

95 # The WSGI spec says 'QUERY_STRING' may be absent. 

96 raw_query_string = get_bytes_from_wsgi(self.environ, "QUERY_STRING", "") 

97 return QueryDict(raw_query_string, encoding=self._encoding) 

98 

99 def _get_post(self): 

100 if not hasattr(self, "_post"): 

101 self._load_post_and_files() 

102 return self._post 

103 

104 def _set_post(self, post): 

105 self._post = post 

106 

107 @cached_property 

108 def COOKIES(self): 

109 raw_cookie = get_str_from_wsgi(self.environ, "HTTP_COOKIE", "") 

110 return parse_cookie(raw_cookie) 

111 

112 @property 

113 def FILES(self): 

114 if not hasattr(self, "_files"): 

115 self._load_post_and_files() 

116 return self._files 

117 

118 POST = property(_get_post, _set_post) 

119 

120 

121class WSGIHandler(base.BaseHandler): 

122 request_class = WSGIRequest 

123 

124 def __init__(self, *args, **kwargs): 

125 super().__init__(*args, **kwargs) 

126 self.load_middleware() 

127 

128 def __call__(self, environ, start_response): 

129 set_script_prefix(get_script_name(environ)) 

130 signals.request_started.send(sender=self.__class__, environ=environ) 

131 request = self.request_class(environ) 

132 response = self.get_response(request) 

133 

134 response._handler_class = self.__class__ 

135 

136 status = "%d %s" % (response.status_code, response.reason_phrase) 

137 response_headers = [ 

138 *response.items(), 

139 *(("Set-Cookie", c.output(header="")) for c in response.cookies.values()), 

140 ] 

141 start_response(status, response_headers) 

142 if getattr(response, "file_to_stream", None) is not None and environ.get( 

143 "wsgi.file_wrapper" 

144 ): 

145 # If `wsgi.file_wrapper` is used the WSGI server does not call 

146 # .close on the response, but on the file wrapper. Patch it to use 

147 # response.close instead which takes care of closing all files. 

148 response.file_to_stream.close = response.close 

149 response = environ["wsgi.file_wrapper"]( 

150 response.file_to_stream, response.block_size 

151 ) 

152 return response 

153 

154 

155def get_path_info(environ): 

156 """Return the HTTP request's PATH_INFO as a string.""" 

157 path_info = get_bytes_from_wsgi(environ, "PATH_INFO", "/") 

158 

159 return repercent_broken_unicode(path_info).decode() 

160 

161 

162def get_script_name(environ): 

163 """ 

164 Return the equivalent of the HTTP request's SCRIPT_NAME environment 

165 variable. If Apache mod_rewrite is used, return what would have been 

166 the script name prior to any rewriting (so it's the script name as seen 

167 from the client's perspective), unless the FORCE_SCRIPT_NAME setting is 

168 set (to anything). 

169 """ 

170 if settings.FORCE_SCRIPT_NAME is not None: 170 ↛ 171line 170 didn't jump to line 171, because the condition on line 170 was never true

171 return settings.FORCE_SCRIPT_NAME 

172 

173 # If Apache's mod_rewrite had a whack at the URL, Apache set either 

174 # SCRIPT_URL or REDIRECT_URL to the full resource URL before applying any 

175 # rewrites. Unfortunately not every web server (lighttpd!) passes this 

176 # information through all the time, so FORCE_SCRIPT_NAME, above, is still 

177 # needed. 

178 script_url = get_bytes_from_wsgi(environ, "SCRIPT_URL", "") or get_bytes_from_wsgi( 

179 environ, "REDIRECT_URL", "" 

180 ) 

181 

182 if script_url: 182 ↛ 183line 182 didn't jump to line 183, because the condition on line 182 was never true

183 if b"//" in script_url: 

184 # mod_wsgi squashes multiple successive slashes in PATH_INFO, 

185 # do the same with script_url before manipulating paths (#17133). 

186 script_url = _slashes_re.sub(b"/", script_url) 

187 path_info = get_bytes_from_wsgi(environ, "PATH_INFO", "") 

188 script_name = script_url[: -len(path_info)] if path_info else script_url 

189 else: 

190 script_name = get_bytes_from_wsgi(environ, "SCRIPT_NAME", "") 

191 

192 return script_name.decode() 

193 

194 

195def get_bytes_from_wsgi(environ, key, default): 

196 """ 

197 Get a value from the WSGI environ dictionary as bytes. 

198 

199 key and default should be strings. 

200 """ 

201 value = environ.get(key, default) 

202 # Non-ASCII values in the WSGI environ are arbitrarily decoded with 

203 # ISO-8859-1. This is wrong for Django websites where UTF-8 is the default. 

204 # Re-encode to recover the original bytestring. 

205 return value.encode("iso-8859-1") 

206 

207 

208def get_str_from_wsgi(environ, key, default): 

209 """ 

210 Get a value from the WSGI environ dictionary as str. 

211 

212 key and default should be str objects. 

213 """ 

214 value = get_bytes_from_wsgi(environ, key, default) 

215 return value.decode(errors="replace")