Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/sessions.py: 23%
112 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import os
2import time
3from threading import Thread, Lock
4from contextlib import contextmanager
6import sentry_sdk
7from sentry_sdk.envelope import Envelope
8from sentry_sdk.session import Session
9from sentry_sdk._types import MYPY
10from sentry_sdk.utils import format_timestamp
12if MYPY: 12 ↛ 13line 12 didn't jump to line 13, because the condition on line 12 was never true
13 from typing import Any
14 from typing import Callable
15 from typing import Dict
16 from typing import Generator
17 from typing import List
18 from typing import Optional
19 from typing import Union
def is_auto_session_tracking_enabled(hub=None):
    # type: (Optional[sentry_sdk.Hub]) -> Union[Any, bool, None]
    """Return whether automatic session tracking is enabled for *hub*.

    A per-scope override (``scope._force_auto_session_tracking``) takes
    precedence when set; otherwise the client's ``auto_session_tracking``
    option decides, defaulting to ``False`` when there is no client.
    """
    if hub is None:
        hub = sentry_sdk.Hub.current

    forced = hub.scope._force_auto_session_tracking
    if forced is not None:
        return forced

    client = hub.client
    options = client.options if client else {}
    return options.get("auto_session_tracking", False)
@contextmanager
def auto_session_tracking(hub=None, session_mode="application"):
    # type: (Optional[sentry_sdk.Hub], str) -> Generator[None, None, None]
    """Context manager that starts and stops a session around a block.

    The session is only started/ended when auto session tracking is
    enabled for the hub; otherwise the wrapped block runs untouched.
    """
    if hub is None:
        hub = sentry_sdk.Hub.current

    tracking_enabled = is_auto_session_tracking_enabled(hub)
    if not tracking_enabled:
        # Tracking disabled: just run the block, no session bookkeeping.
        yield
        return

    hub.start_session(session_mode=session_mode)
    try:
        yield
    finally:
        # End the session even if the block raised.
        hub.end_session()
# Session statuses that mark a session as finished.
TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed")
# Maximum number of items packed into a single envelope before it is
# captured and a fresh envelope is started (see SessionFlusher.flush).
MAX_ENVELOPE_ITEMS = 100
def make_aggregate_envelope(aggregate_states, attrs):
    # type: (Any, Any) -> Any
    """Build the payload for an aggregated-sessions envelope item."""
    payload = {
        "attrs": dict(attrs),
        "aggregates": list(aggregate_states.values()),
    }
    return payload
class SessionFlusher(object):
    """Collects sessions and periodically sends them via ``capture_func``.

    Individual ("application" mode) sessions are queued as-is in
    ``pending_sessions``; "request" mode sessions are rolled up into
    per-attribute aggregate buckets in ``pending_aggregates``.  A daemon
    background thread wakes up every ``flush_interval`` seconds and calls
    :meth:`flush`, which packs everything into envelopes of at most
    ``MAX_ENVELOPE_ITEMS`` items each.
    """

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
        flush_interval=60,  # type: int
    ):
        # type: (...) -> None
        self.capture_func = capture_func
        self.flush_interval = flush_interval
        self.pending_sessions = []  # type: List[Any]
        self.pending_aggregates = {}  # type: Dict[Any, Any]
        self._thread = None  # type: Optional[Thread]
        # Guards lazy creation of the background thread in _ensure_running.
        self._thread_lock = Lock()
        # Guards all reads/writes of pending_aggregates.
        self._aggregate_lock = Lock()
        # PID that owns the current thread, so a forked child starts its own.
        self._thread_for_pid = None  # type: Optional[int]
        self._running = True

    def flush(self):
        # type: (...) -> None
        """Send all pending sessions and aggregates in capped envelopes."""
        # Swap out the pending list first; new sessions appended by other
        # threads from here on land in the fresh list for the next flush.
        pending_sessions = self.pending_sessions
        self.pending_sessions = []

        with self._aggregate_lock:
            pending_aggregates = self.pending_aggregates
            self.pending_aggregates = {}

        envelope = Envelope()
        for session in pending_sessions:
            # Cap each envelope at MAX_ENVELOPE_ITEMS; capture and restart.
            if len(envelope.items) == MAX_ENVELOPE_ITEMS:
                self.capture_func(envelope)
                envelope = Envelope()

            envelope.add_session(session)

        for (attrs, states) in pending_aggregates.items():
            if len(envelope.items) == MAX_ENVELOPE_ITEMS:
                self.capture_func(envelope)
                envelope = Envelope()

            envelope.add_sessions(make_aggregate_envelope(states, attrs))

        # Only capture the last envelope if it actually carries items.
        if len(envelope.items) > 0:
            self.capture_func(envelope)

    def _ensure_running(self):
        # type: (...) -> None
        """Start the background flush thread if this process lacks one."""
        # Fast path without the lock; rechecked under the lock below
        # (double-checked so concurrent callers start only one thread).
        if self._thread_for_pid == os.getpid() and self._thread is not None:
            return None
        with self._thread_lock:
            if self._thread_for_pid == os.getpid() and self._thread is not None:
                return None

            def _thread():
                # type: (...) -> None
                # Sleep-then-flush loop; re-checks _running after the sleep
                # so kill() can stop it without a final flush.
                while self._running:
                    time.sleep(self.flush_interval)
                    if self._running:
                        self.flush()

            thread = Thread(target=_thread)
            thread.daemon = True
            thread.start()
            self._thread = thread
            self._thread_for_pid = os.getpid()

        return None

    def add_aggregate_session(
        self, session  # type: Session
    ):
        # type: (...) -> None
        """Fold a request-mode session into the aggregate buckets."""
        # NOTE on `session.did`:
        # the protocol can deal with buckets that have a distinct-id, however
        # in practice we expect the python SDK to have an extremely high cardinality
        # here, effectively making aggregation useless, therefore we do not
        # aggregate per-did.

        # For this part we can get away with using the global interpreter lock
        with self._aggregate_lock:
            # Bucket first by the session's JSON attributes (user info
            # excluded), then by its truncated start timestamp.
            attrs = session.get_json_attrs(with_user_info=False)
            primary_key = tuple(sorted(attrs.items()))
            secondary_key = session.truncated_started  # (, session.did)
            states = self.pending_aggregates.setdefault(primary_key, {})
            state = states.setdefault(secondary_key, {})

            if "started" not in state:
                state["started"] = format_timestamp(session.truncated_started)
            # if session.did is not None:
            #     state["did"] = session.did
            # Count the session under exactly one outcome bucket.
            if session.status == "crashed":
                state["crashed"] = state.get("crashed", 0) + 1
            elif session.status == "abnormal":
                state["abnormal"] = state.get("abnormal", 0) + 1
            elif session.errors > 0:
                state["errored"] = state.get("errored", 0) + 1
            else:
                state["exited"] = state.get("exited", 0) + 1

    def add_session(
        self, session  # type: Session
    ):
        # type: (...) -> None
        """Queue a session for flushing and ensure the thread is alive."""
        if session.session_mode == "request":
            self.add_aggregate_session(session)
        else:
            self.pending_sessions.append(session.to_json())
        self._ensure_running()

    def kill(self):
        # type: (...) -> None
        """Ask the background thread to stop (it is not joined)."""
        self._running = False

    def __del__(self):
        # type: (...) -> None
        self.kill()