Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/integrations/pyramid.py: 10%

124 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import absolute_import 

2 

3import os 

4import sys 

5import weakref 

6 

7from sentry_sdk.hub import Hub, _should_send_default_pii 

8from sentry_sdk.scope import Scope 

9from sentry_sdk.tracing import SOURCE_FOR_STYLE 

10from sentry_sdk.utils import ( 

11 capture_internal_exceptions, 

12 event_from_exception, 

13) 

14from sentry_sdk._compat import reraise, iteritems 

15 

16from sentry_sdk.integrations import Integration, DidNotEnable 

17from sentry_sdk.integrations._wsgi_common import RequestExtractor 

18from sentry_sdk.integrations.wsgi import SentryWsgiMiddleware 

19 

20try: 

21 from pyramid.httpexceptions import HTTPException 

22 from pyramid.request import Request 

23except ImportError: 

24 raise DidNotEnable("Pyramid not installed") 

25 

26from sentry_sdk._types import MYPY 

27 

28if MYPY: 

29 from pyramid.response import Response 

30 from typing import Any 

31 from sentry_sdk.integrations.wsgi import _ScopedResponse 

32 from typing import Callable 

33 from typing import Dict 

34 from typing import Optional 

35 from webob.cookies import RequestCookies # type: ignore 

36 from webob.compat import cgi_FieldStorage # type: ignore 

37 

38 from sentry_sdk.utils import ExcInfo 

39 from sentry_sdk._types import EventProcessor 

40 

41 

if not getattr(Request, "authenticated_userid", None):
    # bw-compat for pyramid < 1.5: ``Request`` exposes no truthy
    # ``authenticated_userid`` attribute, so use the pyramid.security function.
    from pyramid.security import authenticated_userid  # type: ignore

else:

    def authenticated_userid(request):
        # type: (Request) -> Optional[Any]
        """Return the ``authenticated_userid`` attribute of ``request``."""
        return request.authenticated_userid


# Values accepted for the ``transaction_style`` argument of PyramidIntegration.
TRANSACTION_STYLE_VALUES = ("route_name", "route_pattern")

54 

55 

class PyramidIntegration(Integration):
    """Sentry integration that patches Pyramid's router and request classes.

    ``transaction_style`` selects which attribute of the matched route is used
    as the transaction name; it must be one of ``TRANSACTION_STYLE_VALUES``.
    """

    identifier = "pyramid"

    transaction_style = ""

    def __init__(self, transaction_style="route_name"):
        # type: (str) -> None
        """Validate and store ``transaction_style``.

        Raises:
            ValueError: if ``transaction_style`` is not one of
                ``TRANSACTION_STYLE_VALUES``.
        """
        if transaction_style in TRANSACTION_STYLE_VALUES:
            self.transaction_style = transaction_style
            return

        raise ValueError(
            "Invalid value for transaction_style: %s (must be in %s)"
            % (transaction_style, TRANSACTION_STYLE_VALUES)
        )

    @staticmethod
    def setup_once():
        # type: () -> None
        """Install the Sentry wrappers on Pyramid.

        Replaces ``pyramid.router._call_view``, ``Request.invoke_exception_view``
        (only when ``Request`` has that attribute) and ``Router.__call__`` with
        wrappers that delegate to the originals.
        """
        from pyramid import router

        # --- view invocation: attach transaction name + event processor ---
        original_call_view = router._call_view

        def sentry_patched_call_view(registry, request, *args, **kwargs):
            # type: (Any, Request, *Any, **Any) -> Response
            current_hub = Hub.current
            active = current_hub.get_integration(PyramidIntegration)

            # Integration not enabled on this hub: plain pass-through.
            if active is None:
                return original_call_view(registry, request, *args, **kwargs)

            with current_hub.configure_scope() as scope:
                _set_transaction_name_and_source(
                    scope, active.transaction_style, request
                )
                # Only a weak reference to the request is handed to the
                # event processor.
                request_ref = weakref.ref(request)
                scope.add_event_processor(_make_event_processor(request_ref, active))

            return original_call_view(registry, request, *args, **kwargs)

        router._call_view = sentry_patched_call_view

        # --- exception views: report errors rendered as a 500 response ---
        if hasattr(Request, "invoke_exception_view"):
            original_invoke_exception_view = Request.invoke_exception_view

            def sentry_patched_invoke_exception_view(self, *args, **kwargs):
                # type: (Request, *Any, **Any) -> Any
                response = original_invoke_exception_view(self, *args, **kwargs)

                # Evaluated left to right with short-circuiting: the response
                # status is only inspected when a complete exc_info is present.
                failed_with_500 = (
                    bool(self.exc_info)
                    and all(self.exc_info)
                    and response.status_int == 500
                )
                if failed_with_500:
                    if Hub.current.get_integration(PyramidIntegration) is not None:
                        _capture_exception(self.exc_info)

                return response

            Request.invoke_exception_view = sentry_patched_invoke_exception_view

        # --- WSGI entry point: wrap the router in SentryWsgiMiddleware ---
        original_wsgi_call = router.Router.__call__

        def sentry_patched_wsgi_call(self, environ, start_response):
            # type: (Any, Dict[str, str], Callable[..., Any]) -> _ScopedResponse
            current_hub = Hub.current
            active = current_hub.get_integration(PyramidIntegration)

            if active is not None:

                def sentry_patched_inner_wsgi_call(environ, start_response):
                    # type: (Dict[str, Any], Callable[..., Any]) -> Any
                    try:
                        return original_wsgi_call(self, environ, start_response)
                    except Exception:
                        # Report the error, then re-raise it unchanged.
                        captured = sys.exc_info()
                        _capture_exception(captured)
                        reraise(*captured)

                middleware = SentryWsgiMiddleware(sentry_patched_inner_wsgi_call)
                return middleware(environ, start_response)

            return original_wsgi_call(self, environ, start_response)

        router.Router.__call__ = sentry_patched_wsgi_call

