Coverage report for sentry_sdk/integrations/celery.py — 10% of 139 statements covered (coverage.py v6.4.4, created 2023-07-17 14:22 -0600).

1from __future__ import absolute_import 

2 

3import sys 

4 

5from sentry_sdk.hub import Hub 

6from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK 

7from sentry_sdk.utils import ( 

8 capture_internal_exceptions, 

9 event_from_exception, 

10) 

11from sentry_sdk.tracing import Transaction 

12from sentry_sdk._compat import reraise 

13from sentry_sdk.integrations import Integration, DidNotEnable 

14from sentry_sdk.integrations.logging import ignore_logger 

15from sentry_sdk._types import MYPY 

16from sentry_sdk._functools import wraps 

17 

18if MYPY: 18 ↛ 19line 18 didn't jump to line 19, because the condition on line 18 was never true

19 from typing import Any 

20 from typing import TypeVar 

21 from typing import Callable 

22 from typing import Optional 

23 

24 from sentry_sdk._types import EventProcessor, Event, Hint, ExcInfo 

25 

26 F = TypeVar("F", bound=Callable[..., Any]) 

27 

28 

try:
    from celery import VERSION as CELERY_VERSION
    from celery.exceptions import (  # type: ignore
        SoftTimeLimitExceeded,
        Retry,
        Ignore,
        Reject,
    )
    from celery.app.trace import task_has_custom
except ImportError:
    # Integrations signal their unavailability via DidNotEnable.
    raise DidNotEnable("Celery not installed")


# Exceptions Celery raises for control flow (retry/ignore/reject a task);
# these are not errors and must not be reported as such.
CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)

43 

44 

class CeleryIntegration(Integration):
    """Sentry integration for Celery.

    Patches ``celery.app.trace.build_tracer`` (to scope/trace each task
    execution), ``Task.apply_async`` (to propagate trace headers to the
    broker), and the billiard worker loop (to flush events before a worker
    process exits).
    """

    identifier = "celery"

    def __init__(self, propagate_traces=True):
        # type: (bool) -> None
        # When True, outgoing tasks carry Sentry trace propagation headers.
        self.propagate_traces = propagate_traces

    @staticmethod
    def setup_once():
        # type: () -> None
        if CELERY_VERSION < (3,):
            raise DidNotEnable("Celery 3 or newer required.")

        import celery.app.trace as trace  # type: ignore

        original_build_tracer = trace.build_tracer

        def sentry_build_tracer(name, task, *args, **kwargs):
            # type: (Any, Any, *Any, **Any) -> Any
            if not getattr(task, "_sentry_is_patched", False):
                # Celery dispatches through __call__ only when the task class
                # provides a custom one; otherwise it invokes run directly.
                # Patch whichever path will actually be taken.
                if task_has_custom(task, "__call__"):
                    type(task).__call__ = _wrap_task_call(task, type(task).__call__)
                else:
                    task.run = _wrap_task_call(task, task.run)

                # `build_tracer` is apparently called for every task
                # invocation. Can't wrap every celery task for every
                # invocation or we will get infinitely nested wrapper
                # functions.
                task._sentry_is_patched = True

            return _wrap_tracer(
                task, original_build_tracer(name, task, *args, **kwargs)
            )

        trace.build_tracer = sentry_build_tracer

        from celery.app.task import Task  # type: ignore

        Task.apply_async = _wrap_apply_async(Task.apply_async)

        _patch_worker_exit()

        # This logger logs every status of every task that ran on the worker.
        # Meaning that every task's breadcrumbs are full of stuff like "Task
        # <foo> raised unexpected <bar>".
        ignore_logger("celery.worker.job")
        ignore_logger("celery.app.trace")

        # This is stdout/err redirected to a logger, can't deal with this
        # (need event_level=logging.WARN to reproduce)
        ignore_logger("celery.redirected")

96 

97 

