Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/worker.py: 23%

93 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import os 

2import threading 

3 

4from time import sleep, time 

5from sentry_sdk._compat import check_thread_support 

6from sentry_sdk._queue import Queue, FullError 

7from sentry_sdk.utils import logger 

8from sentry_sdk.consts import DEFAULT_QUEUE_SIZE 

9 

10from sentry_sdk._types import MYPY 

11 

12if MYPY: 12 ↛ 13line 12 didn't jump to line 13, because the condition on line 12 was never true

13 from typing import Any 

14 from typing import Optional 

15 from typing import Callable 

16 

17 

18_TERMINATOR = object() 

19 

20 

21class BackgroundWorker(object): 

22 def __init__(self, queue_size=DEFAULT_QUEUE_SIZE): 

23 # type: (int) -> None 

24 check_thread_support() 

25 self._queue = Queue(queue_size) # type: Queue 

26 self._lock = threading.Lock() 

27 self._thread = None # type: Optional[threading.Thread] 

28 self._thread_for_pid = None # type: Optional[int] 

29 

30 @property 

31 def is_alive(self): 

32 # type: () -> bool 

33 if self._thread_for_pid != os.getpid(): 

34 return False 

35 if not self._thread: 

36 return False 

37 return self._thread.is_alive() 

38 

39 def _ensure_thread(self): 

40 # type: () -> None 

41 if not self.is_alive: 

42 self.start() 

43 

44 def _timed_queue_join(self, timeout): 

45 # type: (float) -> bool 

46 deadline = time() + timeout 

47 queue = self._queue 

48 

49 queue.all_tasks_done.acquire() 

50 

51 try: 

52 while queue.unfinished_tasks: 

53 delay = deadline - time() 

54 if delay <= 0: 

55 return False 

56 queue.all_tasks_done.wait(timeout=delay) 

57 

58 return True 

59 finally: 

60 queue.all_tasks_done.release() 

61 

62 def start(self): 

63 # type: () -> None 

64 with self._lock: 

65 if not self.is_alive: 

66 self._thread = threading.Thread( 

67 target=self._target, name="raven-sentry.BackgroundWorker" 

68 ) 

69 self._thread.daemon = True 

70 self._thread.start() 

71 self._thread_for_pid = os.getpid() 

72 

73 def kill(self): 

74 # type: () -> None 

75 """ 

76 Kill worker thread. Returns immediately. Not useful for 

77 waiting on shutdown for events, use `flush` for that. 

78 """ 

79 logger.debug("background worker got kill request") 

80 with self._lock: 

81 if self._thread: 

82 try: 

83 self._queue.put_nowait(_TERMINATOR) 

84 except FullError: 

85 logger.debug("background worker queue full, kill failed") 

86 

87 self._thread = None 

88 self._thread_for_pid = None 

89 

90 def flush(self, timeout, callback=None): 

91 # type: (float, Optional[Any]) -> None 

92 logger.debug("background worker got flush request") 

93 with self._lock: 

94 if self.is_alive and timeout > 0.0: 

95 self._wait_flush(timeout, callback) 

96 logger.debug("background worker flushed") 

97 

98 def _wait_flush(self, timeout, callback): 

99 # type: (float, Optional[Any]) -> None 

100 initial_timeout = min(0.1, timeout) 

101 if not self._timed_queue_join(initial_timeout): 

102 pending = self._queue.qsize() + 1 

103 logger.debug("%d event(s) pending on flush", pending) 

104 if callback is not None: 

105 callback(pending, timeout) 

106 

107 if not self._timed_queue_join(timeout - initial_timeout): 

108 pending = self._queue.qsize() + 1 

109 logger.error("flush timed out, dropped %s events", pending) 

110 

111 def submit(self, callback): 

112 # type: (Callable[[], None]) -> bool 

113 self._ensure_thread() 

114 try: 

115 self._queue.put_nowait(callback) 

116 return True 

117 except FullError: 

118 return False 

119 

120 def _target(self): 

121 # type: () -> None 

122 while True: 

123 callback = self._queue.get() 

124 try: 

125 if callback is _TERMINATOR: 

126 break 

127 try: 

128 callback() 

129 except Exception: 

130 logger.error("Failed processing job", exc_info=True) 

131 finally: 

132 self._queue.task_done() 

133 sleep(0)