Source code for qlauncher.launcher.aql.aql_task

  1"""Wrapper for QLauncher that enables the user to launch tasks asynchronously (futures + multiprocessing)"""
  2from collections.abc import Callable
  3from typing import Any
  4from threading import Event, Thread
  5import time
  6import weakref
  7import multiprocess
  8
  9from pathos.multiprocessing import _ProcessPool
 10
 11from qlauncher.base.base import Result
 12
 13
[docs] 14def get_timeout(max_timeout: int | float | None, start: int | float) -> float | None: 15 """ 16 Get timeout to wait on an event, useful when awaiting multiple tasks and total timeout must be max_timeout. 17 18 Args: 19 max_timeout (int | float | None): Total allowed timeout, None = infinite wait. 20 start (int | float): Await start timestamp (time.time()) 21 22 Returns: 23 int | float | None: Remaining timeout or None if max_timeout was None 24 """ 25 if max_timeout is None: 26 return None 27 return max_timeout - (time.time() - start)
28 29 30class _InnerAQLTask: 31 """ 32 Task object returned to user, so that dependencies can be created. 33 34 Attributes: 35 task (Callable): function that gets executed asynchronously 36 dependencies (list[AQLTask]): Optional dependencies. The task will wait for all its dependencies to finish, before starting. 37 callbacks (list[Callable]): Callbacks ran when the task finishes executing. 38 Task result is inserted as an argument to the function. 39 pipe_dependencies (bool): If True results of tasks defined as dependencies will be passed as arguments to self.task. 40 Defaults to False. 41 """ 42 43 def __init__( 44 self, 45 task: Callable, 46 dependencies: list['AQLTask'] | None = None, 47 callbacks: list[Callable] | None = None, 48 pipe_dependencies: bool = False 49 ) -> None: 50 51 self.task = task 52 self.dependencies = dependencies if dependencies is not None else [] 53 self.callbacks = callbacks if callbacks is not None else [] 54 self.pipe_dependencies = pipe_dependencies 55 56 self._cancelled = False 57 self._thread = None 58 self._pool = _ProcessPool(processes=1) 59 self._thread_made = Event() 60 61 self._result = None 62 self._done = False 63 64 def _shutdown_subprocess(self): 65 self._pool.close() 66 self._pool.terminate() 67 self._pool.join() 68 69 def _async_task(self) -> Any: 70 dep_results = [d.result() for d in self.dependencies] 71 72 if self._cancelled: 73 self._result = None 74 self._done = True 75 return 76 77 res = self._pool.apply_async(self.task, args=(dep_results if self.pipe_dependencies else [])) 78 # Turns out you can't just outright kill threads (or futures) is so I have to do this, so that the thread knows to exit. 79 while not self._cancelled: 80 try: 81 self._result = res.get(timeout=0.05) 82 self._done = True 83 return 84 except multiprocess.context.TimeoutError: 85 pass # task not ready, check for cancel 86 # For any other error originating from the task, shutdown and clean up subprocess then raise error again. 87 except Exception as e: 88 self._shutdown_subprocess() 89 self._result = e 90 self._done = True 91 return 92 93 if self._cancelled: 94 self._shutdown_subprocess() # kill res process 95 96 self._result = res.get() if res.ready() else None 97 self._done = True 98 return 99 100 def _target_task(self): 101 # Main task + callbacks launch 102 self._async_task() 103 for cb in self.callbacks: 104 cb(self._result) 105 106 def _set_thread(self): 107 self._thread = Thread(target=weakref.proxy(self)._target_task, daemon=True) # set daemon so that thread quits as main process quits 108 self._thread.start() 109 self._thread_made.set() 110 111 def start(self): 112 """Start task execution.""" 113 if self._thread is not None or self._cancelled: 114 raise ValueError("Cannot start, task already started or cancelled.") 115 self._set_thread() 116 117 def cancel(self) -> bool: 118 """ 119 Attempt to cancel the task. 120 121 Returns: 122 bool: True if cancellation was successful 123 """ 124 self._cancelled = True 125 if self._thread is None: 126 return True 127 self._thread.join(0.1) 128 return not self._thread.is_alive() 129 130 def cancelled(self) -> bool: 131 """ 132 Returns: 133 bool: True if the task was cancelled by the user. 134 """ 135 return self._cancelled 136 137 def done(self) -> bool: 138 """ 139 Returns: 140 bool: True if the task had finished execution. 141 """ 142 return self._done 143 144 def running(self) -> bool: 145 """ 146 Returns: 147 bool: True if the task is currently executing. 148 """ 149 if self._thread is None: 150 return False 151 return self._thread.is_alive() 152 153 def result(self, timeout: float | int | None = None) -> Result | None: 154 """ 155 Get result of running the task. 156 Blocks the thread until task is finished. 157 158 Args: 159 timeout (float | int | None, optional): 160 The maximum amount to wait for execution to finish. 161 If None, wait forever. If not None and time runs out, raises TimeoutError. 162 Defaults to None. 163 Returns: 164 Result if future returned result or None when cancelled. 165 """ 166 start = time.time() 167 self._thread_made.wait(timeout=get_timeout(timeout, start)) # Wait until we start a thread 168 self._thread.join(timeout=get_timeout(timeout, start)) 169 if self._thread.is_alive(): 170 self.cancel() 171 raise TimeoutError # thread still running after timeout 172 if isinstance(self._result, BaseException): 173 raise self._result 174 return self._result 175 176 177# Why like this? The inner task was not getting properly garbage collected when it was running, 178# but this does and just cancels the inner task so it also gets garbage collected 179# this is cursed :/
[docs] 180class AQLTask: 181 """ 182 Task object returned to user, so that dependencies can be created. 183 184 Attributes: 185 task (Callable): function that gets executed asynchronously 186 dependencies (list[AQLTask]): Optional dependencies. The task will wait for all its dependencies to finish, before starting. 187 callbacks (list[Callable]): Callbacks ran when the task finishes executing. 188 Task result is inserted as an argument to the function. 189 pipe_dependencies (bool): If True results of tasks defined as dependencies will be passed as arguments to self.task. 190 Defaults to False. 191 """ 192 193 def __init__( 194 self, 195 task: Callable, 196 dependencies: list['AQLTask'] | None = None, 197 callbacks: list[Callable] | None = None, 198 pipe_dependencies: bool = False 199 ) -> None: 200 self._inner_task = _InnerAQLTask(task, dependencies, callbacks, pipe_dependencies) 201 weakref.finalize(self, self._inner_task.cancel) 202 203 def __getattr__(self, name: str) -> Any: 204 return getattr(self._inner_task, name)