Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/integrations/celery.py: 10%
139 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import absolute_import
3import sys
5from sentry_sdk.hub import Hub
6from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
7from sentry_sdk.utils import (
8 capture_internal_exceptions,
9 event_from_exception,
10)
11from sentry_sdk.tracing import Transaction
12from sentry_sdk._compat import reraise
13from sentry_sdk.integrations import Integration, DidNotEnable
14from sentry_sdk.integrations.logging import ignore_logger
15from sentry_sdk._types import MYPY
16from sentry_sdk._functools import wraps
18if MYPY: 18 ↛ 19line 18 didn't jump to line 19, because the condition on line 18 was never true
19 from typing import Any
20 from typing import TypeVar
21 from typing import Callable
22 from typing import Optional
24 from sentry_sdk._types import EventProcessor, Event, Hint, ExcInfo
26 F = TypeVar("F", bound=Callable[..., Any])
29try:
30 from celery import VERSION as CELERY_VERSION
31 from celery.exceptions import ( # type: ignore
32 SoftTimeLimitExceeded,
33 Retry,
34 Ignore,
35 Reject,
36 )
37 from celery.app.trace import task_has_custom
38except ImportError:
39 raise DidNotEnable("Celery not installed")
42CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)
class CeleryIntegration(Integration):
    """Sentry integration that hooks into Celery by monkeypatching.

    ``setup_once`` replaces ``celery.app.trace.build_tracer``,
    ``Task.apply_async`` and the billiard worker loop with wrapped versions
    defined in this module.
    """

    identifier = "celery"

    def __init__(self, propagate_traces=True):
        # type: (bool) -> None
        # Read by the ``apply_async`` wrapper to decide whether trace
        # propagation headers are attached to submitted tasks.
        self.propagate_traces = propagate_traces

    @staticmethod
    def setup_once():
        # type: () -> None
        """Install all Celery patches.

        Raises:
            DidNotEnable: if the installed Celery is older than version 3.
        """
        if CELERY_VERSION < (3,):
            raise DidNotEnable("Celery 3 or newer required.")

        import celery.app.trace as trace  # type: ignore

        original_build_tracer = trace.build_tracer

        def sentry_build_tracer(name, task, *args, **kwargs):
            # type: (Any, Any, *Any, **Any) -> Any
            already_patched = getattr(task, "_sentry_is_patched", False)
            if not already_patched:
                # Patch whichever entry point Celery is going to invoke:
                # a custom ``__call__`` on the task class, or ``run``.
                if task_has_custom(task, "__call__"):
                    task_cls = type(task)
                    task_cls.__call__ = _wrap_task_call(task, task_cls.__call__)
                else:
                    task.run = _wrap_task_call(task, task.run)

                # `build_tracer` is apparently called for every task
                # invocation, so remember that this task is done; otherwise
                # the wrappers would nest without bound.
                task._sentry_is_patched = True

            tracer = original_build_tracer(name, task, *args, **kwargs)
            return _wrap_tracer(task, tracer)

        trace.build_tracer = sentry_build_tracer

        from celery.app.task import Task  # type: ignore

        Task.apply_async = _wrap_apply_async(Task.apply_async)

        _patch_worker_exit()

        # "celery.worker.job" / "celery.app.trace": these log every status of
        # every task that ran on the worker, which fills each task's
        # breadcrumbs with "Task <foo> raised unexpected <bar>".
        #
        # "celery.redirected": stdout/err redirected to a logger, can't deal
        # with this (need event_level=logging.WARN to reproduce).
        for logger_name in (
            "celery.worker.job",
            "celery.app.trace",
            "celery.redirected",
        ):
            ignore_logger(logger_name)
