Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/corsheaders/middleware.py: 39%

97 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import annotations 

2 

3import re 

4from typing import Any 

5from urllib.parse import ParseResult, urlparse 

6 

7from django.http import HttpRequest, HttpResponse 

8from django.utils.cache import patch_vary_headers 

9from django.utils.deprecation import MiddlewareMixin 

10 

11from corsheaders.conf import conf 

12from corsheaders.signals import check_request_enabled 

13 

14ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin" 

15ACCESS_CONTROL_EXPOSE_HEADERS = "Access-Control-Expose-Headers" 

16ACCESS_CONTROL_ALLOW_CREDENTIALS = "Access-Control-Allow-Credentials" 

17ACCESS_CONTROL_ALLOW_HEADERS = "Access-Control-Allow-Headers" 

18ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods" 

19ACCESS_CONTROL_MAX_AGE = "Access-Control-Max-Age" 

20 

21 

22class CorsPostCsrfMiddleware(MiddlewareMixin): 

23 def _https_referer_replace_reverse(self, request: HttpRequest) -> None: 

24 """ 

25 Put the HTTP_REFERER back to its original value and delete the 

26 temporary storage 

27 """ 

28 if conf.CORS_REPLACE_HTTPS_REFERER and "ORIGINAL_HTTP_REFERER" in request.META: 

29 http_referer = request.META["ORIGINAL_HTTP_REFERER"] 

30 request.META["HTTP_REFERER"] = http_referer 

31 del request.META["ORIGINAL_HTTP_REFERER"] 

32 

33 def process_request(self, request: HttpRequest) -> None: 

34 self._https_referer_replace_reverse(request) 

35 return None 

36 

37 def process_view( 

38 self, 

39 request: HttpRequest, 

40 callback: Any, 

41 callback_args: Any, 

42 callback_kwargs: Any, 

43 ) -> None: 

44 self._https_referer_replace_reverse(request) 

45 return None 

46 

47 

48class CorsMiddleware(MiddlewareMixin): 

49 def _https_referer_replace(self, request: HttpRequest) -> None: 

50 """ 

51 When https is enabled, django CSRF checking includes referer checking 

52 which breaks when using CORS. This function updates the HTTP_REFERER 

53 header to make sure it matches HTTP_HOST, provided that our cors logic 

54 succeeds 

55 """ 

56 origin = request.META.get("HTTP_ORIGIN") 

57 

58 if ( 

59 request.is_secure() 

60 and origin 

61 and "ORIGINAL_HTTP_REFERER" not in request.META 

62 ): 

63 

64 url = urlparse(origin) 

65 if ( 

66 not conf.CORS_ALLOW_ALL_ORIGINS 

67 and not self.origin_found_in_white_lists(origin, url) 

68 ): 

69 return 

70 

71 try: 

72 http_referer = request.META["HTTP_REFERER"] 

73 http_host = "https://%s/" % request.META["HTTP_HOST"] 

74 request.META = request.META.copy() 

75 request.META["ORIGINAL_HTTP_REFERER"] = http_referer 

76 request.META["HTTP_REFERER"] = http_host 

77 except KeyError: 

78 pass 

79 

80 def process_request(self, request: HttpRequest) -> HttpResponse | None: 

81 """ 

82 If CORS preflight header, then create an 

83 empty body response (200 OK) and return it 

84 

85 Django won't bother calling any other request 

86 view/exception middleware along with the requested view; 

87 it will call any response middlewares 

88 """ 

89 request._cors_enabled = self.is_enabled(request) 

90 if request._cors_enabled: 90 ↛ 101line 90 didn't jump to line 101, because the condition on line 90 was never false

91 if conf.CORS_REPLACE_HTTPS_REFERER: 91 ↛ 92line 91 didn't jump to line 92, because the condition on line 91 was never true

92 self._https_referer_replace(request) 

93 

94 if ( 94 ↛ 98line 94 didn't jump to line 98

95 request.method == "OPTIONS" 

96 and "HTTP_ACCESS_CONTROL_REQUEST_METHOD" in request.META 

97 ): 

98 response = HttpResponse() 

99 response["Content-Length"] = "0" 

100 return response 

101 return None 

102 

