Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/utils.py: 27%
500 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import base64
2import json
3import linecache
4import logging
5import os
6import sys
7import threading
8import subprocess
9import re
11from datetime import datetime
13import sentry_sdk
14from sentry_sdk._compat import urlparse, text_type, implements_str, PY2
16from sentry_sdk._types import MYPY
18if MYPY: 18 ↛ 19line 18 didn't jump to line 19, because the condition on line 18 was never true
19 from types import FrameType
20 from types import TracebackType
21 from typing import Any
22 from typing import Callable
23 from typing import Dict
24 from typing import ContextManager
25 from typing import Iterator
26 from typing import List
27 from typing import Optional
28 from typing import Set
29 from typing import Tuple
30 from typing import Union
31 from typing import Type
33 from sentry_sdk._types import ExcInfo, EndpointType
36epoch = datetime(1970, 1, 1)
39# The logger is created here but initialized in the debug support module
40logger = logging.getLogger("sentry_sdk.errors")
42MAX_STRING_LENGTH = 512
43BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$")
46def json_dumps(data):
47 # type: (Any) -> bytes
48 """Serialize data into a compact JSON representation encoded as UTF-8."""
49 return json.dumps(data, allow_nan=False, separators=(",", ":")).encode("utf-8")
52def _get_debug_hub():
53 # type: () -> Optional[sentry_sdk.Hub]
54 # This function is replaced by debug.py
55 pass
58def get_default_release():
59 # type: () -> Optional[str]
60 """Try to guess a default release."""
61 release = os.environ.get("SENTRY_RELEASE")
62 if release: 62 ↛ 63line 62 didn't jump to line 63, because the condition on line 62 was never true
63 return release
65 with open(os.path.devnull, "w+") as null:
66 try:
67 release = (
68 subprocess.Popen(
69 ["git", "rev-parse", "HEAD"],
70 stdout=subprocess.PIPE,
71 stderr=null,
72 stdin=null,
73 )
74 .communicate()[0]
75 .strip()
76 .decode("utf-8")
77 )
78 except (OSError, IOError):
79 pass
81 if release: 81 ↛ 82line 81 didn't jump to line 82, because the condition on line 81 was never true
82 return release
84 for var in (
85 "HEROKU_SLUG_COMMIT",
86 "SOURCE_VERSION",
87 "CODEBUILD_RESOLVED_SOURCE_VERSION",
88 "CIRCLE_SHA1",
89 "GAE_DEPLOYMENT_ID",
90 ):
91 release = os.environ.get(var)
92 if release: 92 ↛ 93line 92 didn't jump to line 93, because the condition on line 92 was never true
93 return release
94 return None
97class CaptureInternalException(object):
98 __slots__ = ()
100 def __enter__(self):
101 # type: () -> ContextManager[Any]
102 return self
104 def __exit__(self, ty, value, tb):
105 # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> bool
106 if ty is not None and value is not None: 106 ↛ 107line 106 didn't jump to line 107, because the condition on line 106 was never true
107 capture_internal_exception((ty, value, tb))
109 return True
112_CAPTURE_INTERNAL_EXCEPTION = CaptureInternalException()
115def capture_internal_exceptions():
116 # type: () -> ContextManager[Any]
117 return _CAPTURE_INTERNAL_EXCEPTION
120def capture_internal_exception(exc_info):
121 # type: (ExcInfo) -> None
122 hub = _get_debug_hub()
123 if hub is not None:
124 hub._capture_internal_exception(exc_info)
127def to_timestamp(value):
128 # type: (datetime) -> float
129 return (value - epoch).total_seconds()
132def format_timestamp(value):
133 # type: (datetime) -> str
134 return value.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
137def event_hint_with_exc_info(exc_info=None):
138 # type: (Optional[ExcInfo]) -> Dict[str, Optional[ExcInfo]]
139 """Creates a hint with the exc info filled in."""
140 if exc_info is None:
141 exc_info = sys.exc_info()
142 else:
143 exc_info = exc_info_from_error(exc_info)
144 if exc_info[0] is None:
145 exc_info = None
146 return {"exc_info": exc_info}
149class BadDsn(ValueError):
150 """Raised on invalid DSNs."""
153@implements_str
154class Dsn(object):
155 """Represents a DSN."""
157 def __init__(self, value):
158 # type: (Union[Dsn, str]) -> None
159 if isinstance(value, Dsn): 159 ↛ 160line 159 didn't jump to line 160, because the condition on line 159 was never true
160 self.__dict__ = dict(value.__dict__)
161 return
162 parts = urlparse.urlsplit(text_type(value))
164 if parts.scheme not in ("http", "https"): 164 ↛ 165line 164 didn't jump to line 165, because the condition on line 164 was never true
165 raise BadDsn("Unsupported scheme %r" % parts.scheme)
166 self.scheme = parts.scheme
168 if parts.hostname is None: 168 ↛ 169line 168 didn't jump to line 169, because the condition on line 168 was never true
169 raise BadDsn("Missing hostname")
171 self.host = parts.hostname
173 if parts.port is None: 173 ↛ 176line 173 didn't jump to line 176, because the condition on line 173 was never false
174 self.port = self.scheme == "https" and 443 or 80 # type: int
175 else:
176 self.port = parts.port
178 if not parts.username: 178 ↛ 179line 178 didn't jump to line 179, because the condition on line 178 was never true
179 raise BadDsn("Missing public key")
181 self.public_key = parts.username
182 self.secret_key = parts.password
184 path = parts.path.rsplit("/", 1)
186 try:
187 self.project_id = text_type(int(path.pop()))
188 except (ValueError, TypeError):
189 raise BadDsn("Invalid project in DSN (%r)" % (parts.path or "")[1:])
191 self.path = "/".join(path) + "/"
193 @property
194 def netloc(self):
195 # type: () -> str
196 """The netloc part of a DSN."""
197 rv = self.host
198 if (self.scheme, self.port) not in (("http", 80), ("https", 443)): 198 ↛ 199line 198 didn't jump to line 199, because the condition on line 198 was never true
199 rv = "%s:%s" % (rv, self.port)
200 return rv
202 def to_auth(self, client=None):
203 # type: (Optional[Any]) -> Auth
204 """Returns the auth info object for this dsn."""
205 return Auth(
206 scheme=self.scheme,
207 host=self.netloc,
208 path=self.path,
209 project_id=self.project_id,
210 public_key=self.public_key,
211 secret_key=self.secret_key,
212 client=client,
213 )
215 def __str__(self):
216 # type: () -> str
217 return "%s://%s%s@%s%s%s" % (
218 self.scheme,
219 self.public_key,
220 self.secret_key and "@" + self.secret_key or "",
221 self.netloc,
222 self.path,
223 self.project_id,
224 )
227class Auth(object):
228 """Helper object that represents the auth info."""
230 def __init__(
231 self,
232 scheme,
233 host,
234 project_id,
235 public_key,
236 secret_key=None,
237 version=7,
238 client=None,
239 path="/",
240 ):
241 # type: (str, str, str, str, Optional[str], int, Optional[Any], str) -> None
242 self.scheme = scheme
243 self.host = host
244 self.path = path
245 self.project_id = project_id
246 self.public_key = public_key
247 self.secret_key = secret_key
248 self.version = version
249 self.client = client
251 @property
252 def store_api_url(self):
253 # type: () -> str
254 """Returns the API url for storing events.
256 Deprecated: use get_api_url instead.
257 """
258 return self.get_api_url(type="store")
260 def get_api_url(
261 self, type="store" # type: EndpointType
262 ):
263 # type: (...) -> str
264 """Returns the API url for storing events."""
265 return "%s://%s%sapi/%s/%s/" % (
266 self.scheme,
267 self.host,
268 self.path,
269 self.project_id,
270 type,
271 )
273 def to_header(self):
274 # type: () -> str
275 """Returns the auth header a string."""
276 rv = [("sentry_key", self.public_key), ("sentry_version", self.version)]
277 if self.client is not None:
278 rv.append(("sentry_client", self.client))
279 if self.secret_key is not None:
280 rv.append(("sentry_secret", self.secret_key))
281 return "Sentry " + ", ".join("%s=%s" % (key, value) for key, value in rv)
284class AnnotatedValue(object):
285 __slots__ = ("value", "metadata")
287 def __init__(self, value, metadata):
288 # type: (Optional[Any], Dict[str, Any]) -> None
289 self.value = value
290 self.metadata = metadata
293if MYPY: 293 ↛ 294line 293 didn't jump to line 294, because the condition on line 293 was never true
294 from typing import TypeVar
296 T = TypeVar("T")
297 Annotated = Union[AnnotatedValue, T]
300def get_type_name(cls):
301 # type: (Optional[type]) -> Optional[str]
302 return getattr(cls, "__qualname__", None) or getattr(cls, "__name__", None)
305def get_type_module(cls):
306 # type: (Optional[type]) -> Optional[str]
307 mod = getattr(cls, "__module__", None)
308 if mod not in (None, "builtins", "__builtins__"):
309 return mod
310 return None
313def should_hide_frame(frame):
314 # type: (FrameType) -> bool
315 try:
316 mod = frame.f_globals["__name__"]
317 if mod.startswith("sentry_sdk."):
318 return True
319 except (AttributeError, KeyError):
320 pass
322 for flag_name in "__traceback_hide__", "__tracebackhide__":
323 try:
324 if frame.f_locals[flag_name]:
325 return True
326 except Exception:
327 pass
329 return False
332def iter_stacks(tb):
333 # type: (Optional[TracebackType]) -> Iterator[TracebackType]
334 tb_ = tb # type: Optional[TracebackType]
335 while tb_ is not None:
336 if not should_hide_frame(tb_.tb_frame):
337 yield tb_
338 tb_ = tb_.tb_next
341def get_lines_from_file(
342 filename, # type: str
343 lineno, # type: int
344 loader=None, # type: Optional[Any]
345 module=None, # type: Optional[str]
346):
347 # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]
348 context_lines = 5
349 source = None
350 if loader is not None and hasattr(loader, "get_source"):
351 try:
352 source_str = loader.get_source(module) # type: Optional[str]
353 except (ImportError, IOError):
354 source_str = None
355 if source_str is not None:
356 source = source_str.splitlines()
358 if source is None:
359 try:
360 source = linecache.getlines(filename)
361 except (OSError, IOError):
362 return [], None, []
364 if not source:
365 return [], None, []
367 lower_bound = max(0, lineno - context_lines)
368 upper_bound = min(lineno + 1 + context_lines, len(source))
370 try:
371 pre_context = [
372 strip_string(line.strip("\r\n")) for line in source[lower_bound:lineno]
373 ]
374 context_line = strip_string(source[lineno].strip("\r\n"))
375 post_context = [
376 strip_string(line.strip("\r\n"))
377 for line in source[(lineno + 1) : upper_bound]
378 ]
379 return pre_context, context_line, post_context
380 except IndexError:
381 # the file may have changed since it was loaded into memory
382 return [], None, []
385def get_source_context(
386 frame, # type: FrameType
387 tb_lineno, # type: int
388):
389 # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]]
390 try:
391 abs_path = frame.f_code.co_filename # type: Optional[str]
392 except Exception:
393 abs_path = None
394 try:
395 module = frame.f_globals["__name__"]
396 except Exception:
397 return [], None, []
398 try:
399 loader = frame.f_globals["__loader__"]
400 except Exception:
401 loader = None
402 lineno = tb_lineno - 1
403 if lineno is not None and abs_path:
404 return get_lines_from_file(abs_path, lineno, loader, module)
405 return [], None, []
408def safe_str(value):
409 # type: (Any) -> str
410 try:
411 return text_type(value)
412 except Exception:
413 return safe_repr(value)
416if PY2: 416 ↛ 418line 416 didn't jump to line 418, because the condition on line 416 was never true
418 def safe_repr(value):
419 # type: (Any) -> str
420 try:
421 rv = repr(value).decode("utf-8", "replace")
423 # At this point `rv` contains a bunch of literal escape codes, like
424 # this (exaggerated example):
425 #
426 # u"\\x2f"
427 #
428 # But we want to show this string as:
429 #
430 # u"/"
431 try:
432 # unicode-escape does this job, but can only decode latin1. So we
433 # attempt to encode in latin1.
434 return rv.encode("latin1").decode("unicode-escape")
435 except Exception:
436 # Since usually strings aren't latin1 this can break. In those
437 # cases we just give up.
438 return rv
439 except Exception:
440 # If e.g. the call to `repr` already fails
441 return "<broken repr>"
443else:
445 def safe_repr(value):
446 # type: (Any) -> str
447 try:
448 return repr(value)
449 except Exception:
450 return "<broken repr>"
453def filename_for_module(module, abs_path):
454 # type: (Optional[str], Optional[str]) -> Optional[str]
455 if not abs_path or not module:
456 return abs_path
458 try:
459 if abs_path.endswith(".pyc"):
460 abs_path = abs_path[:-1]
462 base_module = module.split(".", 1)[0]
463 if base_module == module:
464 return os.path.basename(abs_path)
466 base_module_path = sys.modules[base_module].__file__
467 if not base_module_path:
468 return abs_path
470 return abs_path.split(base_module_path.rsplit(os.sep, 2)[0], 1)[-1].lstrip(
471 os.sep
472 )
473 except Exception:
474 return abs_path
477def serialize_frame(frame, tb_lineno=None, with_locals=True):
478 # type: (FrameType, Optional[int], bool) -> Dict[str, Any]
479 f_code = getattr(frame, "f_code", None)
480 if not f_code:
481 abs_path = None
482 function = None
483 else:
484 abs_path = frame.f_code.co_filename
485 function = frame.f_code.co_name
486 try:
487 module = frame.f_globals["__name__"]
488 except Exception:
489 module = None
491 if tb_lineno is None:
492 tb_lineno = frame.f_lineno
494 pre_context, context_line, post_context = get_source_context(frame, tb_lineno)
496 rv = {
497 "filename": filename_for_module(module, abs_path) or None,
498 "abs_path": os.path.abspath(abs_path) if abs_path else None,
499 "function": function or "<unknown>",
500 "module": module,
501 "lineno": tb_lineno,
502 "pre_context": pre_context,
503 "context_line": context_line,
504 "post_context": post_context,
505 } # type: Dict[str, Any]
506 if with_locals:
507 rv["vars"] = frame.f_locals
509 return rv
512def current_stacktrace(with_locals=True):
513 # type: (bool) -> Any
514 __tracebackhide__ = True
515 frames = []
517 f = sys._getframe() # type: Optional[FrameType]
518 while f is not None:
519 if not should_hide_frame(f):
520 frames.append(serialize_frame(f, with_locals=with_locals))
521 f = f.f_back
523 frames.reverse()
525 return {"frames": frames}
528def get_errno(exc_value):
529 # type: (BaseException) -> Optional[Any]
530 return getattr(exc_value, "errno", None)
533def single_exception_from_error_tuple(
534 exc_type, # type: Optional[type]
535 exc_value, # type: Optional[BaseException]
536 tb, # type: Optional[TracebackType]
537 client_options=None, # type: Optional[Dict[str, Any]]
538 mechanism=None, # type: Optional[Dict[str, Any]]
539):
540 # type: (...) -> Dict[str, Any]
541 if exc_value is not None:
542 errno = get_errno(exc_value)
543 else:
544 errno = None
546 if errno is not None:
547 mechanism = mechanism or {"type": "generic"}
548 mechanism.setdefault("meta", {}).setdefault("errno", {}).setdefault(
549 "number", errno
550 )
552 if client_options is None:
553 with_locals = True
554 else:
555 with_locals = client_options["with_locals"]
557 frames = [
558 serialize_frame(tb.tb_frame, tb_lineno=tb.tb_lineno, with_locals=with_locals)
559 for tb in iter_stacks(tb)
560 ]
562 rv = {
563 "module": get_type_module(exc_type),
564 "type": get_type_name(exc_type),
565 "value": safe_str(exc_value),
566 "mechanism": mechanism,
567 }
569 if frames:
570 rv["stacktrace"] = {"frames": frames}
572 return rv
575HAS_CHAINED_EXCEPTIONS = hasattr(Exception, "__suppress_context__")
577if HAS_CHAINED_EXCEPTIONS: 577 ↛ 611line 577 didn't jump to line 611, because the condition on line 577 was never false
579 def walk_exception_chain(exc_info):
580 # type: (ExcInfo) -> Iterator[ExcInfo]
581 exc_type, exc_value, tb = exc_info
583 seen_exceptions = []
584 seen_exception_ids = set() # type: Set[int]
586 while (
587 exc_type is not None
588 and exc_value is not None
589 and id(exc_value) not in seen_exception_ids
590 ):
591 yield exc_type, exc_value, tb
593 # Avoid hashing random types we don't know anything
594 # about. Use the list to keep a ref so that the `id` is
595 # not used for another object.
596 seen_exceptions.append(exc_value)
597 seen_exception_ids.add(id(exc_value))
599 if exc_value.__suppress_context__:
600 cause = exc_value.__cause__
601 else:
602 cause = exc_value.__context__
603 if cause is None:
604 break
605 exc_type = type(cause)
606 exc_value = cause
607 tb = getattr(cause, "__traceback__", None)
609else:
611 def walk_exception_chain(exc_info):
612 # type: (ExcInfo) -> Iterator[ExcInfo]
613 yield exc_info
616def exceptions_from_error_tuple(
617 exc_info, # type: ExcInfo
618 client_options=None, # type: Optional[Dict[str, Any]]
619 mechanism=None, # type: Optional[Dict[str, Any]]
620):
621 # type: (...) -> List[Dict[str, Any]]
622 exc_type, exc_value, tb = exc_info
623 rv = []
624 for exc_type, exc_value, tb in walk_exception_chain(exc_info):
625 rv.append(
626 single_exception_from_error_tuple(
627 exc_type, exc_value, tb, client_options, mechanism
628 )
629 )
631 rv.reverse()
633 return rv
636def to_string(value):
637 # type: (str) -> str
638 try:
639 return text_type(value)
640 except UnicodeDecodeError:
641 return repr(value)[1:-1]
644def iter_event_stacktraces(event):
645 # type: (Dict[str, Any]) -> Iterator[Dict[str, Any]]
646 if "stacktrace" in event:
647 yield event["stacktrace"]
648 if "threads" in event:
649 for thread in event["threads"].get("values") or ():
650 if "stacktrace" in thread:
651 yield thread["stacktrace"]
652 if "exception" in event:
653 for exception in event["exception"].get("values") or ():
654 if "stacktrace" in exception:
655 yield exception["stacktrace"]
658def iter_event_frames(event):
659 # type: (Dict[str, Any]) -> Iterator[Dict[str, Any]]
660 for stacktrace in iter_event_stacktraces(event):
661 for frame in stacktrace.get("frames") or ():
662 yield frame
665def handle_in_app(event, in_app_exclude=None, in_app_include=None):
666 # type: (Dict[str, Any], Optional[List[str]], Optional[List[str]]) -> Dict[str, Any]
667 for stacktrace in iter_event_stacktraces(event):
668 handle_in_app_impl(
669 stacktrace.get("frames"),
670 in_app_exclude=in_app_exclude,
671 in_app_include=in_app_include,
672 )
674 return event
677def handle_in_app_impl(frames, in_app_exclude, in_app_include):
678 # type: (Any, Optional[List[str]], Optional[List[str]]) -> Optional[Any]
679 if not frames:
680 return None
682 any_in_app = False
683 for frame in frames:
684 in_app = frame.get("in_app")
685 if in_app is not None:
686 if in_app:
687 any_in_app = True
688 continue
690 module = frame.get("module")
691 if not module:
692 continue
693 elif _module_in_set(module, in_app_include):
694 frame["in_app"] = True
695 any_in_app = True
696 elif _module_in_set(module, in_app_exclude):
697 frame["in_app"] = False
699 if not any_in_app:
700 for frame in frames:
701 if frame.get("in_app") is None:
702 frame["in_app"] = True
704 return frames
707def exc_info_from_error(error):
708 # type: (Union[BaseException, ExcInfo]) -> ExcInfo
709 if isinstance(error, tuple) and len(error) == 3:
710 exc_type, exc_value, tb = error
711 elif isinstance(error, BaseException):
712 tb = getattr(error, "__traceback__", None)
713 if tb is not None:
714 exc_type = type(error)
715 exc_value = error
716 else:
717 exc_type, exc_value, tb = sys.exc_info()
718 if exc_value is not error:
719 tb = None
720 exc_value = error
721 exc_type = type(error)
723 else:
724 raise ValueError("Expected Exception object to report, got %s!" % type(error))
726 return exc_type, exc_value, tb
729def event_from_exception(
730 exc_info, # type: Union[BaseException, ExcInfo]
731 client_options=None, # type: Optional[Dict[str, Any]]
732 mechanism=None, # type: Optional[Dict[str, Any]]
733):
734 # type: (...) -> Tuple[Dict[str, Any], Dict[str, Any]]
735 exc_info = exc_info_from_error(exc_info)
736 hint = event_hint_with_exc_info(exc_info)
737 return (
738 {
739 "level": "error",
740 "exception": {
741 "values": exceptions_from_error_tuple(
742 exc_info, client_options, mechanism
743 )
744 },
745 },
746 hint,
747 )
750def _module_in_set(name, set):
751 # type: (str, Optional[List[str]]) -> bool
752 if not set:
753 return False
754 for item in set or ():
755 if item == name or name.startswith(item + "."):
756 return True
757 return False
760def strip_string(value, max_length=None):
761 # type: (str, Optional[int]) -> Union[AnnotatedValue, str]
762 # TODO: read max_length from config
763 if not value:
764 return value
766 if max_length is None:
767 # This is intentionally not just the default such that one can patch `MAX_STRING_LENGTH` and affect `strip_string`.
768 max_length = MAX_STRING_LENGTH
770 length = len(value)
772 if length > max_length:
773 return AnnotatedValue(
774 value=value[: max_length - 3] + "...",
775 metadata={
776 "len": length,
777 "rem": [["!limit", "x", max_length - 3, max_length]],
778 },
779 )
780 return value
783def _is_contextvars_broken():
784 # type: () -> bool
785 """
786 Returns whether gevent/eventlet have patched the stdlib in a way where thread locals are now more "correct" than contextvars.
787 """
788 try:
789 import gevent # type: ignore
790 from gevent.monkey import is_object_patched # type: ignore
792 # Get the MAJOR and MINOR version numbers of Gevent
793 version_tuple = tuple(
794 [int(part) for part in re.split(r"a|b|rc|\.", gevent.__version__)[:2]]
795 )
796 if is_object_patched("threading", "local"):
797 # Gevent 20.9.0 depends on Greenlet 0.4.17 which natively handles switching
798 # context vars when greenlets are switched, so, Gevent 20.9.0+ is all fine.
799 # Ref: https://github.com/gevent/gevent/blob/83c9e2ae5b0834b8f84233760aabe82c3ba065b4/src/gevent/monkey.py#L604-L609
800 # Gevent 20.5, that doesn't depend on Greenlet 0.4.17 with native support
801 # for contextvars, is able to patch both thread locals and contextvars, in
802 # that case, check if contextvars are effectively patched.
803 if (
804 # Gevent 20.9.0+
805 (sys.version_info >= (3, 7) and version_tuple >= (20, 9))
806 # Gevent 20.5.0+ or Python < 3.7
807 or (is_object_patched("contextvars", "ContextVar"))
808 ):
809 return False
811 return True
812 except ImportError:
813 pass
815 try:
816 from eventlet.patcher import is_monkey_patched # type: ignore
818 if is_monkey_patched("thread"):
819 return True
820 except ImportError:
821 pass
823 return False
826def _make_threadlocal_contextvars(local):
827 # type: (type) -> type
828 class ContextVar(object):
829 # Super-limited impl of ContextVar
831 def __init__(self, name):
832 # type: (str) -> None
833 self._name = name
834 self._local = local()
836 def get(self, default):
837 # type: (Any) -> Any
838 return getattr(self._local, "value", default)
840 def set(self, value):
841 # type: (Any) -> None
842 self._local.value = value
844 return ContextVar
847def _get_contextvars():
848 # type: () -> Tuple[bool, type]
849 """
850 Figure out the "right" contextvars installation to use. Returns a
851 `contextvars.ContextVar`-like class with a limited API.
853 See https://docs.sentry.io/platforms/python/contextvars/ for more information.
854 """
855 if not _is_contextvars_broken(): 855 ↛ 880line 855 didn't jump to line 880, because the condition on line 855 was never false
856 # aiocontextvars is a PyPI package that ensures that the contextvars
857 # backport (also a PyPI package) works with asyncio under Python 3.6
858 #
859 # Import it if available.
860 if sys.version_info < (3, 7): 860 ↛ 863line 860 didn't jump to line 863, because the condition on line 860 was never true
861 # `aiocontextvars` is absolutely required for functional
862 # contextvars on Python 3.6.
863 try:
864 from aiocontextvars import ContextVar # noqa
866 return True, ContextVar
867 except ImportError:
868 pass
869 else:
870 # On Python 3.7 contextvars are functional.
871 try:
872 from contextvars import ContextVar
874 return True, ContextVar
875 except ImportError:
876 pass
878 # Fall back to basic thread-local usage.
880 from threading import local
882 return False, _make_threadlocal_contextvars(local)
885HAS_REAL_CONTEXTVARS, ContextVar = _get_contextvars()
887CONTEXTVARS_ERROR_MESSAGE = """
889With asyncio/ASGI applications, the Sentry SDK requires a functional
890installation of `contextvars` to avoid leaking scope/context data across
891requests.
893Please refer to https://docs.sentry.io/platforms/python/contextvars/ for more information.
894"""
897def transaction_from_function(func):
898 # type: (Callable[..., Any]) -> Optional[str]
899 # Methods in Python 2
900 try:
901 return "%s.%s.%s" % (
902 func.im_class.__module__, # type: ignore
903 func.im_class.__name__, # type: ignore
904 func.__name__,
905 )
906 except Exception:
907 pass
909 func_qualname = (
910 getattr(func, "__qualname__", None) or getattr(func, "__name__", None) or None
911 ) # type: Optional[str]
913 if not func_qualname: 913 ↛ 915line 913 didn't jump to line 915, because the condition on line 913 was never true
914 # No idea what it is
915 return None
917 # Methods in Python 3
918 # Functions
919 # Classes
920 try:
921 return "%s.%s" % (func.__module__, func_qualname)
922 except Exception:
923 pass
925 # Possibly a lambda
926 return func_qualname
929disable_capture_event = ContextVar("disable_capture_event")
932class ServerlessTimeoutWarning(Exception): # noqa: N818
933 """Raised when a serverless method is about to reach its timeout."""
935 pass
938class TimeoutThread(threading.Thread):
939 """Creates a Thread which runs (sleeps) for a time duration equal to
940 waiting_time and raises a custom ServerlessTimeout exception.
941 """
943 def __init__(self, waiting_time, configured_timeout):
944 # type: (float, int) -> None
945 threading.Thread.__init__(self)
946 self.waiting_time = waiting_time
947 self.configured_timeout = configured_timeout
948 self._stop_event = threading.Event()
950 def stop(self):
951 # type: () -> None
952 self._stop_event.set()
954 def run(self):
955 # type: () -> None
957 self._stop_event.wait(self.waiting_time)
959 if self._stop_event.is_set():
960 return
962 integer_configured_timeout = int(self.configured_timeout)
964 # Setting up the exact integer value of configured time(in seconds)
965 if integer_configured_timeout < self.configured_timeout:
966 integer_configured_timeout = integer_configured_timeout + 1
968 # Raising Exception after timeout duration is reached
969 raise ServerlessTimeoutWarning(
970 "WARNING : Function is expected to get timed out. Configured timeout duration = {} seconds.".format(
971 integer_configured_timeout
972 )
973 )
976def to_base64(original):
977 # type: (str) -> Optional[str]
978 """
979 Convert a string to base64, via UTF-8. Returns None on invalid input.
980 """
981 base64_string = None
983 try:
984 utf8_bytes = original.encode("UTF-8")
985 base64_bytes = base64.b64encode(utf8_bytes)
986 base64_string = base64_bytes.decode("UTF-8")
987 except Exception as err:
988 logger.warning("Unable to encode {orig} to base64:".format(orig=original), err)
990 return base64_string
993def from_base64(base64_string):
994 # type: (str) -> Optional[str]
995 """
996 Convert a string from base64, via UTF-8. Returns None on invalid input.
997 """
998 utf8_string = None
1000 try:
1001 only_valid_chars = BASE64_ALPHABET.match(base64_string)
1002 assert only_valid_chars
1004 base64_bytes = base64_string.encode("UTF-8")
1005 utf8_bytes = base64.b64decode(base64_bytes)
1006 utf8_string = utf8_bytes.decode("UTF-8")
1007 except Exception as err:
1008 logger.warning(
1009 "Unable to decode {b64} from base64:".format(b64=base64_string), err
1010 )
1012 return utf8_string