Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/integrations/_wsgi_common.py: 23%

90 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import json 

2 

3from sentry_sdk.hub import Hub, _should_send_default_pii 

4from sentry_sdk.utils import AnnotatedValue 

5from sentry_sdk._compat import text_type, iteritems 

6 

7from sentry_sdk._types import MYPY 

8 

9if MYPY: 9 ↛ 10line 9 didn't jump to line 10, because the condition on line 9 was never true

10 import sentry_sdk 

11 

12 from typing import Any 

13 from typing import Dict 

14 from typing import Optional 

15 from typing import Union 

16 

17 

# WSGI environ keys treated as sensitive. Each key is listed exactly once;
# the tuple order is preserved in SENSITIVE_HEADERS below.
SENSITIVE_ENV_KEYS = (
    "REMOTE_ADDR",
    "HTTP_X_FORWARDED_FOR",
    "HTTP_SET_COOKIE",
    "HTTP_COOKIE",
    "HTTP_AUTHORIZATION",
    "HTTP_X_API_KEY",
    "HTTP_X_REAL_IP",
)

# Header-name form of the "HTTP_*" keys above: the "HTTP_" prefix is stripped
# and non-HTTP keys (e.g. "REMOTE_ADDR") are skipped. _filter_headers compares
# upper-cased, underscore-normalized header names against this tuple.
SENSITIVE_HEADERS = tuple(
    x[len("HTTP_") :] for x in SENSITIVE_ENV_KEYS if x.startswith("HTTP_")
)

32 

33 

def request_body_within_bounds(client, content_length):
    # type: (Optional[sentry_sdk.Client], int) -> bool
    """Decide whether a request body of ``content_length`` may be captured.

    The decision is driven by ``client.options["request_bodies"]``:

    * ``"never"``  -> always ``False``
    * ``"small"``  -> ``False`` when ``content_length`` exceeds ``10**3``
    * ``"medium"`` -> ``False`` when ``content_length`` exceeds ``10**4``
    * any other value -> ``True``

    Returns ``False`` when ``client`` is ``None``.
    """
    if client is None:
        return False

    setting = client.options["request_bodies"]

    if setting == "never":
        return False
    if setting == "small" and content_length > 10**3:
        return False
    if setting == "medium" and content_length > 10**4:
        return False
    return True

45 

46 

