Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/integrations/_wsgi_common.py: 23%
90 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import json
3from sentry_sdk.hub import Hub, _should_send_default_pii
4from sentry_sdk.utils import AnnotatedValue
5from sentry_sdk._compat import text_type, iteritems
7from sentry_sdk._types import MYPY
9if MYPY: 9 ↛ 10line 9 didn't jump to line 10, because the condition on line 9 was never true
10 import sentry_sdk
12 from typing import Any
13 from typing import Dict
14 from typing import Optional
15 from typing import Union
# WSGI environ keys that this module treats as sensitive. Each key is listed
# exactly once (a duplicated "HTTP_X_FORWARDED_FOR" entry was removed; the
# set of keys is unchanged).
SENSITIVE_ENV_KEYS = (
    "REMOTE_ADDR",
    "HTTP_X_FORWARDED_FOR",
    "HTTP_SET_COOKIE",
    "HTTP_COOKIE",
    "HTTP_AUTHORIZATION",
    "HTTP_X_API_KEY",
    "HTTP_X_REAL_IP",
)

# The HTTP_* keys above with the "HTTP_" prefix stripped
# (e.g. "HTTP_COOKIE" -> "COOKIE"); non-HTTP_ keys are skipped.
SENSITIVE_HEADERS = tuple(
    x[len("HTTP_") :] for x in SENSITIVE_ENV_KEYS if x.startswith("HTTP_")
)
def request_body_within_bounds(client, content_length):
    # type: (Optional[sentry_sdk.Client], int) -> bool
    """Return whether a body of ``content_length`` is within the configured limit.

    The limit comes from ``client.options["request_bodies"]``:

    * ``"never"``  -- always out of bounds,
    * ``"small"``  -- out of bounds when larger than ``10**3``,
    * ``"medium"`` -- out of bounds when larger than ``10**4``,
    * any other value -- always within bounds.

    Returns ``False`` when ``client`` is ``None``.
    """
    if client is None:
        return False

    setting = client.options["request_bodies"]
    if setting == "never":
        return False
    if setting == "small":
        return not content_length > 10**3
    if setting == "medium":
        return not content_length > 10**4
    return True
