Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/worker.py: 23%
93 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import os
2import threading
4from time import sleep, time
5from sentry_sdk._compat import check_thread_support
6from sentry_sdk._queue import Queue, FullError
7from sentry_sdk.utils import logger
8from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
10from sentry_sdk._types import MYPY
12if MYPY: 12 ↛ 13line 12 didn't jump to line 13, because the condition on line 12 was never true
13 from typing import Any
14 from typing import Optional
15 from typing import Callable
# Unique sentinel object. `BackgroundWorker.kill` puts it on the queue and the
# worker loop in `BackgroundWorker._target` exits when it dequeues it.
_TERMINATOR = object()
class BackgroundWorker(object):
    """Run submitted callbacks on a single daemon thread.

    Callbacks are handed over through a queue by `submit` and executed by
    one worker thread that is started lazily on first use. The worker
    records the process id it was started in; if the current process id
    differs (e.g. in a forked child), the worker is reported as not alive
    and the next `submit` starts a fresh thread.
    """

    def __init__(self, queue_size=DEFAULT_QUEUE_SIZE):
        # type: (int) -> None
        """Set up the queue and lock; no thread is started yet.

        :param queue_size: Size passed to the underlying `Queue`.
        """
        check_thread_support()
        self._queue = Queue(queue_size)  # type: Queue
        # Guards thread creation/teardown in `start`, `kill` and `flush`.
        self._lock = threading.Lock()
        self._thread = None  # type: Optional[threading.Thread]
        # Pid of the process that started `_thread`; compared in `is_alive`.
        self._thread_for_pid = None  # type: Optional[int]

    @property
    def is_alive(self):
        # type: () -> bool
        """Return True if a worker thread started by this process is running."""
        if self._thread_for_pid != os.getpid():
            # Thread was never started, or was started under another pid.
            return False
        if not self._thread:
            return False
        return self._thread.is_alive()

    def _ensure_thread(self):
        # type: () -> None
        """Start the worker thread if it is not currently alive."""
        if not self.is_alive:
            self.start()

    def _timed_queue_join(self, timeout):
        # type: (float) -> bool
        """Wait until the queue has no unfinished tasks or `timeout` elapses.

        :param timeout: Maximum number of seconds to wait.
        :returns: True if all tasks finished in time, False on timeout.
        """
        deadline = time() + timeout
        queue = self._queue

        # Hold the condition for the whole check/wait loop; the `with` block
        # releases it on every exit path, including the early return.
        with queue.all_tasks_done:
            while queue.unfinished_tasks:
                delay = deadline - time()
                if delay <= 0:
                    return False
                queue.all_tasks_done.wait(timeout=delay)

            return True

    def start(self):
        # type: () -> None
        """Start the daemon worker thread unless one is already alive.

        If the thread cannot be started (`Thread.start` raises
        `RuntimeError`), the worker is left without a thread instead of
        propagating the error to the caller.
        """
        with self._lock:
            if self.is_alive:
                return

            thread = threading.Thread(
                target=self._target, name="raven-sentry.BackgroundWorker"
            )
            thread.daemon = True
            try:
                thread.start()
            except RuntimeError:
                # Starting a thread can fail, e.g. when the interpreter is
                # shutting down. Do not keep a reference to a thread that
                # never ran, and do not crash the code that submitted work.
                self._thread = None
                self._thread_for_pid = None
                logger.debug("background worker thread could not be started")
                return

            # Publish the thread only once it is actually running.
            self._thread = thread
            self._thread_for_pid = os.getpid()

    def kill(self):
        # type: () -> None
        """
        Kill worker thread. Returns immediately. Not useful for
        waiting on shutdown for events, use `flush` for that.
        """
        logger.debug("background worker got kill request")
        with self._lock:
            if self._thread:
                try:
                    # Ask the worker loop to exit once it reaches the sentinel.
                    self._queue.put_nowait(_TERMINATOR)
                except FullError:
                    logger.debug("background worker queue full, kill failed")

                # The references are dropped even if the sentinel could not
                # be queued.
                self._thread = None
                self._thread_for_pid = None

    def flush(self, timeout, callback=None):
        # type: (float, Optional[Any]) -> None
        """Wait up to `timeout` seconds for queued callbacks to be processed.

        Does nothing besides logging when the worker is not alive or
        `timeout` is not positive.

        :param timeout: Maximum number of seconds to wait.
        :param callback: Optional callable invoked as
            `callback(pending, timeout)` when the queue is not drained within
            the initial short wait.
        """
        logger.debug("background worker got flush request")
        with self._lock:
            if self.is_alive and timeout > 0.0:
                self._wait_flush(timeout, callback)
        logger.debug("background worker flushed")

    def _wait_flush(self, timeout, callback):
        # type: (float, Optional[Any]) -> None
        """Wait for the queue to drain, reporting progress and timeouts.

        First waits at most 0.1 seconds; if tasks remain, `callback` (when
        given) is told how many are pending and the remaining time is spent
        waiting again. An error is logged if tasks are still unfinished.
        """
        initial_timeout = min(0.1, timeout)
        if not self._timed_queue_join(initial_timeout):
            # NOTE(review): the +1 presumably accounts for the job the worker
            # has already dequeued and is still running - confirm.
            pending = self._queue.qsize() + 1
            logger.debug("%d event(s) pending on flush", pending)
            if callback is not None:
                callback(pending, timeout)

            if not self._timed_queue_join(timeout - initial_timeout):
                pending = self._queue.qsize() + 1
                logger.error("flush timed out, dropped %s events", pending)

    def submit(self, callback):
        # type: (Callable[[], None]) -> bool
        """Queue `callback` for execution on the worker thread.

        :param callback: Zero-argument callable to run in the background.
        :returns: True if the callback was queued, False if the queue is full.
        """
        self._ensure_thread()
        try:
            self._queue.put_nowait(callback)
            return True
        except FullError:
            return False

    def _target(self):
        # type: () -> None
        """Worker loop: run queued callbacks until the terminator is dequeued."""
        while True:
            callback = self._queue.get()
            try:
                if callback is _TERMINATOR:
                    break
                try:
                    callback()
                except Exception:
                    # A failing job must not take the worker thread down.
                    logger.error("Failed processing job", exc_info=True)
            finally:
                # Always mark the item done (terminator included) so that
                # `_timed_queue_join` can observe the queue draining.
                self._queue.task_done()
            # Presumably yields so other threads get a chance to run.
            sleep(0)