Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/integrations/rq.py: 10%

84 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import absolute_import 

2 

3import weakref 

4 

5from sentry_sdk.hub import Hub 

6from sentry_sdk.integrations import DidNotEnable, Integration 

7from sentry_sdk.integrations.logging import ignore_logger 

8from sentry_sdk.tracing import Transaction 

9from sentry_sdk.utils import capture_internal_exceptions, event_from_exception 

10 

11try: 

12 from rq.queue import Queue 

13 from rq.timeouts import JobTimeoutException 

14 from rq.version import VERSION as RQ_VERSION 

15 from rq.worker import Worker 

16except ImportError: 

17 raise DidNotEnable("RQ not installed") 

18 

19from sentry_sdk._types import MYPY 

20 

21if MYPY: 

22 from typing import Any, Callable, Dict 

23 

24 from sentry_sdk._types import EventProcessor 

25 from sentry_sdk.utils import ExcInfo 

26 

27 from rq.job import Job 

28 

29 

class RqIntegration(Integration):
    """Sentry integration that instruments RQ workers and queues.

    Installing it monkey-patches ``Worker.perform_job``,
    ``Worker.handle_exception`` and ``Queue.enqueue_job``.
    """

    identifier = "rq"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Validate the installed RQ version and install the patches.

        Raises ``DidNotEnable`` when ``RQ_VERSION`` cannot be parsed into
        integers or when it is older than 0.6.
        """
        try:
            leading_parts = RQ_VERSION.split(".")[:3]
            version = tuple(int(part) for part in leading_parts)
        except (ValueError, TypeError):
            raise DidNotEnable("Unparsable RQ version: {}".format(RQ_VERSION))

        minimum_version = (0, 6)
        if version < minimum_version:
            raise DidNotEnable("RQ 0.6 or newer is required.")

        original_perform_job = Worker.perform_job

        def sentry_patched_perform_job(self, job, *args, **kwargs):
            # type: (Any, Job, *Queue, **Any) -> bool
            current_hub = Hub.current

            # Integration not enabled on this hub: behave exactly like RQ.
            if current_hub.get_integration(RqIntegration) is None:
                return original_perform_job(self, job, *args, **kwargs)

            sentry_client = current_hub.client
            assert sentry_client is not None

            with current_hub.push_scope() as job_scope:
                # Start each job with a clean breadcrumb trail and attach
                # the job details to every event emitted while it runs.
                job_scope.clear_breadcrumbs()
                job_ref = weakref.ref(job)
                job_scope.add_event_processor(_make_event_processor(job_ref))

                # Resume the trace recorded when the job was enqueued.
                trace_headers = job.meta.get("_sentry_trace_headers") or {}
                transaction = Transaction.continue_from_headers(
                    trace_headers, op="rq.task", name="unknown RQ task"
                )

                # Keep the placeholder name if the real one cannot be read.
                with capture_internal_exceptions():
                    transaction.name = job.func_name

                sampling_context = {"rq_job": job}
                with current_hub.start_transaction(
                    transaction, custom_sampling_context=sampling_context
                ):
                    result = original_perform_job(self, job, *args, **kwargs)

            if self.is_horse:
                # This is the forked work horse and RQ will call
                # `os._exit` next, so pending events must be sent now.
                sentry_client.flush()

            return result

        Worker.perform_job = sentry_patched_perform_job

        original_handle_exception = Worker.handle_exception

        def sentry_patched_handle_exception(self, job, *exc_info, **kwargs):
            # type: (Worker, Any, *Any, **Any) -> Any
            # Only report the exception once the job is marked as failed.
            if job.is_failed:
                _capture_exception(exc_info)  # type: ignore

            return original_handle_exception(self, job, *exc_info, **kwargs)

        Worker.handle_exception = sentry_patched_handle_exception

        original_enqueue_job = Queue.enqueue_job

        def sentry_patched_enqueue_job(self, job, **kwargs):
            # type: (Queue, Any, **Any) -> Any
            current_hub = Hub.current
            if current_hub.get_integration(RqIntegration) is not None:
                # Store the trace headers in the job metadata so that
                # the worker can continue the same trace.
                propagation_headers = current_hub.iter_trace_propagation_headers()
                job.meta["_sentry_trace_headers"] = dict(propagation_headers)

            return original_enqueue_job(self, job, **kwargs)

        Queue.enqueue_job = sentry_patched_enqueue_job

        ignore_logger("rq.worker")

112 

113 

def _make_event_processor(weak_job):
    # type: (Callable[[], Job]) -> EventProcessor
    """Build an event processor that annotates events with RQ job details.

    ``weak_job`` is a zero-argument callable (a weak reference) returning
    the job, or ``None`` once the job is gone.  The returned processor:

    * stores the job id, function name, args, kwargs and description under
      ``event["extra"]["rq-job"]``;
    * sets a fingerprint based on the job's function name when the hint's
      ``exc_info`` exception type is a ``JobTimeoutException`` subclass.

    The event is returned unchanged when the job is no longer available.
    """

    def event_processor(event, hint):
        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
        job = weak_job()
        if job is None:
            # Both annotations below need the job.  Returning early avoids
            # dereferencing ``None`` in the fingerprint branch, which used
            # to raise an AttributeError that was swallowed and reported
            # as an internal SDK error.
            return event

        with capture_internal_exceptions():
            extra = event.setdefault("extra", {})
            extra["rq-job"] = {
                "job_id": job.id,
                "func": job.func_name,
                "args": job.args,
                "kwargs": job.kwargs,
                "description": job.description,
            }

        if "exc_info" in hint:
            with capture_internal_exceptions():
                if issubclass(hint["exc_info"][0], JobTimeoutException):
                    event["fingerprint"] = [
                        "rq",
                        "JobTimeoutException",
                        job.func_name,
                    ]

        return event

    return event_processor

138 

139 

def _capture_exception(exc_info, **kwargs):
    # type: (ExcInfo, **Any) -> None
    """Send ``exc_info`` to Sentry as an unhandled ``rq`` exception.

    Does nothing when the RQ integration is not enabled on the current
    hub.  Extra keyword arguments are accepted but not used.
    """
    current_hub = Hub.current
    if current_hub.get_integration(RqIntegration) is None:
        return

    # An enabled integration implies that the hub has a client bound.
    sentry_client = current_hub.client  # type: Any
    options = sentry_client.options
    mechanism = {"type": "rq", "handled": False}

    event, hint = event_from_exception(
        exc_info, client_options=options, mechanism=mechanism
    )
    current_hub.capture_event(event, hint=hint)