103 def process_view( 

104 self, 

105 request: HttpRequest, 

106 callback: Any, 

107 callback_args: Any, 

108 callback_kwargs: Any, 

109 ) -> None: 

110 """ 

111 Do the referer replacement here as well 

112 """ 

113 if request._cors_enabled and conf.CORS_REPLACE_HTTPS_REFERER: 113 ↛ 114line 113 didn't jump to line 114, because the condition on line 113 was never true

114 self._https_referer_replace(request) 

115 return None 

116 

117 def process_response( 

118 self, request: HttpRequest, response: HttpResponse 

119 ) -> HttpResponse: 

120 """ 

121 Add the respective CORS headers 

122 """ 

123 enabled = getattr(request, "_cors_enabled", None) 

124 if enabled is None: 124 ↛ 125line 124 didn't jump to line 125, because the condition on line 124 was never true

125 enabled = self.is_enabled(request) 

126 

127 if not enabled: 127 ↛ 128line 127 didn't jump to line 128, because the condition on line 127 was never true

128 return response 

129 

130 patch_vary_headers(response, ["Origin"]) 

131 

132 origin = request.META.get("HTTP_ORIGIN") 

133 if not origin: 133 ↛ 136line 133 didn't jump to line 136, because the condition on line 133 was never false

134 return response 

135 

136 try: 

137 url = urlparse(origin) 

138 except ValueError: 

139 return response 

140 

141 if conf.CORS_ALLOW_CREDENTIALS: 

142 response[ACCESS_CONTROL_ALLOW_CREDENTIALS] = "true" 

143 

144 if ( 

145 not conf.CORS_ALLOW_ALL_ORIGINS 

146 and not self.origin_found_in_white_lists(origin, url) 

147 and not self.check_signal(request) 

148 ): 

149 return response 

150 

151 if conf.CORS_ALLOW_ALL_ORIGINS and not conf.CORS_ALLOW_CREDENTIALS: 

152 response[ACCESS_CONTROL_ALLOW_ORIGIN] = "*" 

153 else: 

154 response[ACCESS_CONTROL_ALLOW_ORIGIN] = origin 

155 

156 if len(conf.CORS_EXPOSE_HEADERS): 

157 response[ACCESS_CONTROL_EXPOSE_HEADERS] = ", ".join( 

158 conf.CORS_EXPOSE_HEADERS 

159 ) 

160 

161 if request.method == "OPTIONS": 

162 response[ACCESS_CONTROL_ALLOW_HEADERS] = ", ".join(conf.CORS_ALLOW_HEADERS) 

163 response[ACCESS_CONTROL_ALLOW_METHODS] = ", ".join(conf.CORS_ALLOW_METHODS) 

164 if conf.CORS_PREFLIGHT_MAX_AGE: 

165 response[ACCESS_CONTROL_MAX_AGE] = str(conf.CORS_PREFLIGHT_MAX_AGE) 

166 

167 return response 

168 

169 def origin_found_in_white_lists(self, origin: str, url: ParseResult) -> bool: 

170 return ( 

171 (origin == "null" and origin in conf.CORS_ALLOWED_ORIGINS) 

172 or self._url_in_whitelist(url) 

173 or self.regex_domain_match(origin) 

174 ) 

175 

176 def regex_domain_match(self, origin: str) -> bool: 

177 return any( 

178 re.match(domain_pattern, origin) 

179 for domain_pattern in conf.CORS_ALLOWED_ORIGIN_REGEXES 

180 ) 

181 

182 def is_enabled(self, request: HttpRequest) -> bool: 

183 return bool( 

184 re.match(conf.CORS_URLS_REGEX, request.path_info) 

185 ) or self.check_signal(request) 

186 

187 def check_signal(self, request: HttpRequest) -> bool: 

188 signal_responses = check_request_enabled.send(sender=None, request=request) 

189 return any(return_value for function, return_value in signal_responses) 

190 

191 def _url_in_whitelist(self, url: ParseResult) -> bool: 

192 origins = [urlparse(o) for o in conf.CORS_ALLOWED_ORIGINS] 

193 return any( 

194 origin.scheme == url.scheme and origin.netloc == url.netloc 

195 for origin in origins 

196 )