Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/sessions.py: 23%

112 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import os 

2import time 

3from threading import Thread, Lock 

4from contextlib import contextmanager 

5 

6import sentry_sdk 

7from sentry_sdk.envelope import Envelope 

8from sentry_sdk.session import Session 

9from sentry_sdk._types import MYPY 

10from sentry_sdk.utils import format_timestamp 

11 

12if MYPY: 12 ↛ 13line 12 didn't jump to line 13, because the condition on line 12 was never true

13 from typing import Any 

14 from typing import Callable 

15 from typing import Dict 

16 from typing import Generator 

17 from typing import List 

18 from typing import Optional 

19 from typing import Union 

20 

21 

22def is_auto_session_tracking_enabled(hub=None): 

23 # type: (Optional[sentry_sdk.Hub]) -> Union[Any, bool, None] 

24 """Utility function to find out if session tracking is enabled.""" 

25 if hub is None: 

26 hub = sentry_sdk.Hub.current 

27 

28 should_track = hub.scope._force_auto_session_tracking 

29 

30 if should_track is None: 

31 client_options = hub.client.options if hub.client else {} 

32 should_track = client_options.get("auto_session_tracking", False) 

33 

34 return should_track 

35 

36 

37@contextmanager 

38def auto_session_tracking(hub=None, session_mode="application"): 

39 # type: (Optional[sentry_sdk.Hub], str) -> Generator[None, None, None] 

40 """Starts and stops a session automatically around a block.""" 

41 if hub is None: 

42 hub = sentry_sdk.Hub.current 

43 should_track = is_auto_session_tracking_enabled(hub) 

44 if should_track: 

45 hub.start_session(session_mode=session_mode) 

46 try: 

47 yield 

48 finally: 

49 if should_track: 

50 hub.end_session() 

51 

52 

53TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed") 

54MAX_ENVELOPE_ITEMS = 100 

55 

56 

57def make_aggregate_envelope(aggregate_states, attrs): 

58 # type: (Any, Any) -> Any 

59 return {"attrs": dict(attrs), "aggregates": list(aggregate_states.values())} 

60 

61 

62class SessionFlusher(object): 

63 def __init__( 

64 self, 

65 capture_func, # type: Callable[[Envelope], None] 

66 flush_interval=60, # type: int 

67 ): 

68 # type: (...) -> None 

69 self.capture_func = capture_func 

70 self.flush_interval = flush_interval 

71 self.pending_sessions = [] # type: List[Any] 

72 self.pending_aggregates = {} # type: Dict[Any, Any] 

73 self._thread = None # type: Optional[Thread] 

74 self._thread_lock = Lock() 

75 self._aggregate_lock = Lock() 

76 self._thread_for_pid = None # type: Optional[int] 

77 self._running = True 

78 

79 def flush(self): 

80 # type: (...) -> None 

81 pending_sessions = self.pending_sessions 

82 self.pending_sessions = [] 

83 

84 with self._aggregate_lock: 

85 pending_aggregates = self.pending_aggregates 

86 self.pending_aggregates = {} 

87 

88 envelope = Envelope() 

89 for session in pending_sessions: 

90 if len(envelope.items) == MAX_ENVELOPE_ITEMS: 

91 self.capture_func(envelope) 

92 envelope = Envelope() 

93 

94 envelope.add_session(session) 

95 

96 for (attrs, states) in pending_aggregates.items(): 

97 if len(envelope.items) == MAX_ENVELOPE_ITEMS: 

98 self.capture_func(envelope) 

99 envelope = Envelope() 

100 

101 envelope.add_sessions(make_aggregate_envelope(states, attrs)) 

102 

103 if len(envelope.items) > 0: 

104 self.capture_func(envelope) 

105 

106 def _ensure_running(self): 

107 # type: (...) -> None 

108 if self._thread_for_pid == os.getpid() and self._thread is not None: 

109 return None 

110 with self._thread_lock: 

111 if self._thread_for_pid == os.getpid() and self._thread is not None: 

112 return None 

113 

114 def _thread(): 

115 # type: (...) -> None 

116 while self._running: 

117 time.sleep(self.flush_interval) 

118 if self._running: 

119 self.flush() 

120 

121 thread = Thread(target=_thread) 

122 thread.daemon = True 

123 thread.start() 

124 self._thread = thread 

125 self._thread_for_pid = os.getpid() 

126 return None 

127 

128 def add_aggregate_session( 

129 self, session # type: Session 

130 ): 

131 # type: (...) -> None 

132 # NOTE on `session.did`: 

133 # the protocol can deal with buckets that have a distinct-id, however 

134 # in practice we expect the python SDK to have an extremely high cardinality 

135 # here, effectively making aggregation useless, therefore we do not 

136 # aggregate per-did. 

137 

138 # For this part we can get away with using the global interpreter lock 

139 with self._aggregate_lock: 

140 attrs = session.get_json_attrs(with_user_info=False) 

141 primary_key = tuple(sorted(attrs.items())) 

142 secondary_key = session.truncated_started # (, session.did) 

143 states = self.pending_aggregates.setdefault(primary_key, {}) 

144 state = states.setdefault(secondary_key, {}) 

145 

146 if "started" not in state: 

147 state["started"] = format_timestamp(session.truncated_started) 

148 # if session.did is not None: 

149 # state["did"] = session.did 

150 if session.status == "crashed": 

151 state["crashed"] = state.get("crashed", 0) + 1 

152 elif session.status == "abnormal": 

153 state["abnormal"] = state.get("abnormal", 0) + 1 

154 elif session.errors > 0: 

155 state["errored"] = state.get("errored", 0) + 1 

156 else: 

157 state["exited"] = state.get("exited", 0) + 1 

158 

159 def add_session( 

160 self, session # type: Session 

161 ): 

162 # type: (...) -> None 

163 if session.session_mode == "request": 

164 self.add_aggregate_session(session) 

165 else: 

166 self.pending_sessions.append(session.to_json()) 

167 self._ensure_running() 

168 

169 def kill(self): 

170 # type: (...) -> None 

171 self._running = False 

172 

173 def __del__(self): 

174 # type: (...) -> None 

175 self.kill()