Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/asgiref/current_thread_executor.py: 24%

43 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import queue 

2import threading 

3from concurrent.futures import Executor, Future 

4 

5 

class _WorkItem:
    """
    Represents an item needing to be run in the executor.

    Bundles a callable with its positional and keyword arguments and the
    Future that will receive the call's result or exception.

    Copied from ThreadPoolExecutor (but it's private, so we're not going to rely on importing it)
    """

    def __init__(self, future, fn, args, kwargs):
        """
        Store the pieces of a deferred call.

        future: the Future that is resolved when run() finishes.
        fn: the callable to invoke.
        args: positional arguments, unpacked as ``fn(*args, ...)``.
        kwargs: keyword arguments, unpacked as ``fn(..., **kwargs)``.
        """
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """
        Invoke the stored callable and publish its outcome on the future.

        Does nothing if the future was cancelled before it could start.
        Nothing raised by the callable escapes this method: every
        BaseException (so also KeyboardInterrupt and SystemExit) is stored
        on the future instead.
        """
        # Returns False when the future has already been cancelled; in that
        # case the callable must not be run at all.
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            # (the future now holds exc, and this frame's 'self' holds the
            # future; dropping the local 'self' removes that link).
            self = None
        else:
            self.future.set_result(result)

29 

30 

class CurrentThreadExecutor(Executor):
    """
    An Executor that actually runs code in the thread it is instantiated in.
    Passed to other threads running async code, so they can run sync code in
    the thread they came from.

    Other threads enqueue work with submit(); the owning thread executes that
    work by calling run_until_future().
    """

    def __init__(self):
        # The thread that created the executor is the only one allowed to
        # run queued work, and the only one not allowed to submit it.
        self._work_thread = threading.current_thread()
        # Holds work items from submit(), plus the awaited future itself once
        # it completes (see run_until_future).
        self._work_queue = queue.Queue()
        # Set once run_until_future() has exited, for any reason.
        self._broken = False

    def run_until_future(self, future):
        """
        Runs the code in the work queue until a result is available from the future.
        Should be run from the thread the executor is initialised in.

        Raises RuntimeError when called from any other thread. Once this
        method exits, whether by returning or by an exception, the executor
        is marked broken and submit() refuses further work. Items still
        sitting in the queue at that point are not run by this method.
        """
        # Check we're in the right thread
        if threading.current_thread() != self._work_thread:
            raise RuntimeError(
                "You cannot run CurrentThreadExecutor from a different thread"
            )
        # When the future completes it is put on the work queue itself, which
        # wakes the blocking get() below and acts as the stop signal.
        future.add_done_callback(self._work_queue.put)
        # Keep getting and running work items until we get the future we're waiting for
        # back via the future's done callback.
        try:
            while True:
                # Get a work item and run it
                work_item = self._work_queue.get()
                if work_item is future:
                    return
                work_item.run()
                # Drop the reference before blocking on the next get().
                del work_item
        finally:
            self._broken = True

    def submit(self, fn, *args, **kwargs):
        """
        Queue ``fn(*args, **kwargs)`` to be run by the executor's own thread.

        Returns a Future that is resolved when the owning thread runs the
        item. Raises RuntimeError if called from the executor's own thread,
        or if run_until_future() has already exited.
        """
        # Check they're not submitting from the same thread
        if threading.current_thread() == self._work_thread:
            raise RuntimeError(
                "You cannot submit onto CurrentThreadExecutor from its own thread"
            )
        # Check they're not too late or the executor errored
        if self._broken:
            raise RuntimeError("CurrentThreadExecutor already quit or is broken")
        # Add to work queue
        f = Future()
        work_item = _WorkItem(f, fn, args, kwargs)
        self._work_queue.put(work_item)
        # Return the future
        return f