Source code for qlauncher.launcher.aql.aql_task
1"""Wrapper for QLauncher that enables the user to launch tasks asynchronously (futures + multiprocessing)"""
2from collections.abc import Callable
3from typing import Any
4from threading import Event, Thread
5import time
6import weakref
7import multiprocess
8
9from pathos.multiprocessing import _ProcessPool
10
11from qlauncher.base.base import Result
12
13
[docs]
14def get_timeout(max_timeout: int | float | None, start: int | float) -> float | None:
15 """
16 Get timeout to wait on an event, useful when awaiting multiple tasks and total timeout must be max_timeout.
17
18 Args:
19 max_timeout (int | float | None): Total allowed timeout, None = infinite wait.
20 start (int | float): Await start timestamp (time.time())
21
22 Returns:
23 int | float | None: Remaining timeout or None if max_timeout was None
24 """
25 if max_timeout is None:
26 return None
27 return max_timeout - (time.time() - start)
28
29
30class _InnerAQLTask:
31 """
32 Task object returned to user, so that dependencies can be created.
33
34 Attributes:
35 task (Callable): function that gets executed asynchronously
36 dependencies (list[AQLTask]): Optional dependencies. The task will wait for all its dependencies to finish, before starting.
37 callbacks (list[Callable]): Callbacks ran when the task finishes executing.
38 Task result is inserted as an argument to the function.
39 pipe_dependencies (bool): If True results of tasks defined as dependencies will be passed as arguments to self.task.
40 Defaults to False.
41 """
42
43 def __init__(
44 self,
45 task: Callable,
46 dependencies: list['AQLTask'] | None = None,
47 callbacks: list[Callable] | None = None,
48 pipe_dependencies: bool = False
49 ) -> None:
50
51 self.task = task
52 self.dependencies = dependencies if dependencies is not None else []
53 self.callbacks = callbacks if callbacks is not None else []
54 self.pipe_dependencies = pipe_dependencies
55
56 self._cancelled = False
57 self._thread = None
58 self._pool = _ProcessPool(processes=1)
59 self._thread_made = Event()
60
61 self._result = None
62 self._done = False
63
64 def _shutdown_subprocess(self):
65 self._pool.close()
66 self._pool.terminate()
67 self._pool.join()
68
69 def _async_task(self) -> Any:
70 dep_results = [d.result() for d in self.dependencies]
71
72 if self._cancelled:
73 self._result = None
74 self._done = True
75 return
76
77 res = self._pool.apply_async(self.task, args=(dep_results if self.pipe_dependencies else []))
78 # Turns out you can't just outright kill threads (or futures) is so I have to do this, so that the thread knows to exit.
79 while not self._cancelled:
80 try:
81 self._result = res.get(timeout=0.05)
82 self._done = True
83 return
84 except multiprocess.context.TimeoutError:
85 pass # task not ready, check for cancel
86 # For any other error originating from the task, shutdown and clean up subprocess then raise error again.
87 except Exception as e:
88 self._shutdown_subprocess()
89 self._result = e
90 self._done = True
91 return
92
93 if self._cancelled:
94 self._shutdown_subprocess() # kill res process
95
96 self._result = res.get() if res.ready() else None
97 self._done = True
98 return
99
100 def _target_task(self):
101 # Main task + callbacks launch
102 self._async_task()
103 for cb in self.callbacks:
104 cb(self._result)
105
106 def _set_thread(self):
107 self._thread = Thread(target=weakref.proxy(self)._target_task, daemon=True) # set daemon so that thread quits as main process quits
108 self._thread.start()
109 self._thread_made.set()
110
111 def start(self):
112 """Start task execution."""
113 if self._thread is not None or self._cancelled:
114 raise ValueError("Cannot start, task already started or cancelled.")
115 self._set_thread()
116
117 def cancel(self) -> bool:
118 """
119 Attempt to cancel the task.
120
121 Returns:
122 bool: True if cancellation was successful
123 """
124 self._cancelled = True
125 if self._thread is None:
126 return True
127 self._thread.join(0.1)
128 return not self._thread.is_alive()
129
130 def cancelled(self) -> bool:
131 """
132 Returns:
133 bool: True if the task was cancelled by the user.
134 """
135 return self._cancelled
136
137 def done(self) -> bool:
138 """
139 Returns:
140 bool: True if the task had finished execution.
141 """
142 return self._done
143
144 def running(self) -> bool:
145 """
146 Returns:
147 bool: True if the task is currently executing.
148 """
149 if self._thread is None:
150 return False
151 return self._thread.is_alive()
152
153 def result(self, timeout: float | int | None = None) -> Result | None:
154 """
155 Get result of running the task.
156 Blocks the thread until task is finished.
157
158 Args:
159 timeout (float | int | None, optional):
160 The maximum amount to wait for execution to finish.
161 If None, wait forever. If not None and time runs out, raises TimeoutError.
162 Defaults to None.
163 Returns:
164 Result if future returned result or None when cancelled.
165 """
166 start = time.time()
167 self._thread_made.wait(timeout=get_timeout(timeout, start)) # Wait until we start a thread
168 self._thread.join(timeout=get_timeout(timeout, start))
169 if self._thread.is_alive():
170 self.cancel()
171 raise TimeoutError # thread still running after timeout
172 if isinstance(self._result, BaseException):
173 raise self._result
174 return self._result
175
176
177# Why like this? The inner task was not getting properly garbage collected when it was running,
178# but this does and just cancels the inner task so it also gets garbage collected
179# this is cursed :/
[docs]
180class AQLTask:
181 """
182 Task object returned to user, so that dependencies can be created.
183
184 Attributes:
185 task (Callable): function that gets executed asynchronously
186 dependencies (list[AQLTask]): Optional dependencies. The task will wait for all its dependencies to finish, before starting.
187 callbacks (list[Callable]): Callbacks ran when the task finishes executing.
188 Task result is inserted as an argument to the function.
189 pipe_dependencies (bool): If True results of tasks defined as dependencies will be passed as arguments to self.task.
190 Defaults to False.
191 """
192
193 def __init__(
194 self,
195 task: Callable,
196 dependencies: list['AQLTask'] | None = None,
197 callbacks: list[Callable] | None = None,
198 pipe_dependencies: bool = False
199 ) -> None:
200 self._inner_task = _InnerAQLTask(task, dependencies, callbacks, pipe_dependencies)
201 weakref.finalize(self, self._inner_task.cancel)
202
203 def __getattr__(self, name: str) -> Any:
204 return getattr(self._inner_task, name)