Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/asgiref/sync.py: 18%
234 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import asyncio
2import asyncio.coroutines
3import contextvars
4import functools
5import inspect
6import os
7import sys
8import threading
9import warnings
10import weakref
11from concurrent.futures import Future, ThreadPoolExecutor
12from typing import Any, Callable, Dict, Optional, overload
14from .current_thread_executor import CurrentThreadExecutor
15from .local import Local
18def _restore_context(context):
19 # Check for changes in contextvars, and set them to the current
20 # context for downstream consumers
21 for cvar in context:
22 try:
23 if cvar.get() != context.get(cvar):
24 cvar.set(context.get(cvar))
25 except LookupError:
26 cvar.set(context.get(cvar))
29def _iscoroutinefunction_or_partial(func: Any) -> bool:
30 # Python < 3.8 does not correctly determine partially wrapped
31 # coroutine functions are coroutine functions, hence the need for
32 # this to exist. Code taken from CPython.
33 if sys.version_info >= (3, 8):
34 return asyncio.iscoroutinefunction(func)
35 else:
36 while inspect.ismethod(func):
37 func = func.__func__
38 while isinstance(func, functools.partial):
39 func = func.func
41 return asyncio.iscoroutinefunction(func)
44class ThreadSensitiveContext:
45 """Async context manager to manage context for thread sensitive mode
47 This context manager controls which thread pool executor is used when in
48 thread sensitive mode. By default, a single thread pool executor is shared
49 within a process.
51 In Python 3.7+, the ThreadSensitiveContext() context manager may be used to
52 specify a thread pool per context.
54 This context manager is re-entrant, so only the outer-most call to
55 ThreadSensitiveContext will set the context.
57 Usage:
59 >>> import time
60 >>> async with ThreadSensitiveContext():
61 ... await sync_to_async(time.sleep, 1)()
62 """
64 def __init__(self):
65 self.token = None
67 async def __aenter__(self):
68 try:
69 SyncToAsync.thread_sensitive_context.get()
70 except LookupError:
71 self.token = SyncToAsync.thread_sensitive_context.set(self)
73 return self
75 async def __aexit__(self, exc, value, tb):
76 if not self.token:
77 return
79 executor = SyncToAsync.context_to_thread_executor.pop(self, None)
80 if executor:
81 executor.shutdown()
82 SyncToAsync.thread_sensitive_context.reset(self.token)
85class AsyncToSync:
86 """
87 Utility class which turns an awaitable that only works on the thread with
88 the event loop into a synchronous callable that works in a subthread.
90 If the call stack contains an async loop, the code runs there.
91 Otherwise, the code runs in a new loop in a new thread.
93 Either way, this thread then pauses and waits to run any thread_sensitive
94 code called from further down the call stack using SyncToAsync, before
95 finally exiting once the async task returns.
96 """
98 # Maps launched Tasks to the threads that launched them (for locals impl)
99 launch_map: "Dict[asyncio.Task[object], threading.Thread]" = {}
101 # Keeps track of which CurrentThreadExecutor to use. This uses an asgiref
102 # Local, not a threadlocal, so that tasks can work out what their parent used.
103 executors = Local()
105 # When we can't find a CurrentThreadExecutor from the context, such as
106 # inside create_task, we'll look it up here from the running event loop.
107 loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}
109 def __init__(self, awaitable, force_new_loop=False):
110 if not callable(awaitable) or (
111 not _iscoroutinefunction_or_partial(awaitable)
112 and not _iscoroutinefunction_or_partial(
113 getattr(awaitable, "__call__", awaitable)
114 )
115 ):
116 # Python does not have very reliable detection of async functions
117 # (lots of false negatives) so this is just a warning.
118 warnings.warn(
119 "async_to_sync was passed a non-async-marked callable", stacklevel=2
120 )
121 self.awaitable = awaitable
122 try:
123 self.__self__ = self.awaitable.__self__
124 except AttributeError:
125 pass
126 if force_new_loop:
127 # They have asked that we always run in a new sub-loop.
128 self.main_event_loop = None
129 else:
130 try:
131 self.main_event_loop = asyncio.get_running_loop()
132 except RuntimeError:
133 # There's no event loop in this thread. Look for the threadlocal if
134 # we're inside SyncToAsync
135 main_event_loop_pid = getattr(
136 SyncToAsync.threadlocal, "main_event_loop_pid", None
137 )
138 # We make sure the parent loop is from the same process - if
139 # they've forked, this is not going to be valid any more (#194)
140 if main_event_loop_pid and main_event_loop_pid == os.getpid():
141 self.main_event_loop = getattr(
142 SyncToAsync.threadlocal, "main_event_loop", None
143 )
144 else:
145 self.main_event_loop = None
147 def __call__(self, *args, **kwargs):
148 # You can't call AsyncToSync from a thread with a running event loop
149 try:
150 event_loop = asyncio.get_running_loop()
151 except RuntimeError:
152 pass
153 else:
154 if event_loop.is_running():
155 raise RuntimeError(
156 "You cannot use AsyncToSync in the same thread as an async event loop - "
157 "just await the async function directly."
158 )
160 # Wrapping context in list so it can be reassigned from within
161 # `main_wrap`.
162 context = [contextvars.copy_context()]
164 # Make a future for the return information
165 call_result = Future()
166 # Get the source thread
167 source_thread = threading.current_thread()
168 # Make a CurrentThreadExecutor we'll use to idle in this thread - we
169 # need one for every sync frame, even if there's one above us in the
170 # same thread.
171 if hasattr(self.executors, "current"):
172 old_current_executor = self.executors.current
173 else:
174 old_current_executor = None
175 current_executor = CurrentThreadExecutor()
176 self.executors.current = current_executor
177 loop = None
178 # Use call_soon_threadsafe to schedule a synchronous callback on the
179 # main event loop's thread if it's there, otherwise make a new loop
180 # in this thread.
181 try:
182 awaitable = self.main_wrap(
183 args, kwargs, call_result, source_thread, sys.exc_info(), context
184 )
186 if not (self.main_event_loop and self.main_event_loop.is_running()):
187 # Make our own event loop - in a new thread - and run inside that.
188 loop = asyncio.new_event_loop()
189 self.loop_thread_executors[loop] = current_executor
190 loop_executor = ThreadPoolExecutor(max_workers=1)
191 loop_future = loop_executor.submit(
192 self._run_event_loop, loop, awaitable
193 )
194 if current_executor:
195 # Run the CurrentThreadExecutor until the future is done
196 current_executor.run_until_future(loop_future)
197 # Wait for future and/or allow for exception propagation
198 loop_future.result()
199 else:
200 # Call it inside the existing loop
201 self.main_event_loop.call_soon_threadsafe(
202 self.main_event_loop.create_task, awaitable
203 )
204 if current_executor:
205 # Run the CurrentThreadExecutor until the future is done
206 current_executor.run_until_future(call_result)
207 finally:
208 # Clean up any executor we were running
209 if loop is not None:
210 del self.loop_thread_executors[loop]
211 if hasattr(self.executors, "current"):
212 del self.executors.current
213 if old_current_executor:
214 self.executors.current = old_current_executor
215 _restore_context(context[0])
217 # Wait for results from the future.
218 return call_result.result()
220 def _run_event_loop(self, loop, coro):
221 """
222 Runs the given event loop (designed to be called in a thread).
223 """
224 asyncio.set_event_loop(loop)
225 try:
226 loop.run_until_complete(coro)
227 finally:
228 try:
229 # mimic asyncio.run() behavior
230 # cancel unexhausted async generators
231 tasks = asyncio.all_tasks(loop)
232 for task in tasks:
233 task.cancel()
235 async def gather():
236 await asyncio.gather(*tasks, return_exceptions=True)
238 loop.run_until_complete(gather())
239 for task in tasks:
240 if task.cancelled():
241 continue
242 if task.exception() is not None:
243 loop.call_exception_handler(
244 {
245 "message": "unhandled exception during loop shutdown",
246 "exception": task.exception(),
247 "task": task,
248 }
249 )
250 if hasattr(loop, "shutdown_asyncgens"):
251 loop.run_until_complete(loop.shutdown_asyncgens())
252 finally:
253 loop.close()
254 asyncio.set_event_loop(self.main_event_loop)
256 def __get__(self, parent, objtype):
257 """
258 Include self for methods
259 """
260 func = functools.partial(self.__call__, parent)
261 return functools.update_wrapper(func, self.awaitable)
263 async def main_wrap(
264 self, args, kwargs, call_result, source_thread, exc_info, context
265 ):
266 """
267 Wraps the awaitable with something that puts the result into the
268 result/exception future.
269 """
270 if context is not None:
271 _restore_context(context[0])
273 current_task = SyncToAsync.get_current_task()
274 self.launch_map[current_task] = source_thread
275 try:
276 # If we have an exception, run the function inside the except block
277 # after raising it so exc_info is correctly populated.
278 if exc_info[1]:
279 try:
280 raise exc_info[1]
281 except BaseException:
282 result = await self.awaitable(*args, **kwargs)
283 else:
284 result = await self.awaitable(*args, **kwargs)
285 except BaseException as e:
286 call_result.set_exception(e)
287 else:
288 call_result.set_result(result)
289 finally:
290 del self.launch_map[current_task]
292 context[0] = contextvars.copy_context()
295class SyncToAsync:
296 """
297 Utility class which turns a synchronous callable into an awaitable that
298 runs in a threadpool. It also sets a threadlocal inside the thread so
299 calls to AsyncToSync can escape it.
301 If thread_sensitive is passed, the code will run in the same thread as any
302 outer code. This is needed for underlying Python code that is not
303 threadsafe (for example, code which handles SQLite database connections).
305 If the outermost program is async (i.e. SyncToAsync is outermost), then
306 this will be a dedicated single sub-thread that all sync code runs in,
307 one after the other. If the outermost program is sync (i.e. AsyncToSync is
308 outermost), this will just be the main thread. This is achieved by idling
309 with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
310 rather than just blocking.
312 If executor is passed in, that will be used instead of the loop's default executor.
313 In order to pass in an executor, thread_sensitive must be set to False, otherwise
314 a TypeError will be raised.
315 """
317 # If they've set ASGI_THREADS, update the default asyncio executor for now
318 if "ASGI_THREADS" in os.environ: 318 ↛ 321line 318 didn't jump to line 321, because the condition on line 318 was never true
319 # We use get_event_loop here - not get_running_loop - as this will
320 # be run at import time, and we want to update the main thread's loop.
321 loop = asyncio.get_event_loop()
322 loop.set_default_executor(
323 ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))
324 )
326 # Maps launched threads to the coroutines that spawned them
327 launch_map: "Dict[threading.Thread, asyncio.Task[object]]" = {}
329 # Storage for main event loop references
330 threadlocal = threading.local()
332 # Single-thread executor for thread-sensitive code
333 single_thread_executor = ThreadPoolExecutor(max_workers=1)
335 # Maintain a contextvar for the current execution context. Optionally used
336 # for thread sensitive mode.
337 thread_sensitive_context: "contextvars.ContextVar[str]" = contextvars.ContextVar(
338 "thread_sensitive_context"
339 )
341 # Contextvar that is used to detect if the single thread executor
342 # would be awaited on while already being used in the same context
343 deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
344 "deadlock_context"
345 )
347 # Maintaining a weak reference to the context ensures that thread pools are
348 # erased once the context goes out of scope. This terminates the thread pool.
349 context_to_thread_executor: "weakref.WeakKeyDictionary[object, ThreadPoolExecutor]" = (
350 weakref.WeakKeyDictionary()
351 )
353 def __init__(
354 self,
355 func: Callable[..., Any],
356 thread_sensitive: bool = True,
357 executor: Optional["ThreadPoolExecutor"] = None,
358 ) -> None:
359 if (
360 not callable(func)
361 or _iscoroutinefunction_or_partial(func)
362 or _iscoroutinefunction_or_partial(getattr(func, "__call__", func))
363 ):
364 raise TypeError("sync_to_async can only be applied to sync functions.")
365 self.func = func
366 functools.update_wrapper(self, func)
367 self._thread_sensitive = thread_sensitive
368 self._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore
369 if thread_sensitive and executor is not None:
370 raise TypeError("executor must not be set when thread_sensitive is True")
371 self._executor = executor
372 try:
373 self.__self__ = func.__self__ # type: ignore
374 except AttributeError:
375 pass
377 async def __call__(self, *args, **kwargs):
378 loop = asyncio.get_running_loop()
380 # Work out what thread to run the code in
381 if self._thread_sensitive:
382 if hasattr(AsyncToSync.executors, "current"):
383 # If we have a parent sync thread above somewhere, use that
384 executor = AsyncToSync.executors.current
385 elif self.thread_sensitive_context and self.thread_sensitive_context.get(
386 None
387 ):
388 # If we have a way of retrieving the current context, attempt
389 # to use a per-context thread pool executor
390 thread_sensitive_context = self.thread_sensitive_context.get()
392 if thread_sensitive_context in self.context_to_thread_executor:
393 # Re-use thread executor in current context
394 executor = self.context_to_thread_executor[thread_sensitive_context]
395 else:
396 # Create new thread executor in current context
397 executor = ThreadPoolExecutor(max_workers=1)
398 self.context_to_thread_executor[thread_sensitive_context] = executor
399 elif loop in AsyncToSync.loop_thread_executors:
400 # Re-use thread executor for running loop
401 executor = AsyncToSync.loop_thread_executors[loop]
402 elif self.deadlock_context and self.deadlock_context.get(False):
403 raise RuntimeError(
404 "Single thread executor already being used, would deadlock"
405 )
406 else:
407 # Otherwise, we run it in a fixed single thread
408 executor = self.single_thread_executor
409 if self.deadlock_context:
410 self.deadlock_context.set(True)
411 else:
412 # Use the passed in executor, or the loop's default if it is None
413 executor = self._executor
415 context = contextvars.copy_context()
416 child = functools.partial(self.func, *args, **kwargs)
417 func = context.run
418 args = (child,)
419 kwargs = {}
421 try:
422 # Run the code in the right thread
423 future = loop.run_in_executor(
424 executor,
425 functools.partial(
426 self.thread_handler,
427 loop,
428 self.get_current_task(),
429 sys.exc_info(),
430 func,
431 *args,
432 **kwargs,
433 ),
434 )
435 ret = await asyncio.wait_for(future, timeout=None)
437 finally:
438 _restore_context(context)
439 if self.deadlock_context:
440 self.deadlock_context.set(False)
442 return ret
444 def __get__(self, parent, objtype):
445 """
446 Include self for methods
447 """
448 return functools.partial(self.__call__, parent)
450 def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):
451 """
452 Wraps the sync application with exception handling.
453 """
454 # Set the threadlocal for AsyncToSync
455 self.threadlocal.main_event_loop = loop
456 self.threadlocal.main_event_loop_pid = os.getpid()
457 # Set the task mapping (used for the locals module)
458 current_thread = threading.current_thread()
459 if AsyncToSync.launch_map.get(source_task) == current_thread:
460 # Our parent task was launched from this same thread, so don't make
461 # a launch map entry - let it shortcut over us! (and stop infinite loops)
462 parent_set = False
463 else:
464 self.launch_map[current_thread] = source_task
465 parent_set = True
466 # Run the function
467 try:
468 # If we have an exception, run the function inside the except block
469 # after raising it so exc_info is correctly populated.
470 if exc_info[1]:
471 try:
472 raise exc_info[1]
473 except BaseException:
474 return func(*args, **kwargs)
475 else:
476 return func(*args, **kwargs)
477 finally:
478 # Only delete the launch_map parent if we set it, otherwise it is
479 # from someone else.
480 if parent_set:
481 del self.launch_map[current_thread]
483 @staticmethod
484 def get_current_task():
485 """
486 Implementation of asyncio.current_task()
487 that returns None if there is no task.
488 """
489 try:
490 return asyncio.current_task()
491 except RuntimeError:
492 return None
495# Lowercase aliases (and decorator friendliness)
496async_to_sync = AsyncToSync
499@overload
500def sync_to_async(
501 func: None = None,
502 thread_sensitive: bool = True,
503 executor: Optional["ThreadPoolExecutor"] = None,
504) -> Callable[[Callable[..., Any]], SyncToAsync]:
505 ...
508@overload
509def sync_to_async(
510 func: Callable[..., Any],
511 thread_sensitive: bool = True,
512 executor: Optional["ThreadPoolExecutor"] = None,
513) -> SyncToAsync:
514 ...
517def sync_to_async(
518 func=None,
519 thread_sensitive=True,
520 executor=None,
521):
522 if func is None:
523 return lambda f: SyncToAsync(
524 f,
525 thread_sensitive=thread_sensitive,
526 executor=executor,
527 )
528 return SyncToAsync(
529 func,
530 thread_sensitive=thread_sensitive,
531 executor=executor,
532 )