def _wrap_apply_async(f):
    # type: (F) -> F
    """Wrap ``Task.apply_async`` so submitted tasks carry trace headers.

    When the integration is active and ``propagate_traces`` is enabled, a
    ``celery.submit`` span is started and the current trace propagation
    headers are injected into the outgoing message headers.
    """

    @wraps(f)
    def apply_async(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        hub = Hub.current
        integration = hub.get_integration(CeleryIntegration)
        if integration is None or not integration.propagate_traces:
            # Integration disabled or propagation opted out: pass through.
            return f(*args, **kwargs)

        # args[0] is the Task instance (bound method receiver).
        with hub.start_span(op="celery.submit", description=args[0].name) as span:
            with capture_internal_exceptions():
                headers = dict(hub.iter_trace_propagation_headers(span))

                if headers:
                    # Note: kwargs can contain headers=None, so no setdefault!
                    # Unsure which backend though.
                    kwarg_headers = kwargs.get("headers") or {}
                    kwarg_headers.update(headers)

                    # https://github.com/celery/celery/issues/4875
                    #
                    # Need to setdefault the inner headers too since other
                    # tracing tools (dd-trace-py) also employ this exact
                    # workaround and we don't want to break them.
                    kwarg_headers.setdefault("headers", {}).update(headers)
                    kwargs["headers"] = kwarg_headers

            return f(*args, **kwargs)

    return apply_async  # type: ignore

129 

130 

def _wrap_tracer(task, f):
    # type: (Any, F) -> F
    """Wrap a Celery tracer so each task run executes inside its own scope.

    Need to wrap tracer for pushing the scope before prerun is sent, and
    popping it after postrun is sent.

    This is the reason we don't use signals for hooking in the first place.
    Also because in Celery 3, signal dispatch returns early if one handler
    crashes.
    """

    @wraps(f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        hub = Hub.current
        if hub.get_integration(CeleryIntegration) is None:
            return f(*args, **kwargs)

        with hub.push_scope() as scope:
            scope._name = "celery"
            scope.clear_breadcrumbs()
            scope.add_event_processor(_make_event_processor(task, *args, **kwargs))

            transaction = None

            # Celery task objects are not a thing to be trusted. Even
            # something such as attribute access can fail.
            with capture_internal_exceptions():
                # args[3] is the request dict; incoming trace headers (if any)
                # continue the caller's trace.
                transaction = Transaction.continue_from_headers(
                    args[3].get("headers") or {},
                    op="celery.task",
                    name="unknown celery task",
                    source=TRANSACTION_SOURCE_TASK,
                )
                transaction.name = task.name
                transaction.set_status("ok")

            if transaction is None:
                # Building the transaction failed; still run the task.
                return f(*args, **kwargs)

            with hub.start_transaction(
                transaction,
                custom_sampling_context={
                    "celery_job": {
                        "task": task.name,
                        # for some reason, args[1] is a list if non-empty but a
                        # tuple if empty
                        "args": list(args[1]),
                        "kwargs": args[2],
                    }
                },
            ):
                return f(*args, **kwargs)

    return _inner  # type: ignore

184 

185 

def _wrap_task_call(task, f):
    # type: (Any, F) -> F
    """Wrap a task's execution entry point to report raised exceptions.

    Need to wrap task call because the exception is caught before we get to
    see it. Also celery's reported stacktrace is untrustworthy.
    """

    # functools.wraps is important here because celery-once looks at this
    # method's name.
    # https://github.com/getsentry/sentry-python/issues/421
    @wraps(f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            return f(*args, **kwargs)
        except Exception:
            captured = sys.exc_info()
            # Reporting must never mask the original exception.
            with capture_internal_exceptions():
                _capture_exception(task, captured)
            reraise(*captured)

    return _inner  # type: ignore

207 

208 

def _make_event_processor(task, uuid, args, kwargs, request=None):
    # type: (Any, Any, Any, Any, Optional[Any]) -> EventProcessor
    """Build an event processor that tags events with this task invocation.

    Adds the task id as a tag, the task name/arguments as extra context,
    and a dedicated fingerprint for SoftTimeLimitExceeded so timeouts group
    per task rather than into one issue.
    """

    def event_processor(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        with capture_internal_exceptions():
            event.setdefault("tags", {})["celery_task_id"] = uuid
            event.setdefault("extra", {})["celery-job"] = {
                "task_name": task.name,
                "args": args,
                "kwargs": kwargs,
            }

        if "exc_info" in hint:
            with capture_internal_exceptions():
                if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded):
                    event["fingerprint"] = [
                        "celery",
                        "SoftTimeLimitExceeded",
                        # Fall back to the task object itself if it has no name.
                        getattr(task, "name", task),
                    ]

        return event

    return event_processor

236 

237 

def _capture_exception(task, exc_info):
    # type: (Any, ExcInfo) -> None
    """Report a task exception to Sentry, honoring control-flow exceptions
    and the task's declared ``throws`` list."""
    hub = Hub.current
    if hub.get_integration(CeleryIntegration) is None:
        return

    exc = exc_info[1]

    if isinstance(exc, CELERY_CONTROL_FLOW_EXCEPTIONS):
        # ??? Doesn't map to anything
        _set_status(hub, "aborted")
        return

    _set_status(hub, "internal_error")

    # Exceptions the task declares as expected are not reported.
    if hasattr(task, "throws") and isinstance(exc, task.throws):
        return

    # If an integration is there, a client has to be there.
    client = hub.client  # type: Any

    event, hint = event_from_exception(
        exc_info,
        client_options=client.options,
        mechanism={"type": "celery", "handled": False},
    )
    hub.capture_event(event, hint=hint)

264 

265 

def _set_status(hub, status):
    # type: (Hub, str) -> None
    """Best-effort: set *status* on the span of *hub*'s current scope."""
    with capture_internal_exceptions():
        with hub.configure_scope() as scope:
            span = scope.span
            if span is not None:
                span.set_status(status)

272 

273 

def _patch_worker_exit():
    # type: () -> None
    """Patch billiard's worker loop to flush Sentry events on exit.

    Need to flush queue before worker shutdown because a crashing worker
    will call os._exit.
    """
    from billiard.pool import Worker  # type: ignore

    original_workloop = Worker.workloop

    def sentry_workloop(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            return original_workloop(*args, **kwargs)
        finally:
            # Flush even when the loop exits via an exception.
            with capture_internal_exceptions():
                hub = Hub.current
                if hub.get_integration(CeleryIntegration) is not None:
                    hub.flush()

    Worker.workloop = sentry_workloop