Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/transport.py: 28%

246 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import print_function 

2 

3import io 

4import urllib3 # type: ignore 

5import certifi 

6import gzip 

7import time 

8 

9from datetime import datetime, timedelta 

10from collections import defaultdict 

11 

12from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions, json_dumps 

13from sentry_sdk.worker import BackgroundWorker 

14from sentry_sdk.envelope import Envelope, Item, PayloadRef 

15 

16from sentry_sdk._types import MYPY 

17 

18if MYPY: 18 ↛ 19line 18 didn't jump to line 19, because the condition on line 18 was never true

19 from typing import Any 

20 from typing import Callable 

21 from typing import Dict 

22 from typing import Iterable 

23 from typing import Optional 

24 from typing import Tuple 

25 from typing import Type 

26 from typing import Union 

27 from typing import DefaultDict 

28 

29 from urllib3.poolmanager import PoolManager # type: ignore 

30 from urllib3.poolmanager import ProxyManager 

31 

32 from sentry_sdk._types import Event, EndpointType 

33 

34 DataCategory = Optional[str] 

35 

36try: 

37 from urllib.request import getproxies 

38except ImportError: 

39 from urllib import getproxies # type: ignore 

40 

41 

class Transport(object):
    """Common interface shared by all transports.

    A transport is responsible for delivering data to Sentry. Concrete
    subclasses are expected to override :meth:`capture_event` and
    :meth:`capture_envelope` (both raise ``NotImplementedError`` here);
    every other hook is an optional no-op in this base class.
    """

    parsed_dsn = None  # type: Optional[Dsn]

    def __init__(
        self, options=None  # type: Optional[Dict[str, Any]]
    ):
        # type: (...) -> None
        self.options = options
        # A DSN of None or "" leaves parsed_dsn unset. When options is a
        # non-empty dict it must contain a "dsn" key (KeyError otherwise).
        raw_dsn = options["dsn"] if options else None
        self.parsed_dsn = Dsn(raw_dsn) if raw_dsn else None

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Deliver a single event dictionary to Sentry.

        Must be provided by subclasses.
        """
        raise NotImplementedError()

    def capture_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """Deliver an envelope to Sentry.

        An envelope is a generic container that can carry any kind of data
        Sentry accepts; the SDK uses it for transactions and sessions, while
        plain "error" events keep going through `capture_event` for
        backwards compatibility. Must be provided by subclasses.
        """
        raise NotImplementedError()

    def flush(
        self,
        timeout,  # type: float
        callback=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Block for up to `timeout` seconds while pending data is sent.

        No-op in the base class.
        """

    def kill(self):
        # type: () -> None
        """Shut the transport down immediately. No-op in the base class."""

    def record_lost_event(
        self,
        reason,  # type: str
        data_category=None,  # type: Optional[str]
        item=None,  # type: Optional[Item]
    ):
        # type: (...) -> None
        """Bump the dropped-data counter for (reason, data category).

        The base implementation does not count anything.
        """

    def __del__(self):
        # type: () -> None
        # A finalizer must never propagate errors, so shutdown is best-effort.
        try:
            self.kill()
        except Exception:
            pass

116 

117 

118def _parse_rate_limits(header, now=None): 

119 # type: (Any, Optional[datetime]) -> Iterable[Tuple[DataCategory, datetime]] 

120 if now is None: 

121 now = datetime.utcnow() 

122 

123 for limit in header.split(","): 

124 try: 

125 retry_after, categories, _ = limit.strip().split(":", 2) 

126 retry_after = now + timedelta(seconds=int(retry_after)) 

127 for category in categories and categories.split(";") or (None,): 

128 yield category, retry_after 

129 except (LookupError, ValueError): 

130 continue 

131 

132 

