Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/transport.py: 28% (246 statements)
« prev ^ index » next — coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import print_function
3import io
4import urllib3 # type: ignore
5import certifi
6import gzip
7import time
9from datetime import datetime, timedelta
10from collections import defaultdict
12from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions, json_dumps
13from sentry_sdk.worker import BackgroundWorker
14from sentry_sdk.envelope import Envelope, Item, PayloadRef
16from sentry_sdk._types import MYPY
18if MYPY: 18 ↛ 19line 18 didn't jump to line 19, because the condition on line 18 was never true
19 from typing import Any
20 from typing import Callable
21 from typing import Dict
22 from typing import Iterable
23 from typing import Optional
24 from typing import Tuple
25 from typing import Type
26 from typing import Union
27 from typing import DefaultDict
29 from urllib3.poolmanager import PoolManager # type: ignore
30 from urllib3.poolmanager import ProxyManager
32 from sentry_sdk._types import Event, EndpointType
34 DataCategory = Optional[str]
36try:
37 from urllib.request import getproxies
38except ImportError:
39 from urllib import getproxies # type: ignore
class Transport(object):
    """Baseclass for all transports.

    A transport is used to send an event to sentry.
    """

    parsed_dsn = None  # type: Optional[Dsn]

    def __init__(
        self, options=None  # type: Optional[Dict[str, Any]]
    ):
        # type: (...) -> None
        self.options = options
        # A missing or empty DSN disables the transport.  Truthiness alone
        # covers both None and "" — the previous additional `is not None`
        # check was redundant.
        if options and options["dsn"]:
            self.parsed_dsn = Dsn(options["dsn"])
        else:
            self.parsed_dsn = None

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """
        This gets invoked with the event dictionary when an event should
        be sent to sentry.
        """
        raise NotImplementedError()

    def capture_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """
        Send an envelope to Sentry.

        Envelopes are a data container format that can hold any type of data
        submitted to Sentry. We use it for transactions and sessions, but
        regular "error" events should go through `capture_event` for backwards
        compat.
        """
        raise NotImplementedError()

    def flush(
        self,
        timeout,  # type: float
        callback=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Wait `timeout` seconds for the current events to be sent out."""
        pass

    def kill(self):
        # type: () -> None
        """Forcefully kills the transport."""
        pass

    def record_lost_event(
        self,
        reason,  # type: str
        data_category=None,  # type: Optional[str]
        item=None,  # type: Optional[Item]
    ):
        # type: (...) -> None
        """This increments a counter for event loss by reason and
        data category.  No-op on the base class.
        """
        return None

    def __del__(self):
        # type: () -> None
        # Best-effort cleanup on garbage collection; a destructor must
        # never raise.
        try:
            self.kill()
        except Exception:
            pass
118def _parse_rate_limits(header, now=None):
119 # type: (Any, Optional[datetime]) -> Iterable[Tuple[DataCategory, datetime]]
120 if now is None:
121 now = datetime.utcnow()
123 for limit in header.split(","):
124 try:
125 retry_after, categories, _ = limit.strip().split(":", 2)
126 retry_after = now + timedelta(seconds=int(retry_after))
127 for category in categories and categories.split(";") or (None,):
128 yield category, retry_after
129 except (LookupError, ValueError):
130 continue
class HttpTransport(Transport):
    """The default HTTP transport.

    Events and envelopes are gzip-compressed and POSTed to the DSN's
    store/envelope endpoints from a background worker thread.  The
    transport honors server-side rate limits and periodically attaches
    "client report" items describing events it had to discard.
    """

    def __init__(
        self, options  # type: Dict[str, Any]
    ):
        # type: (...) -> None
        from sentry_sdk.consts import VERSION

        Transport.__init__(self, options)
        assert self.parsed_dsn is not None
        self.options = options  # type: Dict[str, Any]
        self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
        self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
        # Data category -> timestamp until which that category is muted.
        # The `None` key represents a global rate limit.
        self._disabled_until = {}  # type: Dict[DataCategory, datetime]
        self._retry = urllib3.util.Retry()
        # (category, reason) -> quantity; flushed out via client reports.
        self._discarded_events = defaultdict(
            int
        )  # type: DefaultDict[Tuple[str, str], int]
        self._last_client_report_sent = time.time()

        self._pool = self._make_pool(
            self.parsed_dsn,
            http_proxy=options["http_proxy"],
            https_proxy=options["https_proxy"],
            ca_certs=options["ca_certs"],
        )

        # Imported locally to avoid a circular import at module load time.
        from sentry_sdk import Hub

        self.hub_cls = Hub

    def record_lost_event(
        self,
        reason,  # type: str
        data_category=None,  # type: Optional[str]
        item=None,  # type: Optional[Item]
    ):
        # type: (...) -> None
        """Record a discarded event for the next client report.

        Either ``item`` or ``data_category`` must be provided; attachments
        are counted by payload size in bytes rather than per item.

        Raises:
            TypeError: if neither ``item`` nor ``data_category`` is given.
        """
        if not self.options["send_client_reports"]:
            return

        quantity = 1
        if item is not None:
            data_category = item.data_category
            if data_category == "attachment":
                # quantity of 0 is actually 1 as we do not want to count
                # empty attachments as actually empty.
                quantity = len(item.get_bytes()) or 1
        elif data_category is None:
            raise TypeError("data category not provided")

        self._discarded_events[data_category, reason] += quantity

    def _update_rate_limits(self, response):
        # type: (urllib3.HTTPResponse) -> None
        """Update ``self._disabled_until`` from rate-limit response headers."""
        # x-sentry-rate-limits is emitted by newer sentries with more rate
        # limit insights.  We honor this header no matter the status code
        # to update our internal rate limits.
        header = response.headers.get("x-sentry-rate-limits")
        if header:
            logger.warning("Rate-limited via x-sentry-rate-limits")
            self._disabled_until.update(_parse_rate_limits(header))

        # old sentries only communicate global rate limit hits via the
        # retry-after header on 429.  This header can also be emitted on new
        # sentries if a proxy in front wants to globally slow things down.
        elif response.status == 429:
            logger.warning("Rate-limited via 429")
            self._disabled_until[None] = datetime.utcnow() + timedelta(
                seconds=self._retry.get_retry_after(response) or 60
            )

    def _send_request(
        self,
        body,  # type: bytes
        headers,  # type: Dict[str, str]
        endpoint_type="store",  # type: EndpointType
        envelope=None,  # type: Optional[Envelope]
    ):
        # type: (...) -> None
        """POST ``body`` to the configured endpoint.

        Losses are recorded per envelope item (or as a single "error"
        when no envelope is given).  Network errors are re-raised after
        being recorded.
        """

        def record_loss(reason):
            # type: (str) -> None
            if envelope is None:
                self.record_lost_event(reason, data_category="error")
            else:
                for item in envelope.items:
                    self.record_lost_event(reason, item=item)

        headers.update(
            {
                "User-Agent": str(self._auth.client),
                "X-Sentry-Auth": str(self._auth.to_header()),
            }
        )
        try:
            response = self._pool.request(
                "POST",
                str(self._auth.get_api_url(endpoint_type)),
                body=body,
                headers=headers,
            )
        except Exception:
            self.on_dropped_event("network")
            record_loss("network_error")
            raise

        try:
            self._update_rate_limits(response)

            if response.status == 429:
                # if we hit a 429, something was rate limited but we already
                # acted on this in `self._update_rate_limits`.  Note that we
                # do not want to record event loss here as we will have
                # recorded an outcome in relay already.
                # (A dead trailing `pass` statement was removed here.)
                self.on_dropped_event("status_429")
            elif response.status >= 300 or response.status < 200:
                logger.error(
                    "Unexpected status code: %s (body: %s)",
                    response.status,
                    response.data,
                )
                self.on_dropped_event("status_{}".format(response.status))
                record_loss("network_error")
        finally:
            response.close()

    def on_dropped_event(self, reason):
        # type: (str) -> None
        """Hook invoked whenever an event is dropped; no-op by default."""
        return None

    def _fetch_pending_client_report(self, force=False, interval=60):
        # type: (bool, int) -> Optional[Item]
        """Return an envelope item with accumulated client reports.

        Returns ``None`` when client reports are disabled, the last report
        was sent less than ``interval`` seconds ago (unless ``force``), or
        nothing was discarded since then.
        """
        if not self.options["send_client_reports"]:
            return None

        if not (force or self._last_client_report_sent < time.time() - interval):
            return None

        # Swap out the accumulator first so later recorders start fresh.
        discarded_events = self._discarded_events
        self._discarded_events = defaultdict(int)
        self._last_client_report_sent = time.time()

        if not discarded_events:
            return None

        return Item(
            PayloadRef(
                json={
                    "timestamp": time.time(),
                    "discarded_events": [
                        {"reason": reason, "category": category, "quantity": quantity}
                        for (
                            (category, reason),
                            quantity,
                        ) in discarded_events.items()
                    ],
                }
            ),
            type="client_report",
        )

    def _flush_client_reports(self, force=False):
        # type: (bool) -> None
        """Send any pending client report in its own envelope."""
        client_report = self._fetch_pending_client_report(force=force, interval=60)
        if client_report is not None:
            self.capture_envelope(Envelope(items=[client_report]))

    def _check_disabled(self, category):
        # type: (str) -> bool
        """Return True if ``category`` (or everything) is rate limited."""

        def _disabled(bucket):
            # type: (Any) -> bool
            ts = self._disabled_until.get(bucket)
            return ts is not None and ts > datetime.utcnow()

        # The `None` bucket holds the global rate limit.
        return _disabled(category) or _disabled(None)

    def _send_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Gzip and POST a single error event to the store endpoint."""
        if self._check_disabled("error"):
            self.on_dropped_event("self_rate_limits")
            self.record_lost_event("ratelimit_backoff", data_category="error")
            return None

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            f.write(json_dumps(event))

        assert self.parsed_dsn is not None
        logger.debug(
            "Sending event, type:%s level:%s event_id:%s project:%s host:%s"
            % (
                event.get("type") or "null",
                event.get("level") or "null",
                event.get("event_id") or "null",
                self.parsed_dsn.project_id,
                self.parsed_dsn.host,
            )
        )
        self._send_request(
            body.getvalue(),
            headers={"Content-Type": "application/json", "Content-Encoding": "gzip"},
        )
        return None

    def _send_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """Gzip and POST an envelope, stripping rate-limited items first."""
        # remove all items from the envelope which are over quota
        new_items = []
        for item in envelope.items:
            if self._check_disabled(item.data_category):
                if item.data_category in ("transaction", "error", "default"):
                    self.on_dropped_event("self_rate_limits")
                self.record_lost_event("ratelimit_backoff", item=item)
            else:
                new_items.append(item)

        # Since we're modifying the envelope here make a copy so that others
        # that hold references do not see their envelope modified.
        envelope = Envelope(headers=envelope.headers, items=new_items)

        if not envelope.items:
            return None

        # since we're already in the business of sending out an envelope here
        # check if we have one pending for the stats session envelopes so we
        # can attach it to this envelope scheduled for sending.  This will
        # currently typically attach the client report to the most recent
        # session update.
        client_report_item = self._fetch_pending_client_report(interval=30)
        if client_report_item is not None:
            envelope.items.append(client_report_item)

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            envelope.serialize_into(f)

        assert self.parsed_dsn is not None
        logger.debug(
            "Sending envelope [%s] project:%s host:%s",
            envelope.description,
            self.parsed_dsn.project_id,
            self.parsed_dsn.host,
        )

        self._send_request(
            body.getvalue(),
            headers={
                "Content-Type": "application/x-sentry-envelope",
                "Content-Encoding": "gzip",
            },
            endpoint_type="envelope",
            envelope=envelope,
        )
        return None

    def _get_pool_options(self, ca_certs):
        # type: (Optional[Any]) -> Dict[str, Any]
        """Build keyword options shared by PoolManager and ProxyManager."""
        return {
            "num_pools": 2,
            "cert_reqs": "CERT_REQUIRED",
            "ca_certs": ca_certs or certifi.where(),
        }

    def _in_no_proxy(self, parsed_dsn):
        # type: (Dsn) -> bool
        """Return True if the DSN host matches the environment's no-proxy list."""
        no_proxy = getproxies().get("no")
        if not no_proxy:
            return False
        for host in no_proxy.split(","):
            host = host.strip()
            if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
                return True
        return False

    def _make_pool(
        self,
        parsed_dsn,  # type: Dsn
        http_proxy,  # type: Optional[str]
        https_proxy,  # type: Optional[str]
        ca_certs,  # type: Optional[Any]
    ):
        # type: (...) -> Union[PoolManager, ProxyManager]
        """Create the urllib3 pool, honoring explicit and environment proxies.

        An explicitly empty-string proxy option disables environment proxy
        lookup for that scheme.
        """
        proxy = None
        no_proxy = self._in_no_proxy(parsed_dsn)

        # try HTTPS first
        if parsed_dsn.scheme == "https" and (https_proxy != ""):
            proxy = https_proxy or (not no_proxy and getproxies().get("https"))

        # maybe fallback to HTTP proxy
        if not proxy and (http_proxy != ""):
            proxy = http_proxy or (not no_proxy and getproxies().get("http"))

        opts = self._get_pool_options(ca_certs)

        if proxy:
            return urllib3.ProxyManager(proxy, **opts)
        else:
            return urllib3.PoolManager(**opts)

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Queue an event for sending on the background worker."""
        hub = self.hub_cls.current

        def send_event_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_event(event)
                    self._flush_client_reports()

        if not self._worker.submit(send_event_wrapper):
            self.on_dropped_event("full_queue")
            self.record_lost_event("queue_overflow", data_category="error")

    def capture_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """Queue an envelope for sending on the background worker."""
        hub = self.hub_cls.current

        def send_envelope_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_envelope(envelope)
                    self._flush_client_reports()

        if not self._worker.submit(send_envelope_wrapper):
            self.on_dropped_event("full_queue")
            for item in envelope.items:
                self.record_lost_event("queue_overflow", item=item)

    def flush(
        self,
        timeout,  # type: float
        callback=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Force out a client report and wait up to ``timeout`` seconds."""
        logger.debug("Flushing HTTP transport")

        if timeout > 0:
            self._worker.submit(lambda: self._flush_client_reports(force=True))
            self._worker.flush(timeout, callback)

    def kill(self):
        # type: () -> None
        """Forcefully kills the transport."""
        logger.debug("Killing HTTP transport")
        self._worker.kill()
class _FunctionTransport(Transport):
    """Adapter that forwards every captured event to a plain callable."""

    def __init__(
        self, func  # type: Callable[[Event], None]
    ):
        # type: (...) -> None
        Transport.__init__(self)
        self._func = func

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        # Delegate straight to the wrapped callable; its return value is
        # discarded, matching the Transport contract of returning None.
        callback = self._func
        callback(event)
        return None
def make_transport(options):
    # type: (Dict[str, Any]) -> Optional[Transport]
    """Create the transport configured in ``options``.

    ``options["transport"]`` may be None (use :class:`HttpTransport`), a
    ready-made :class:`Transport` instance, a ``Transport`` subclass, or a
    callable (wrapped in ``_FunctionTransport``).  A transport class is
    only instantiated when a DSN is configured; otherwise ``None`` is
    returned.

    Raises:
        TypeError: if ``options["transport"]`` is none of the above.
    """
    ref_transport = options["transport"]

    # If no transport is given, we use the http transport class
    if ref_transport is None:
        transport_cls = HttpTransport  # type: Type[Transport]
    elif isinstance(ref_transport, Transport):
        return ref_transport
    elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
        transport_cls = ref_transport
    elif callable(ref_transport):
        return _FunctionTransport(ref_transport)  # type: ignore
    else:
        # Previously this fell through with `transport_cls` unbound and
        # crashed with an UnboundLocalError; fail with a clear error instead.
        raise TypeError("Unknown transport: %r" % (ref_transport,))

    # if a transport class is given only instantiate it if the dsn is not
    # empty or None
    if options["dsn"]:
        return transport_cls(options)

    return None