class RequestExtractor(object):
    """Base class that copies request data into an event's ``"request"`` entry.

    Subclasses must implement ``env``, ``cookies``, ``raw_data``, ``form``,
    ``files`` and ``size_of_file``; each of those raises
    ``NotImplementedError`` here.
    """

    def __init__(self, request):
        # type: (Any) -> None
        self.request = request

    def extract_into_event(self, event):
        # type: (Dict[str, Any]) -> None
        """Populate ``event["request"]`` with cookies and body data.

        Does nothing when the current hub has no client. Cookies are added
        only when ``_should_send_default_pii()`` is true. The body is stored
        under ``"data"`` as one of:

        * an empty ``AnnotatedValue`` tagged ``!config`` when
          ``request_body_within_bounds`` rejects the content length,
        * the result of ``parsed_body()`` when that is not ``None``,
        * an empty ``AnnotatedValue`` tagged ``!raw`` when only raw data
          is available,
        * nothing at all otherwise.
        """
        client = Hub.current.client
        if client is None:
            return

        data = None  # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]

        content_length = self.content_length()
        request_info = event.get("request", {})

        if _should_send_default_pii():
            request_info["cookies"] = dict(self.cookies())

        if not request_body_within_bounds(client, content_length):
            data = AnnotatedValue(
                "",
                {"rem": [["!config", "x", 0, content_length]], "len": content_length},
            )
        else:
            parsed_body = self.parsed_body()
            if parsed_body is not None:
                data = parsed_body
            elif self.raw_data():
                data = AnnotatedValue(
                    "",
                    {"rem": [["!raw", "x", 0, content_length]], "len": content_length},
                )
            else:
                data = None

        if data is not None:
            request_info["data"] = data

        event["request"] = request_info

    def content_length(self):
        # type: () -> int
        """Return ``CONTENT_LENGTH`` from ``env()`` as an int, or 0.

        0 is returned when the key is missing or its value cannot be
        converted to an integer.
        """
        try:
            return int(self.env().get("CONTENT_LENGTH", 0))
        except (ValueError, TypeError):
            # ValueError: non-numeric string such as "".
            # TypeError: key present but holding a non-convertible value
            # such as None.
            return 0

    def cookies(self):
        # type: () -> Dict[str, Any]
        """Return the request cookies. Must be implemented by subclasses."""
        raise NotImplementedError()

    def raw_data(self):
        # type: () -> Optional[Union[str, bytes]]
        """Return the raw request body. Must be implemented by subclasses."""
        raise NotImplementedError()

    def form(self):
        # type: () -> Optional[Dict[str, Any]]
        """Return the form fields. Must be implemented by subclasses."""
        raise NotImplementedError()

    def parsed_body(self):
        # type: () -> Optional[Dict[str, Any]]
        """Return the body as a dict built from form fields and files.

        Each file is represented by an empty ``AnnotatedValue`` carrying the
        size reported by ``size_of_file``. When there is neither form data
        nor files, the result of ``json()`` is returned instead.
        """
        form = self.form()
        files = self.files()
        if form or files:
            # form() and files() are both Optional: either one may be None
            # (or empty) while the other has content, so guard each before
            # iterating instead of assuming both are mappings.
            data = dict(iteritems(form)) if form else {}
            if files:
                for k, v in iteritems(files):
                    size = self.size_of_file(v)
                    data[k] = AnnotatedValue(
                        "", {"len": size, "rem": [["!raw", "x", 0, size]]}
                    )

            return data

        return self.json()

    def is_json(self):
        # type: () -> bool
        """Return whether ``CONTENT_TYPE`` from ``env()`` is a JSON type."""
        return _is_json_content_type(self.env().get("CONTENT_TYPE"))

    def json(self):
        # type: () -> Optional[Any]
        """Return the decoded JSON body, or ``None``.

        ``None`` is returned when the request is not JSON, when there is no
        raw data, or when decoding/parsing raises ``ValueError``.
        """
        try:
            if not self.is_json():
                return None

            raw_data = self.raw_data()
            if raw_data is None:
                return None

            if isinstance(raw_data, text_type):
                return json.loads(raw_data)
            else:
                return json.loads(raw_data.decode("utf-8"))
        except ValueError:
            # Best effort: a malformed body is reported as "no JSON".
            pass

        return None

    def files(self):
        # type: () -> Optional[Dict[str, Any]]
        """Return the uploaded files. Must be implemented by subclasses."""
        raise NotImplementedError()

    def size_of_file(self, file):
        # type: (Any) -> int
        """Return the size of ``file``. Must be implemented by subclasses."""
        raise NotImplementedError()

    def env(self):
        # type: () -> Dict[str, Any]
        """Return the request environ. Must be implemented by subclasses."""
        raise NotImplementedError()

157 

158 

159def _is_json_content_type(ct): 

160 # type: (Optional[str]) -> bool 

161 mt = (ct or "").split(";", 1)[0] 

162 return ( 

163 mt == "application/json" 

164 or (mt.startswith("application/")) 

165 and mt.endswith("+json") 

166 ) 

167 

168 

def _filter_headers(headers):
    # type: (Dict[str, str]) -> Dict[str, str]
    """Return ``headers`` with sensitive values replaced.

    When ``_should_send_default_pii()`` is true the input mapping is
    returned unchanged. Otherwise a new dict is built in which every header
    whose upper-cased, dash-to-underscore name appears in
    ``SENSITIVE_HEADERS`` is replaced by an empty ``AnnotatedValue`` that
    records the original value's length.
    """
    if _should_send_default_pii():
        return headers

    filtered = {}
    for name, value in iteritems(headers):
        normalized = name.upper().replace("-", "_")
        if normalized in SENSITIVE_HEADERS:
            filtered[name] = AnnotatedValue(
                "", {"rem": [["!config", "x", 0, len(value)]]}
            )
        else:
            filtered[name] = value
    return filtered