class HttpTransport(Transport):
    """The default HTTP transport.

    Events and envelopes are gzip-compressed and POSTed to the DSN's API
    endpoints. Sending happens inside callables submitted to a
    ``BackgroundWorker``. Server-side rate limits are honored, and, when
    ``send_client_reports`` is enabled, counts of discarded items are
    periodically reported back as ``client_report`` envelope items.
    """

    def __init__(
        self, options  # type: Dict[str, Any]
    ):
        # type: (...) -> None
        from sentry_sdk.consts import VERSION

        Transport.__init__(self, options)
        assert self.parsed_dsn is not None
        self.options = options  # type: Dict[str, Any]
        self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
        self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
        # Rate-limit expiry per data category; the None key is the global
        # limit that applies to every category (see _check_disabled).
        self._disabled_until = {}  # type: Dict[DataCategory, datetime]
        # In this class only used to parse Retry-After headers on 429s.
        self._retry = urllib3.util.Retry()
        # (data_category, reason) -> number of items dropped since the last
        # client report was produced.
        self._discarded_events = defaultdict(
            int
        )  # type: DefaultDict[Tuple[str, str], int]
        self._last_client_report_sent = time.time()

        self._pool = self._make_pool(
            self.parsed_dsn,
            http_proxy=options["http_proxy"],
            https_proxy=options["https_proxy"],
            ca_certs=options["ca_certs"],
        )

        from sentry_sdk import Hub

        self.hub_cls = Hub

    def record_lost_event(
        self,
        reason,  # type: str
        data_category=None,  # type: Optional[str]
        item=None,  # type: Optional[Item]
    ):
        # type: (...) -> None
        """Count a discarded item so it is included in the next client report.

        :param reason: why the data was dropped (e.g. ``"queue_overflow"``).
        :param data_category: category to count under; ignored when *item* is
            given, in which case ``item.data_category`` is used.
        :param item: the dropped envelope item, if any.
        :raises TypeError: if neither *item* nor *data_category* is provided.
        """
        if not self.options["send_client_reports"]:
            return

        quantity = 1
        if item is not None:
            data_category = item.data_category
            if data_category == "attachment":
                # Attachments are counted by size in bytes; an empty
                # attachment still counts as 1 rather than 0.
                quantity = len(item.get_bytes()) or 1
        elif data_category is None:
            raise TypeError("data category not provided")

        self._discarded_events[data_category, reason] += quantity

    def _update_rate_limits(self, response):
        # type: (urllib3.HTTPResponse) -> None
        """Update ``_disabled_until`` from the rate-limit headers of *response*."""

        # new sentries with more rate limit insights. We honor this header
        # no matter of the status code to update our internal rate limits.
        header = response.headers.get("x-sentry-rate-limits")
        if header:
            logger.warning("Rate-limited via x-sentry-rate-limits")
            self._disabled_until.update(_parse_rate_limits(header))

        # old sentries only communicate global rate limit hits via the
        # retry-after header on 429. This header can also be emitted on new
        # sentries if a proxy in front wants to globally slow things down.
        elif response.status == 429:
            logger.warning("Rate-limited via 429")
            self._disabled_until[None] = datetime.utcnow() + timedelta(
                seconds=self._retry.get_retry_after(response) or 60
            )

    def _send_request(
        self,
        body,  # type: bytes
        headers,  # type: Dict[str, str]
        endpoint_type="store",  # type: EndpointType
        envelope=None,  # type: Optional[Envelope]
    ):
        # type: (...) -> None
        """POST *body* to the API endpoint for *endpoint_type*.

        *headers* is updated in place with the User-Agent and auth headers.
        Network errors are recorded as lost data and re-raised. Non-2xx
        responses other than 429 are logged and recorded as lost data.
        """

        def record_loss(reason):
            # type: (str) -> None
            # Without an envelope the payload was a single "error" event.
            if envelope is None:
                self.record_lost_event(reason, data_category="error")
            else:
                for item in envelope.items:
                    self.record_lost_event(reason, item=item)

        headers.update(
            {
                "User-Agent": str(self._auth.client),
                "X-Sentry-Auth": str(self._auth.to_header()),
            }
        )
        try:
            response = self._pool.request(
                "POST",
                str(self._auth.get_api_url(endpoint_type)),
                body=body,
                headers=headers,
            )
        except Exception:
            self.on_dropped_event("network")
            record_loss("network_error")
            raise

        try:
            self._update_rate_limits(response)

            if response.status == 429:
                # if we hit a 429. Something was rate limited but we already
                # acted on this in `self._update_rate_limits`. Note that we
                # do not want to record event loss here as we will have recorded
                # an outcome in relay already.
                self.on_dropped_event("status_429")

            elif response.status >= 300 or response.status < 200:
                logger.error(
                    "Unexpected status code: %s (body: %s)",
                    response.status,
                    response.data,
                )
                self.on_dropped_event("status_{}".format(response.status))
                record_loss("network_error")
        finally:
            response.close()

    def on_dropped_event(self, reason):
        # type: (str) -> None
        """Hook called whenever data is dropped; no-op by default."""
        return None

    def _fetch_pending_client_report(self, force=False, interval=60):
        # type: (bool, int) -> Optional[Item]
        """Drain the discarded-item counters into a ``client_report`` item.

        Returns None when client reports are disabled, when *interval*
        seconds have not passed since the last report (unless *force*), or
        when nothing was discarded. Once the interval check passes, the
        counters and the report timer are reset even if nothing is returned.
        """
        if not self.options["send_client_reports"]:
            return None

        if not (force or self._last_client_report_sent < time.time() - interval):
            return None

        discarded_events = self._discarded_events
        self._discarded_events = defaultdict(int)
        self._last_client_report_sent = time.time()

        if not discarded_events:
            return None

        return Item(
            PayloadRef(
                json={
                    "timestamp": time.time(),
                    "discarded_events": [
                        {"reason": reason, "category": category, "quantity": quantity}
                        for (
                            (category, reason),
                            quantity,
                        ) in discarded_events.items()
                    ],
                }
            ),
            type="client_report",
        )

    def _flush_client_reports(self, force=False):
        # type: (bool) -> None
        """Send any pending client report in its own envelope."""
        client_report = self._fetch_pending_client_report(force=force, interval=60)
        if client_report is not None:
            self.capture_envelope(Envelope(items=[client_report]))

    def _check_disabled(self, category):
        # type: (str) -> bool
        """Return True if *category* or the global limit is still rate-limited."""

        def _disabled(bucket):
            # type: (Any) -> bool
            ts = self._disabled_until.get(bucket)
            return ts is not None and ts > datetime.utcnow()

        return _disabled(category) or _disabled(None)

    def _send_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Gzip *event* as JSON and send it to the "store" endpoint.

        The event is dropped and counted when "error" data is rate-limited.
        """

        if self._check_disabled("error"):
            self.on_dropped_event("self_rate_limits")
            self.record_lost_event("ratelimit_backoff", data_category="error")
            return None

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            f.write(json_dumps(event))

        assert self.parsed_dsn is not None
        logger.debug(
            "Sending event, type:%s level:%s event_id:%s project:%s host:%s"
            % (
                event.get("type") or "null",
                event.get("level") or "null",
                event.get("event_id") or "null",
                self.parsed_dsn.project_id,
                self.parsed_dsn.host,
            )
        )
        self._send_request(
            body.getvalue(),
            headers={"Content-Type": "application/json", "Content-Encoding": "gzip"},
        )
        return None

    def _send_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """Send *envelope* to the "envelope" endpoint.

        Items whose category is rate-limited are removed first and counted
        as lost; nothing is sent if no item remains. A pending client report
        may be appended to the outgoing copy.
        """

        # remove all items from the envelope which are over quota
        new_items = []
        for item in envelope.items:
            if self._check_disabled(item.data_category):
                if item.data_category in ("transaction", "error", "default"):
                    self.on_dropped_event("self_rate_limits")
                self.record_lost_event("ratelimit_backoff", item=item)
            else:
                new_items.append(item)

        # Since we're modifying the envelope here make a copy so that others
        # that hold references do not see their envelope modified.
        envelope = Envelope(headers=envelope.headers, items=new_items)

        if not envelope.items:
            return None

        # since we're already in the business of sending out an envelope here
        # check if we have one pending for the stats session envelopes so we
        # can attach it to this enveloped scheduled for sending. This will
        # currently typically attach the client report to the most recent
        # session update.
        client_report_item = self._fetch_pending_client_report(interval=30)
        if client_report_item is not None:
            envelope.items.append(client_report_item)

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            envelope.serialize_into(f)

        assert self.parsed_dsn is not None
        logger.debug(
            "Sending envelope [%s] project:%s host:%s",
            envelope.description,
            self.parsed_dsn.project_id,
            self.parsed_dsn.host,
        )

        self._send_request(
            body.getvalue(),
            headers={
                "Content-Type": "application/x-sentry-envelope",
                "Content-Encoding": "gzip",
            },
            endpoint_type="envelope",
            envelope=envelope,
        )
        return None

    def _get_pool_options(self, ca_certs):
        # type: (Optional[Any]) -> Dict[str, Any]
        """Keyword arguments for the urllib3 pool; certifi's bundle is the CA default."""
        return {
            "num_pools": 2,
            "cert_reqs": "CERT_REQUIRED",
            "ca_certs": ca_certs or certifi.where(),
        }

    def _in_no_proxy(self, parsed_dsn):
        # type: (Dsn) -> bool
        """Return True if the environment's ``no_proxy`` list covers the DSN.

        Each comma-separated entry is compared as a suffix of the DSN's host
        and netloc. A lone ``*`` entry matches every host. Empty entries
        (e.g. from a trailing comma) are skipped: ``str.endswith("")`` is
        always true, so they would otherwise bypass the proxy for every host.
        """
        no_proxy = getproxies().get("no")
        if not no_proxy:
            return False
        for host in no_proxy.split(","):
            host = host.strip()
            if not host:
                continue
            if host == "*":
                return True
            if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
                return True
        return False

    def _make_pool(
        self,
        parsed_dsn,  # type: Dsn
        http_proxy,  # type: Optional[str]
        https_proxy,  # type: Optional[str]
        ca_certs,  # type: Optional[Any]
    ):
        # type: (...) -> Union[PoolManager, ProxyManager]
        """Create the urllib3 pool, routed through a proxy when one applies.

        An explicit proxy option wins; ``""`` disables that proxy kind, and
        ``None`` falls back to the environment proxies unless the DSN host
        is covered by ``no_proxy``.
        """
        proxy = None
        no_proxy = self._in_no_proxy(parsed_dsn)

        # try HTTPS first
        if parsed_dsn.scheme == "https" and (https_proxy != ""):
            proxy = https_proxy or (not no_proxy and getproxies().get("https"))

        # maybe fallback to HTTP proxy
        if not proxy and (http_proxy != ""):
            proxy = http_proxy or (not no_proxy and getproxies().get("http"))

        opts = self._get_pool_options(ca_certs)

        if proxy:
            return urllib3.ProxyManager(proxy, **opts)
        else:
            return urllib3.PoolManager(**opts)

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Queue *event* for sending; count it as lost if the queue is full."""
        hub = self.hub_cls.current

        def send_event_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_event(event)
                    self._flush_client_reports()

        if not self._worker.submit(send_event_wrapper):
            self.on_dropped_event("full_queue")
            self.record_lost_event("queue_overflow", data_category="error")

    def capture_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """Queue *envelope* for sending; count its items as lost if the queue is full."""
        hub = self.hub_cls.current

        def send_envelope_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_envelope(envelope)
                    self._flush_client_reports()

        if not self._worker.submit(send_envelope_wrapper):
            self.on_dropped_event("full_queue")
            for item in envelope.items:
                self.record_lost_event("queue_overflow", item=item)

    def flush(
        self,
        timeout,  # type: float
        callback=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Force out pending client reports, then wait up to *timeout* seconds.

        Does nothing when *timeout* is not positive.
        """
        logger.debug("Flushing HTTP transport")

        if timeout > 0:
            self._worker.submit(lambda: self._flush_client_reports(force=True))
            self._worker.flush(timeout, callback)

    def kill(self):
        # type: () -> None
        """Stop the background worker."""
        logger.debug("Killing HTTP transport")
        self._worker.kill()

