Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/asgiref/local.py: 78%
64 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import random
2import string
3import sys
4import threading
5import weakref
8class Local:
9 """
10 A drop-in replacement for threading.locals that also works with asyncio
11 Tasks (via the current_task asyncio method), and passes locals through
12 sync_to_async and async_to_sync.
14 Specifically:
15 - Locals work per-coroutine on any thread not spawned using asgiref
16 - Locals work per-thread on any thread not spawned using asgiref
17 - Locals are shared with the parent coroutine when using sync_to_async
18 - Locals are shared with the parent thread when using async_to_sync
19 (and if that thread was launched using sync_to_async, with its parent
20 coroutine as well, with this working for indefinite levels of nesting)
22 Set thread_critical to True to not allow locals to pass from an async Task
23 to a thread it spawns. This is needed for code that truly needs
24 thread-safety, as opposed to things used for helpful context (e.g. sqlite
25 does not like being called from a different thread to the one it is from).
26 Thread-critical code will still be differentiated per-Task within a thread
27 as it is expected it does not like concurrent access.
29 This doesn't use contextvars as it needs to support 3.6. Once it can support
30 3.7 only, we can then reimplement the storage more nicely.
31 """
33 def __init__(self, thread_critical: bool = False) -> None:
34 self._thread_critical = thread_critical
35 self._thread_lock = threading.RLock()
36 self._context_refs: "weakref.WeakSet[object]" = weakref.WeakSet()
37 # Random suffixes stop accidental reuse between different Locals,
38 # though we try to force deletion as well.
39 self._attr_name = "_asgiref_local_impl_{}_{}".format(
40 id(self),
41 "".join(random.choice(string.ascii_letters) for i in range(8)),
42 )
44 def _get_context_id(self):
45 """
46 Get the ID we should use for looking up variables
47 """
48 # Prevent a circular reference
49 from .sync import AsyncToSync, SyncToAsync
51 # First, pull the current task if we can
52 context_id = SyncToAsync.get_current_task()
53 context_is_async = True
54 # OK, let's try for a thread ID
55 if context_id is None: 55 ↛ 59line 55 didn't jump to line 59, because the condition on line 55 was never false
56 context_id = threading.current_thread()
57 context_is_async = False
58 # If we're thread-critical, we stop here, as we can't share contexts.
59 if self._thread_critical:
60 return context_id
61 # Now, take those and see if we can resolve them through the launch maps
62 for i in range(sys.getrecursionlimit()): 62 ↛ 77line 62 didn't jump to line 77, because the loop on line 62 didn't complete
63 try:
64 if context_is_async: 64 ↛ 66line 64 didn't jump to line 66, because the condition on line 64 was never true
65 # Tasks have a source thread in AsyncToSync
66 context_id = AsyncToSync.launch_map[context_id]
67 context_is_async = False
68 else:
69 # Threads have a source task in SyncToAsync
70 context_id = SyncToAsync.launch_map[context_id]
71 context_is_async = True
72 except KeyError:
73 break
74 else:
75 # Catch infinite loops (they happen if you are screwing around
76 # with AsyncToSync implementations)
77 raise RuntimeError("Infinite launch_map loops")
78 return context_id
80 def _get_storage(self):
81 context_obj = self._get_context_id()
82 if not hasattr(context_obj, self._attr_name):
83 setattr(context_obj, self._attr_name, {})
84 self._context_refs.add(context_obj)
85 return getattr(context_obj, self._attr_name)
87 def __del__(self):
88 try:
89 for context_obj in self._context_refs:
90 try:
91 delattr(context_obj, self._attr_name)
92 except AttributeError:
93 pass
94 except TypeError:
95 # WeakSet.__iter__ can crash when interpreter is shutting down due
96 # to _IterationGuard being None.
97 pass
99 def __getattr__(self, key):
100 with self._thread_lock:
101 storage = self._get_storage()
102 if key in storage:
103 return storage[key]
104 else:
105 raise AttributeError(f"{self!r} object has no attribute {key!r}")
107 def __setattr__(self, key, value):
108 if key in ("_context_refs", "_thread_critical", "_thread_lock", "_attr_name"):
109 return super().__setattr__(key, value)
110 with self._thread_lock:
111 storage = self._get_storage()
112 storage[key] = value
114 def __delattr__(self, key):
115 with self._thread_lock:
116 storage = self._get_storage()
117 if key in storage: 117 ↛ 120line 117 didn't jump to line 120, because the condition on line 117 was never false
118 del storage[key]
119 else:
120 raise AttributeError(f"{self!r} object has no attribute {key!r}")