class RequestExtractor(object):
    """Base class that copies request data from a framework request into an event.

    The framework-specific accessors (``env``, ``cookies``, ``raw_data``,
    ``form``, ``files`` and ``size_of_file``) all raise
    ``NotImplementedError`` here and are meant to be overridden.
    """

    def __init__(self, request):
        # type: (Any) -> None
        """Store the framework request object the accessors will read from."""
        self.request = request

    def extract_into_event(self, event):
        # type: (Dict[str, Any]) -> None
        """Populate ``event["request"]`` with cookies and body data.

        Does nothing when the current hub has no client. Cookies are only
        attached when ``_should_send_default_pii()`` is true. The body is
        replaced by an empty ``AnnotatedValue`` carrying removal metadata when
        it exceeds the configured size limit (``"!config"``) or when it is
        present but could not be parsed (``"!raw"``).
        """
        client = Hub.current.client
        if client is None:
            return

        data = None  # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]

        content_length = self.content_length()
        # Reuse an existing "request" entry if the event already has one.
        request_info = event.get("request", {})

        if _should_send_default_pii():
            request_info["cookies"] = dict(self.cookies())

        if not request_body_within_bounds(client, content_length):
            # Body too large for the configured setting: record only its length.
            data = AnnotatedValue(
                "",
                {"rem": [["!config", "x", 0, content_length]], "len": content_length},
            )
        else:
            parsed_body = self.parsed_body()
            if parsed_body is not None:
                data = parsed_body
            elif self.raw_data():
                # There is a body, but it is neither form data nor valid JSON.
                data = AnnotatedValue(
                    "",
                    {"rem": [["!raw", "x", 0, content_length]], "len": content_length},
                )
            else:
                data = None

        if data is not None:
            request_info["data"] = data

        event["request"] = request_info

    def content_length(self):
        # type: () -> int
        """Return ``CONTENT_LENGTH`` from ``env()`` as an int, or 0 if unusable."""
        try:
            return int(self.env().get("CONTENT_LENGTH", 0))
        except (ValueError, TypeError):
            # ValueError: non-numeric text (including ""); TypeError: a value
            # int() cannot convert at all, such as an explicit None.
            return 0

    def cookies(self):
        # type: () -> Dict[str, Any]
        """Return the request cookies. Must be overridden."""
        raise NotImplementedError()

    def raw_data(self):
        # type: () -> Optional[Union[str, bytes]]
        """Return the raw request body. Must be overridden."""
        raise NotImplementedError()

    def form(self):
        # type: () -> Optional[Dict[str, Any]]
        """Return the parsed form fields. Must be overridden."""
        raise NotImplementedError()

    def parsed_body(self):
        # type: () -> Optional[Dict[str, Any]]
        """Return form fields merged with file placeholders, else the JSON body.

        When ``form()`` or ``files()`` is truthy, the result is a dict of the
        form items in which every uploaded file is replaced by an empty
        ``AnnotatedValue`` recording the file size. Otherwise the result of
        ``json()`` is returned.
        """
        form = self.form()
        files = self.files()
        if form or files:
            # Either accessor may return None while the other has content, so
            # guard each one before iterating.
            data = dict(iteritems(form)) if form is not None else {}
            if files is not None:
                for k, v in iteritems(files):
                    size = self.size_of_file(v)
                    data[k] = AnnotatedValue(
                        "", {"len": size, "rem": [["!raw", "x", 0, size]]}
                    )

            return data

        return self.json()

    def is_json(self):
        # type: () -> bool
        """Return whether ``CONTENT_TYPE`` in ``env()`` names a JSON media type."""
        return _is_json_content_type(self.env().get("CONTENT_TYPE"))

    def json(self):
        # type: () -> Optional[Any]
        """Return the decoded JSON body, or ``None``.

        ``None`` is returned when the request is not JSON, has no raw data, or
        the data fails to decode or parse (any ``ValueError``).
        """
        try:
            if not self.is_json():
                return None

            raw_data = self.raw_data()
            if raw_data is None:
                return None

            if isinstance(raw_data, text_type):
                return json.loads(raw_data)
            else:
                return json.loads(raw_data.decode("utf-8"))
        except ValueError:
            # Deliberate best effort: an undecodable or malformed body is
            # treated the same as "no JSON body".
            pass

        return None

    def files(self):
        # type: () -> Optional[Dict[str, Any]]
        """Return the uploaded files keyed by field name. Must be overridden."""
        raise NotImplementedError()

    def size_of_file(self, file):
        # type: (Any) -> int
        """Return the size of one uploaded file object. Must be overridden."""
        raise NotImplementedError()

    def env(self):
        # type: () -> Dict[str, Any]
        """Return the request environ mapping. Must be overridden."""
        raise NotImplementedError()
159def _is_json_content_type(ct):
160 # type: (Optional[str]) -> bool
161 mt = (ct or "").split(";", 1)[0]
162 return (
163 mt == "application/json"
164 or (mt.startswith("application/"))
165 and mt.endswith("+json")
166 )
def _filter_headers(headers):
    # type: (Dict[str, str]) -> Dict[str, str]
    """Return ``headers`` with the values of sensitive headers redacted.

    When ``_should_send_default_pii()`` is true the input mapping is returned
    as is. Otherwise a new dict is built in which every header whose name
    (upper-cased, with ``-`` replaced by ``_``) appears in
    ``SENSITIVE_HEADERS`` has its value replaced by an empty
    ``AnnotatedValue`` recording the original value's length.
    """
    if _should_send_default_pii():
        return headers

    filtered = {}
    for name, value in iteritems(headers):
        # Normalize "X-Api-Key" style names to the "X_API_KEY" form used by
        # SENSITIVE_HEADERS before the membership test.
        if name.upper().replace("-", "_") in SENSITIVE_HEADERS:
            value = AnnotatedValue("", {"rem": [["!config", "x", 0, len(value)]]})
        filtered[name] = value
    return filtered