495 

class _FunctionTransport(Transport):
    """Adapter that lets a plain callable act as a transport.

    Every captured event is forwarded to the wrapped function.
    ``capture_envelope`` is not overridden, so it keeps the base-class
    behavior.
    """

    def __init__(
        self, func  # type: Callable[[Event], None]
    ):
        # type: (...) -> None
        # Initialized without options, so the base class leaves parsed_dsn None.
        Transport.__init__(self)
        self._func = func

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Hand *event* to the wrapped callable; its return value is discarded."""
        callback = self._func
        callback(event)

510 

511 

def make_transport(options):
    # type: (Dict[str, Any]) -> Optional[Transport]
    """Build the transport described by ``options["transport"]``.

    ``options["transport"]`` may be:

    * ``None`` -- use ``HttpTransport``;
    * a ``Transport`` instance -- returned as-is;
    * a ``Transport`` subclass -- instantiated with *options*;
    * any other callable -- wrapped in a ``_FunctionTransport``.

    Any other value is logged and treated like ``None``. Transport classes
    are only instantiated when ``options["dsn"]`` is truthy; otherwise
    ``None`` is returned.
    """
    ref_transport = options["transport"]

    # Start from the HTTP default so transport_cls is always bound, even
    # when the configured transport value is not a supported kind.
    transport_cls = HttpTransport  # type: Type[Transport]

    if isinstance(ref_transport, Transport):
        return ref_transport
    if isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
        transport_cls = ref_transport
    elif callable(ref_transport):
        return _FunctionTransport(ref_transport)  # type: ignore
    elif ref_transport is not None:
        logger.warning(
            "Unsupported transport %r; falling back to the HTTP transport",
            ref_transport,
        )

    # if a transport class is given only instantiate it if the dsn is not
    # empty or None
    if options["dsn"]:
        return transport_cls(options)

    return None