def _wrap_apply_async(f):
    # type: (F) -> F
    """Wrap ``Task.apply_async`` so submitted tasks carry trace headers.

    When the Celery integration is active and ``propagate_traces`` is
    enabled, the call to ``f`` runs inside a ``celery.submit`` span and the
    span's trace propagation headers are merged into the ``headers`` keyword
    argument. Otherwise ``f`` is called unchanged.

    The caller's ``headers`` dict (and a nested ``"headers"`` dict inside it)
    is copied before merging, so a dict reused across several submissions is
    never mutated and never left half-updated if merging fails.

    :param f: the original ``apply_async`` callable.
    :returns: a wrapper with the same call signature as ``f``.
    """

    @wraps(f)
    def apply_async(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        hub = Hub.current
        integration = hub.get_integration(CeleryIntegration)
        if integration is None or not integration.propagate_traces:
            return f(*args, **kwargs)

        with hub.start_span(op="celery.submit", description=args[0].name) as span:
            # Header propagation is best-effort: any failure in here is
            # swallowed and the task is still submitted below.
            with capture_internal_exceptions():
                headers = dict(hub.iter_trace_propagation_headers(span))

                if headers:
                    # Note: kwargs can contain headers=None, so no setdefault!
                    # Unsure which backend though.
                    #
                    # Work on a copy so the caller's dict stays untouched.
                    kwarg_headers = dict(kwargs.get("headers") or {})
                    kwarg_headers.update(headers)

                    # https://github.com/celery/celery/issues/4875
                    #
                    # The inner "headers" entry must be merged into rather
                    # than replaced, since other tracing tools (dd-trace-py)
                    # also employ this exact workaround and we don't want to
                    # break them. It is copied for the same reason as above.
                    inner_headers = dict(kwarg_headers.get("headers") or {})
                    inner_headers.update(headers)
                    kwarg_headers["headers"] = inner_headers

                    kwargs["headers"] = kwarg_headers

            return f(*args, **kwargs)

    return apply_async  # type: ignore
def _wrap_tracer(task, f):
    # type: (Any, F) -> F
    """Wrap a Celery tracer so each run gets its own scope and transaction.

    The tracer itself is wrapped (instead of hooking signals) because the
    scope has to be pushed before prerun is sent and popped after postrun is
    sent. Also, in Celery 3, signal dispatch returns early if one handler
    crashes.

    :param task: the Celery task the tracer was built for.
    :param f: the tracer callable to wrap.
    :returns: a wrapper that calls ``f`` with unchanged arguments.
    """

    @wraps(f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        hub = Hub.current
        if hub.get_integration(CeleryIntegration) is None:
            return f(*args, **kwargs)

        with hub.push_scope() as scope:
            scope._name = "celery"
            scope.clear_breadcrumbs()
            processor = _make_event_processor(task, *args, **kwargs)
            scope.add_event_processor(processor)

            transaction = None

            # Celery task objects are not a thing to be trusted. Even
            # something such as attribute access can fail, so everything
            # touching them here is guarded.
            with capture_internal_exceptions():
                incoming_headers = args[3].get("headers") or {}
                transaction = Transaction.continue_from_headers(
                    incoming_headers,
                    op="celery.task",
                    name="unknown celery task",
                    source=TRANSACTION_SOURCE_TASK,
                )
                transaction.name = task.name
                transaction.set_status("ok")

            if transaction is not None:
                sampling_context = {
                    "celery_job": {
                        "task": task.name,
                        # for some reason, args[1] is a list if non-empty
                        # but a tuple if empty
                        "args": list(args[1]),
                        "kwargs": args[2],
                    }
                }
                with hub.start_transaction(
                    transaction, custom_sampling_context=sampling_context
                ):
                    return f(*args, **kwargs)

            # No transaction could be created: run the tracer untraced.
            return f(*args, **kwargs)

    return _inner  # type: ignore
def _wrap_task_call(task, f):
    # type: (Any, F) -> F
    """Wrap a task's ``__call__``/``run`` so raised exceptions are reported.

    The task call is wrapped because Celery catches the exception before we
    get to see it, and Celery's reported stacktrace is untrustworthy.

    :param task: the Celery task that owns ``f``.
    :param f: the callable Celery invokes to execute the task.
    :returns: a wrapper that reports and then re-raises any ``Exception``.
    """

    # functools.wraps is important here because celery-once looks at this
    # method's name.
    # https://github.com/getsentry/sentry-python/issues/421
    @wraps(f)
    def _inner(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            return f(*args, **kwargs)
        except Exception:
            exc_type, exc_value, tb = sys.exc_info()
            # Reporting must never mask the task's own exception.
            with capture_internal_exceptions():
                _capture_exception(task, (exc_type, exc_value, tb))
            reraise(exc_type, exc_value, tb)

    return _inner  # type: ignore
def _make_event_processor(task, uuid, args, kwargs, request=None):
    # type: (Any, Any, Any, Any, Optional[Any]) -> EventProcessor
    """Build an event processor that annotates events with task details.

    :param task: the Celery task being executed.
    :param uuid: stored in the ``celery_task_id`` tag.
    :param args: positional arguments, stored under ``extra["celery-job"]``.
    :param kwargs: keyword arguments, stored under ``extra["celery-job"]``.
    :param request: accepted but not used.
    :returns: a callable ``(event, hint) -> event``.
    """

    def event_processor(event, hint):
        # type: (Event, Hint) -> Optional[Event]

        with capture_internal_exceptions():
            event.setdefault("tags", {})["celery_task_id"] = uuid
            extra = event.setdefault("extra", {})
            extra["celery-job"] = {
                "task_name": task.name,
                "args": args,
                "kwargs": kwargs,
            }

        if "exc_info" not in hint:
            return event

        with capture_internal_exceptions():
            exc_type = hint["exc_info"][0]
            if issubclass(exc_type, SoftTimeLimitExceeded):
                # Soft time limit errors get a fixed fingerprint that
                # includes the task name.
                event["fingerprint"] = [
                    "celery",
                    "SoftTimeLimitExceeded",
                    getattr(task, "name", task),
                ]

        return event

    return event_processor
def _capture_exception(task, exc_info):
    # type: (Any, ExcInfo) -> None
    """Set the current span's status and report the exception to Sentry.

    Nothing happens when the Celery integration is not active. Celery control
    flow exceptions only mark the span as ``aborted``. Exceptions listed in
    ``task.throws`` mark the span as ``internal_error`` but are not sent.

    :param task: the Celery task that raised.
    :param exc_info: ``sys.exc_info()``-style triple for the raised exception.
    """
    hub = Hub.current
    if hub.get_integration(CeleryIntegration) is None:
        return

    error = exc_info[1]

    if isinstance(error, CELERY_CONTROL_FLOW_EXCEPTIONS):
        # ??? Doesn't map to anything
        _set_status(hub, "aborted")
        return

    _set_status(hub, "internal_error")

    is_expected = hasattr(task, "throws") and isinstance(error, task.throws)
    if is_expected:
        return

    # If an integration is there, a client has to be there.
    client = hub.client  # type: Any

    mechanism = {"type": "celery", "handled": False}
    event, hint = event_from_exception(
        exc_info,
        client_options=client.options,
        mechanism=mechanism,
    )

    hub.capture_event(event, hint=hint)
def _set_status(hub, status):
    # type: (Hub, str) -> None
    """Set ``status`` on the span of the hub's current scope, if there is one.

    Errors raised while doing so are swallowed by
    ``capture_internal_exceptions``.
    """
    with capture_internal_exceptions(), hub.configure_scope() as scope:
        span = scope.span
        if span is not None:
            span.set_status(status)
def _patch_worker_exit():
    # type: () -> None
    """Patch billiard's ``Worker.workloop`` to flush the hub when it ends.

    The queue has to be flushed before worker shutdown because a crashing
    worker will call os._exit.
    """
    from billiard.pool import Worker  # type: ignore

    original_workloop = Worker.workloop

    def sentry_workloop(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        try:
            return original_workloop(*args, **kwargs)
        finally:
            # Runs whether the loop returned or raised; flushing itself must
            # not raise.
            with capture_internal_exceptions():
                hub = Hub.current
                integration_active = (
                    hub.get_integration(CeleryIntegration) is not None
                )
                if integration_active:
                    hub.flush()

    Worker.workloop = sentry_workloop