137 

138 

def _capture_exception(exc_info):
    # type: (ExcInfo) -> None
    """Send ``exc_info`` to Sentry as an unhandled "pyramid" event.

    Nothing is sent when ``exc_info`` carries no exception type, when the type
    is a subclass of ``HTTPException``, or when ``PyramidIntegration`` is not
    enabled on the current hub.
    """
    exc_type = exc_info[0]
    if exc_type is None:
        return
    if issubclass(exc_type, HTTPException):
        return

    current_hub = Hub.current
    if current_hub.get_integration(PyramidIntegration) is None:
        return

    # If an integration is there, a client has to be there.
    sentry_client = current_hub.client  # type: Any

    sentry_event, event_hint = event_from_exception(
        exc_info,
        client_options=sentry_client.options,
        mechanism={"type": "pyramid", "handled": False},
    )
    current_hub.capture_event(sentry_event, hint=event_hint)

157 

158 

def _set_transaction_name_and_source(scope, transaction_style, request):
    # type: (Scope, str, Request) -> None
    """Name the scope's transaction after the request's matched route.

    The name is the route's ``name`` or ``pattern`` depending on
    ``transaction_style``; the source comes from ``SOURCE_FOR_STYLE``.
    Best effort: any exception raised along the way is swallowed and the
    scope is left untouched.
    """
    try:
        route = request.matched_route
        # Both candidates are read eagerly, so a failure on either attribute
        # aborts the whole operation regardless of the selected style.
        candidates = {
            "route_name": route.name,
            "route_pattern": route.pattern,
        }
        chosen_name = candidates[transaction_style]
        chosen_source = SOURCE_FOR_STYLE[transaction_style]
        scope.set_transaction_name(chosen_name, source=chosen_source)
    except Exception:
        pass

172 

173 

class PyramidRequestExtractor(RequestExtractor):
    """Expose the pieces of ``self.request`` that ``RequestExtractor`` asks for."""

    def url(self):
        # type: () -> str
        """Return the request's ``path_url``."""
        current = self.request
        return current.path_url

    def env(self):
        # type: () -> Dict[str, str]
        """Return the request's ``environ`` mapping."""
        current = self.request
        return current.environ

    def cookies(self):
        # type: () -> RequestCookies
        """Return the request's ``cookies``."""
        current = self.request
        return current.cookies

    def raw_data(self):
        # type: () -> str
        """Return the request's ``text``."""
        current = self.request
        return current.text

    def form(self):
        # type: () -> Dict[str, str]
        """Return the POST items whose value has no truthy ``filename``."""
        fields = {}
        for field_name, field_value in iteritems(self.request.POST):
            if getattr(field_value, "filename", None):
                # Treated as an upload; reported by ``files`` instead.
                continue
            fields[field_name] = field_value
        return fields

    def files(self):
        # type: () -> Dict[str, cgi_FieldStorage]
        """Return the POST items whose value has a truthy ``filename``."""
        uploads = {}
        for field_name, field_value in iteritems(self.request.POST):
            if not getattr(field_value, "filename", None):
                continue
            uploads[field_name] = field_value
        return uploads

    def size_of_file(self, postdata):
        # type: (cgi_FieldStorage) -> int
        """Return the size of ``postdata.file`` via ``os.fstat``, or 0 on failure."""
        # The attribute lookup stays outside the ``try`` so that a missing
        # ``file`` attribute still propagates to the caller.
        stream = postdata.file
        try:
            descriptor = stream.fileno()
            return os.fstat(descriptor).st_size
        except Exception:
            return 0

214 

215 

def _make_event_processor(weak_request, integration):
    # type: (Callable[[], Request], PyramidIntegration) -> EventProcessor
    """Build an event processor bound to a weakly referenced request.

    ``weak_request`` is called each time an event is processed; when it
    returns ``None`` the event is passed through untouched. ``integration``
    is accepted but not used by the processor.
    """

    def event_processor(event, hint):
        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
        live_request = weak_request()

        if live_request is not None:
            # Failures while enriching the event are handled by
            # capture_internal_exceptions rather than raised from here.
            with capture_internal_exceptions():
                PyramidRequestExtractor(live_request).extract_into_event(event)

            if _should_send_default_pii():
                with capture_internal_exceptions():
                    # An "id" already present on the event's user is kept.
                    event.setdefault("user", {}).setdefault(
                        "id", authenticated_userid(live_request)
                    )

        return event

    return event_processor

233 

234 return event_processor