Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/asgiref/sync.py: 18%

234 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import asyncio 

2import asyncio.coroutines 

3import contextvars 

4import functools 

5import inspect 

6import os 

7import sys 

8import threading 

9import warnings 

10import weakref 

11from concurrent.futures import Future, ThreadPoolExecutor 

12from typing import Any, Callable, Dict, Optional, overload 

13 

14from .current_thread_executor import CurrentThreadExecutor 

15from .local import Local 

16 

17 

def _restore_context(context):
    """Copy every variable held in ``context`` into the active context.

    Called after code has run inside a copied ``contextvars.Context`` so that
    values assigned there become visible to downstream consumers of the
    current context.

    Args:
        context: The ``contextvars.Context`` whose values should be applied.
    """
    for cvar in context:
        wanted = context.get(cvar)
        try:
            # Compare by identity, not equality: ``!=`` would call user-defined
            # comparison methods, which may raise or return an object without
            # an unambiguous truth value (array-likes, for example).
            if cvar.get() is not wanted:
                cvar.set(wanted)
        except LookupError:
            # The variable has no value (and no default) in the current
            # context yet, so there is nothing to compare against.
            cvar.set(wanted)

27 

28 

def _iscoroutinefunction_or_partial(func: Any) -> bool:
    """Tell whether ``func`` is a coroutine function, looking through wrappers.

    Args:
        func: Any object; typically a function, bound method or
            ``functools.partial``.

    Returns:
        The verdict of ``asyncio.iscoroutinefunction`` for ``func`` (after
        manual unwrapping on interpreters older than 3.8).
    """
    if sys.version_info < (3, 8):
        # Python < 3.8 does not recognise partially wrapped coroutine
        # functions, so strip bound methods and partials by hand first.
        # Unwrapping logic taken from CPython.
        while inspect.ismethod(func):
            func = func.__func__
        while isinstance(func, functools.partial):
            func = func.func
    return asyncio.iscoroutinefunction(func)

42 

43 

class ThreadSensitiveContext:
    """Async context manager that scopes the executor used in thread sensitive mode.

    By default a single thread pool executor is shared within a process when
    running in thread sensitive mode. Wrapping code in
    ThreadSensitiveContext() (Python 3.7+) gives that code a thread pool of
    its own for the duration of the context.

    The manager is re-entrant: only the outer-most ThreadSensitiveContext
    installs itself; nested ones leave the existing context in place.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep, 1)()
    """

    def __init__(self):
        # Token handed back by ContextVar.set(); remains None unless this
        # instance is the one that installed itself in __aenter__.
        self.token = None

    async def __aenter__(self):
        try:
            # Probe only: a successful get() means an outer context is active.
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            self.token = SyncToAsync.thread_sensitive_context.set(self)
        return self

    async def __aexit__(self, exc, value, tb):
        if self.token:
            # We installed the context, so we also tear down whatever
            # executor was created for it and restore the previous state.
            pool = SyncToAsync.context_to_thread_executor.pop(self, None)
            if pool:
                pool.shutdown()
            SyncToAsync.thread_sensitive_context.reset(self.token)

83 

84 

