Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/asgiref/local.py: 78%

64 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import random 

2import string 

3import sys 

4import threading 

5import weakref 

6 

7 

8class Local: 

9 """ 

10 A drop-in replacement for threading.locals that also works with asyncio 

11 Tasks (via the current_task asyncio method), and passes locals through 

12 sync_to_async and async_to_sync. 

13 

14 Specifically: 

15 - Locals work per-coroutine on any thread not spawned using asgiref 

16 - Locals work per-thread on any thread not spawned using asgiref 

17 - Locals are shared with the parent coroutine when using sync_to_async 

18 - Locals are shared with the parent thread when using async_to_sync 

19 (and if that thread was launched using sync_to_async, with its parent 

20 coroutine as well, with this working for indefinite levels of nesting) 

21 

22 Set thread_critical to True to not allow locals to pass from an async Task 

23 to a thread it spawns. This is needed for code that truly needs 

24 thread-safety, as opposed to things used for helpful context (e.g. sqlite 

25 does not like being called from a different thread to the one it is from). 

26 Thread-critical code will still be differentiated per-Task within a thread 

27 as it is expected it does not like concurrent access. 

28 

29 This doesn't use contextvars as it needs to support 3.6. Once it can support 

30 3.7 only, we can then reimplement the storage more nicely. 

31 """ 

32 

33 def __init__(self, thread_critical: bool = False) -> None: 

34 self._thread_critical = thread_critical 

35 self._thread_lock = threading.RLock() 

36 self._context_refs: "weakref.WeakSet[object]" = weakref.WeakSet() 

37 # Random suffixes stop accidental reuse between different Locals, 

38 # though we try to force deletion as well. 

39 self._attr_name = "_asgiref_local_impl_{}_{}".format( 

40 id(self), 

41 "".join(random.choice(string.ascii_letters) for i in range(8)), 

42 ) 

43 

44 def _get_context_id(self): 

45 """ 

46 Get the ID we should use for looking up variables 

47 """ 

48 # Prevent a circular reference 

49 from .sync import AsyncToSync, SyncToAsync 

50 

51 # First, pull the current task if we can 

52 context_id = SyncToAsync.get_current_task() 

53 context_is_async = True 

54 # OK, let's try for a thread ID 

55 if context_id is None: 55 ↛ 59line 55 didn't jump to line 59, because the condition on line 55 was never false

56 context_id = threading.current_thread() 

57 context_is_async = False 

58 # If we're thread-critical, we stop here, as we can't share contexts. 

59 if self._thread_critical: 

60 return context_id 

61 # Now, take those and see if we can resolve them through the launch maps 

62 for i in range(sys.getrecursionlimit()): 62 ↛ 77line 62 didn't jump to line 77, because the loop on line 62 didn't complete

63 try: 

64 if context_is_async: 64 ↛ 66line 64 didn't jump to line 66, because the condition on line 64 was never true

65 # Tasks have a source thread in AsyncToSync 

66 context_id = AsyncToSync.launch_map[context_id] 

67 context_is_async = False 

68 else: 

69 # Threads have a source task in SyncToAsync 

70 context_id = SyncToAsync.launch_map[context_id] 

71 context_is_async = True 

72 except KeyError: 

73 break 

74 else: 

75 # Catch infinite loops (they happen if you are screwing around 

76 # with AsyncToSync implementations) 

77 raise RuntimeError("Infinite launch_map loops") 

78 return context_id 

79 

80 def _get_storage(self): 

81 context_obj = self._get_context_id() 

82 if not hasattr(context_obj, self._attr_name): 

83 setattr(context_obj, self._attr_name, {}) 

84 self._context_refs.add(context_obj) 

85 return getattr(context_obj, self._attr_name) 

86 

87 def __del__(self): 

88 try: 

89 for context_obj in self._context_refs: 

90 try: 

91 delattr(context_obj, self._attr_name) 

92 except AttributeError: 

93 pass 

94 except TypeError: 

95 # WeakSet.__iter__ can crash when interpreter is shutting down due 

96 # to _IterationGuard being None. 

97 pass 

98 

99 def __getattr__(self, key): 

100 with self._thread_lock: 

101 storage = self._get_storage() 

102 if key in storage: 

103 return storage[key] 

104 else: 

105 raise AttributeError(f"{self!r} object has no attribute {key!r}") 

106 

107 def __setattr__(self, key, value): 

108 if key in ("_context_refs", "_thread_critical", "_thread_lock", "_attr_name"): 

109 return super().__setattr__(key, value) 

110 with self._thread_lock: 

111 storage = self._get_storage() 

112 storage[key] = value 

113 

114 def __delattr__(self, key): 

115 with self._thread_lock: 

116 storage = self._get_storage() 

117 if key in storage: 117 ↛ 120line 117 didn't jump to line 120, because the condition on line 117 was never false

118 del storage[key] 

119 else: 

120 raise AttributeError(f"{self!r} object has no attribute {key!r}")