class AsyncToSync:
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Maps launched Tasks to the threads that launched them (for locals impl)
    launch_map: "Dict[asyncio.Task[object], threading.Thread]" = {}

    # Keeps track of which CurrentThreadExecutor to use. This uses an asgiref
    # Local, not a threadlocal, so that tasks can work out what their parent used.
    executors = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    def __init__(self, awaitable, force_new_loop=False):
        """
        Store the awaitable and decide which event loop calls should target.

        awaitable: the callable to wrap. If it is not callable, or does not
            look like a coroutine function, a warning is issued (not an error).
        force_new_loop: when true, main_event_loop is set to None, so every
            call creates its own event loop.
        """
        if not callable(awaitable) or (
            not _iscoroutinefunction_or_partial(awaitable)
            and not _iscoroutinefunction_or_partial(
                getattr(awaitable, "__call__", awaitable)
            )
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        # Mirror __self__ of a bound method onto the wrapper when there is one.
        try:
            self.__self__ = self.awaitable.__self__
        except AttributeError:
            pass
        if force_new_loop:
            # They have asked that we always run in a new sub-loop.
            self.main_event_loop = None
        else:
            try:
                self.main_event_loop = asyncio.get_running_loop()
            except RuntimeError:
                # There's no event loop in this thread. Look for the threadlocal if
                # we're inside SyncToAsync
                main_event_loop_pid = getattr(
                    SyncToAsync.threadlocal, "main_event_loop_pid", None
                )
                # We make sure the parent loop is from the same process - if
                # they've forked, this is not going to be valid any more (#194)
                if main_event_loop_pid and main_event_loop_pid == os.getpid():
                    self.main_event_loop = getattr(
                        SyncToAsync.threadlocal, "main_event_loop", None
                    )
                else:
                    self.main_event_loop = None

    def __call__(self, *args, **kwargs):
        """
        Run the wrapped awaitable with the given arguments and return its
        result, blocking the calling thread until it is available.

        Raises RuntimeError if the calling thread has a running event loop.
        An exception raised by the awaitable is stored on the result future
        by main_wrap and re-raised here by call_result.result().
        """
        # You can't call AsyncToSync from a thread with a running event loop
        try:
            event_loop = asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            if event_loop.is_running():
                raise RuntimeError(
                    "You cannot use AsyncToSync in the same thread as an async event loop - "
                    "just await the async function directly."
                )

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Make a future for the return information
        call_result = Future()
        # Get the source thread
        source_thread = threading.current_thread()
        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        if hasattr(self.executors, "current"):
            old_current_executor = self.executors.current
        else:
            old_current_executor = None
        current_executor = CurrentThreadExecutor()
        self.executors.current = current_executor
        # Stays None unless we create our own loop below; the finally block
        # uses it to know whether a loop_thread_executors entry must go.
        loop = None
        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            # sys.exc_info() is captured here so main_wrap can re-raise any
            # exception currently being handled before running the awaitable.
            awaitable = self.main_wrap(
                args, kwargs, call_result, source_thread, sys.exc_info(), context
            )

            if not (self.main_event_loop and self.main_event_loop.is_running()):
                # Make our own event loop - in a new thread - and run inside that.
                loop = asyncio.new_event_loop()
                self.loop_thread_executors[loop] = current_executor
                loop_executor = ThreadPoolExecutor(max_workers=1)
                loop_future = loop_executor.submit(
                    self._run_event_loop, loop, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
            else:
                # Call it inside the existing loop
                self.main_event_loop.call_soon_threadsafe(
                    self.main_event_loop.create_task, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(call_result)
        finally:
            # Clean up any executor we were running
            if loop is not None:
                del self.loop_thread_executors[loop]
            if hasattr(self.executors, "current"):
                del self.executors.current
            # Put back the executor that was current before this call, if any.
            if old_current_executor:
                self.executors.current = old_current_executor
            # context[0] is replaced by main_wrap once the awaitable has run;
            # apply it to this thread's context.
            _restore_context(context[0])

        # Wait for results from the future.
        return call_result.result()

    def _run_event_loop(self, loop, coro):
        """
        Runs the given event loop (designed to be called in a thread).

        Runs coro to completion on loop, then cleans the loop up, closes it
        and sets the thread's event loop to self.main_event_loop.
        """
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(coro)
        finally:
            try:
                # Mimic asyncio.run() cleanup: cancel every task still
                # registered on the loop and wait for them to finish.
                tasks = asyncio.all_tasks(loop)
                for task in tasks:
                    task.cancel()

                async def gather():
                    await asyncio.gather(*tasks, return_exceptions=True)

                loop.run_until_complete(gather())
                # Report tasks that ended with an exception rather than by
                # cancellation to the loop's exception handler.
                for task in tasks:
                    if task.cancelled():
                        continue
                    if task.exception() is not None:
                        loop.call_exception_handler(
                            {
                                "message": "unhandled exception during loop shutdown",
                                "exception": task.exception(),
                                "task": task,
                            }
                        )
                # Finalize async generators when the loop supports it.
                if hasattr(loop, "shutdown_asyncgens"):
                    loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()
                asyncio.set_event_loop(self.main_event_loop)

    def __get__(self, parent, objtype):
        """
        Include self for methods

        Returns a partial of __call__ with parent bound as the first
        argument, carrying over the wrapped awaitable's metadata.
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self, args, kwargs, call_result, source_thread, exc_info, context
    ):
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.

        context is either None or a one-element list holding a
        contextvars.Context; element 0 is applied on entry and overwritten
        with a copy of the current context on exit.
        """
        if context is not None:
            _restore_context(context[0])

        current_task = SyncToAsync.get_current_task()
        self.launch_map[current_task] = source_thread
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await self.awaitable(*args, **kwargs)
            else:
                result = await self.awaitable(*args, **kwargs)
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            del self.launch_map[current_task]

            # Hand the context as it stands after the awaitable ran back to
            # the caller through the shared list.
            context[0] = contextvars.copy_context()

293 

294 

class SyncToAsync:
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # If they've set ASGI_THREADS, update the default asyncio executor for now
    if "ASGI_THREADS" in os.environ:
        # We use get_event_loop here - not get_running_loop - as this will
        # be run at import time, and we want to update the main thread's loop.
        loop = asyncio.get_event_loop()
        loop.set_default_executor(
            ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))
        )

    # Maps launched threads to the coroutines that spawned them
    launch_map: "Dict[threading.Thread, asyncio.Task[object]]" = {}

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode. The stored value is the ThreadSensitiveContext
    # instance that installed itself.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[object, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[..., Any],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """
        Wrap the synchronous callable func.

        Raises TypeError if func is not callable or is a coroutine function,
        or if an executor is supplied while thread_sensitive is True.
        """
        if (
            not callable(func)
            or _iscoroutinefunction_or_partial(func)
            or _iscoroutinefunction_or_partial(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Marker attribute that asyncio checks for when deciding whether an
        # object counts as a coroutine function.
        self._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        # Mirror __self__ of a bound method onto the wrapper when there is one.
        try:
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args, **kwargs):
        """
        Run the wrapped function in an executor and return its result.

        Raises RuntimeError if the shared single thread executor would have
        to be used while the current context is already marked as using it.
        """
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            if hasattr(AsyncToSync.executors, "current"):
                # If we have a parent sync thread above somewhere, use that
                executor = AsyncToSync.executors.current
            elif self.thread_sensitive_context and self.thread_sensitive_context.get(
                None
            ):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context and self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                if self.deadlock_context:
                    self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # Run the function inside a copy of the current context so that
        # contextvars it sets can be copied back afterwards.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        args = (child,)
        kwargs = {}

        try:
            # Run the code in the right thread
            future = loop.run_in_executor(
                executor,
                functools.partial(
                    self.thread_handler,
                    loop,
                    self.get_current_task(),
                    sys.exc_info(),
                    func,
                    *args,
                    **kwargs,
                ),
            )
            ret = await asyncio.wait_for(future, timeout=None)

        finally:
            _restore_context(context)
            if self.deadlock_context:
                self.deadlock_context.set(False)

        return ret

    def __get__(self, parent, objtype):
        """
        Include self for methods

        Returns a partial of __call__ with parent bound as the first
        argument. The wrapped function's metadata is copied onto the partial,
        matching what AsyncToSync.__get__ does.
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.

        Records loop and the current pid on the threadlocal, registers
        source_task in launch_map for the current thread when appropriate,
        then returns func(*args, **kwargs).
        """
        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        # Set the task mapping (used for the locals module)
        current_thread = threading.current_thread()
        if AsyncToSync.launch_map.get(source_task) == current_thread:
            # Our parent task was launched from this same thread, so don't make
            # a launch map entry - let it shortcut over us! (and stop infinite loops)
            parent_set = False
        else:
            self.launch_map[current_thread] = source_task
            parent_set = True
        # Run the function
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    return func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        finally:
            # Only delete the launch_map parent if we set it, otherwise it is
            # from someone else.
            if parent_set:
                del self.launch_map[current_thread]

    @staticmethod
    def get_current_task():
        """
        Implementation of asyncio.current_task()
        that returns None if there is no task.
        """
        try:
            return asyncio.current_task()
        except RuntimeError:
            return None

493 

494 

# Lowercase alias for AsyncToSync, so it can be called like a function or
# applied as a decorator.
async_to_sync = AsyncToSync

497 

498 

@overload
def sync_to_async(
    func: None = None,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[..., Any]], SyncToAsync]:
    ...


@overload
def sync_to_async(
    func: Callable[..., Any],
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> SyncToAsync:
    ...


def sync_to_async(
    func=None,
    thread_sensitive=True,
    executor=None,
):
    """Wrap a sync callable in SyncToAsync, directly or as a decorator factory.

    Args:
        func: The callable to wrap. When omitted, a one-argument decorator is
            returned instead, so ``@sync_to_async(thread_sensitive=False)``
            works.
        thread_sensitive: Passed through to SyncToAsync.
        executor: Passed through to SyncToAsync.

    Returns:
        A SyncToAsync instance when ``func`` is given, otherwise a callable
        that builds one from the function it receives.
    """

    def decorator(f):
        return SyncToAsync(
            f,
            thread_sensitive=thread_sensitive,
            executor=executor,
        )

    if func is not None:
        return decorator(func)
    return decorator

532 )