Source code for qlauncher.launcher.aql.aql_task

  1import contextlib
  2import time
  3import weakref
  4from collections.abc import Callable
  5from dataclasses import dataclass
  6from functools import wraps
  7from threading import Event, Lock
  8from typing import Any
  9
 10from qlauncher.base.base import Result
 11from qlauncher.workflow.base_job_manager import BaseJobManager
 12
 13
[docs] 14def get_timeout(max_timeout: int | float | None, start: int | float) -> float | None: 15 """ 16 Compute remaining time budget when multiple waits share a single overall timeout. 17 18 Args: 19 max_timeout: Total allowed timeout in seconds. If None, wait indefinitely. 20 start: Start timestamp from time.time(). 21 22 Returns: 23 Remaining timeout in seconds, or None if max_timeout was None. 24 """ 25 if max_timeout is None: 26 return None 27 return max_timeout - (time.time() - start)
@dataclass(frozen=True)
class _State:
    """
    Names of the lifecycle states a ManagerBackedTask can be in.

    The values are read as class attributes (e.g. ``_State.PENDING``); each
    constant's value is simply its own name as a string.
    """

    # Created, not yet handed to a manager.
    PENDING: str = 'PENDING'
    # Handed to a manager; outcome not known yet.
    SUBMITTED: str = 'SUBMITTED'

    # Terminal states.
    DONE: str = 'DONE'
    CANCELLED: str = 'CANCELLED'
    FAILED: str = 'FAILED'
class ManagerBackedTask:
    """
    Async-like task that is executed by a BaseJobManager.
    Returned to users. Supports dependencies, callbacks, cancellation, and result() waiting.
    """

    def __init__(
        self,
        task: Callable,
        dependencies: list['ManagerBackedTask'] | None = None,
        callbacks: list[Callable] | None = None,
        pipe_dependencies: bool = False,
    ) -> None:
        """
        Create a task executed by a BaseJobManager.

        Args:
            task: Callable to execute. If pipe_dependencies=True, it will be invoked as task(*dep_results).
            dependencies: Tasks that must be terminal before this task can be submitted.
            callbacks: Optional callables invoked after completion with either (result) or (exception).
                Exceptions raised by a callback are suppressed. Callbacks are not invoked on cancellation.
            pipe_dependencies: If True, results of dependencies are passed as positional arguments to task.

        Returns:
            None.
        """
        self.task = task
        self.dependencies = dependencies if dependencies is not None else []
        self.callbacks = callbacks if callbacks is not None else []
        self.pipe_dependencies = pipe_dependencies

        # Lifecycle state; every write happens while holding _state_lock.
        self._state = _State.PENDING
        self._state_lock = Lock()

        # Filled in by _submit(). The manager is held weakly so that a task
        # does not keep its manager alive.
        self._job_id: str | None = None
        self._manager_ref: weakref.ReferenceType[BaseJobManager] | None = None

        # Outcome; at most one of these is set once the task is terminal.
        self._result: Result | None = None
        self._exception: BaseException | None = None

        # Set as soon as the task reaches any terminal state; result() waits on it.
        self._done_event = Event()

    def is_ready(self) -> bool:
        """
        Check whether the task can be submitted.

        Args:
            None.

        Returns:
            True if all dependency tasks are terminal (DONE/FAILED/CANCELLED), otherwise False.
        """
        return all(dep.done() for dep in self.dependencies)

    def job_id(self) -> str | None:
        """Return the job ID assigned by the manager, or None if not submitted yet."""
        return self._job_id

    def cancelled(self) -> bool:
        """Return True if the task is in the CANCELLED state."""
        return self._state == _State.CANCELLED

    def done(self) -> bool:
        """Return True if the task is terminal (DONE, CANCELLED or FAILED)."""
        return self._state in (_State.DONE, _State.CANCELLED, _State.FAILED)

    def running(self) -> bool:
        """Return True if the task has been submitted and has not reached a terminal state."""
        return self._state == _State.SUBMITTED and not self._done_event.is_set()

    def cancel(self) -> bool:
        """
        Attempt to cancel the task.

        If the task has already been submitted, this forwards the cancellation to the manager
        best-effort (manager.cancel(job_id)). The task is marked CANCELLED and becomes terminal.

        Args:
            None.

        Returns:
            True if the task is now cancelled (or was already cancelled).
            False if the task was already terminal (DONE/FAILED) and cannot be cancelled.
        """
        with self._state_lock:
            # already terminal
            if self._state in (_State.DONE, _State.FAILED):
                return False
            if self._state == _State.CANCELLED:
                return True

            # pending or submitted => mark cancelled locally and wake waiters
            self._state = _State.CANCELLED
            self._done_event.set()

            # Snapshot under the lock; the manager is called after releasing it.
            job_id = self._job_id
            mref = self._manager_ref

        # if submitted, best-effort cancel via manager
        mgr = mref() if mref is not None else None
        if mgr is not None and job_id is not None:
            with contextlib.suppress(Exception):
                mgr.cancel(job_id)

        return True

    def _set_result(self, res: Result) -> None:
        """
        Mark the task as successfully completed and run callbacks.

        Does nothing if the task is already terminal (e.g. it was cancelled).

        Args:
            res: Result value produced by the job.

        Returns:
            None.
        """
        with self._state_lock:
            if self.done():
                return
            self._result = res
            self._state = _State.DONE
            self._done_event.set()

        # Callbacks run outside the lock; a failing callback must not affect
        # the task or the remaining callbacks.
        for cb in self.callbacks:
            with contextlib.suppress(Exception):
                cb(res)

    def _set_exception(self, e: BaseException) -> None:
        """
        Mark the task as failed and run callbacks.

        Does nothing if the task is already terminal (e.g. it was cancelled).

        Args:
            e: Exception raised by the job.

        Returns:
            None.
        """
        with self._state_lock:
            if self.done():
                return
            self._exception = e
            self._state = _State.FAILED
            self._done_event.set()

        # Callbacks run outside the lock; a failing callback must not affect
        # the task or the remaining callbacks.
        for cb in self.callbacks:
            with contextlib.suppress(Exception):
                cb(e)

    def result(self, timeout: float | int | None = None) -> Result | None:
        """
        Wait for the task to reach a terminal state and return its outcome.

        Args:
            timeout: Maximum time to wait in seconds. If None, wait indefinitely.

        Returns:
            The task result if completed successfully.
            None if the task was cancelled.

        Raises:
            TimeoutError: If timeout expires before the task becomes terminal (task is cancelled).
            BaseException: Re-raises the exception produced by the job when the task is FAILED.
        """
        start = time.time()
        if not self._done_event.wait(timeout=get_timeout(timeout, start)):
            # cancel() returns False only when the task finished (DONE/FAILED)
            # in the window between the wait timing out and this call. In that
            # case fall through and deliver the real outcome instead of raising
            # a TimeoutError for a task that was not actually cancelled.
            if self.cancel():
                raise TimeoutError

        if self._state == _State.CANCELLED:
            return None
        if self._exception is not None:
            raise self._exception
        return self._result

    def _submit(self, manager: BaseJobManager, **manager_kwargs) -> str:
        """
        Submit this task to a job manager and transition it to SUBMITTED.

        This method is intended to be called by a scheduler (e.g., AQL) once is_ready() is True.
        If pipe_dependencies=True, dependency results are collected and passed as positional args.

        If the task is cancelled while manager.submit(...) is in progress, the task stays
        CANCELLED and the freshly created job is cancelled via manager.cancel(job_id), best-effort.

        Args:
            manager: Job manager used to execute the task.
            **manager_kwargs: Manager-specific arguments forwarded to manager.submit(...).

        Returns:
            The job ID returned by manager.submit(...).

        Raises:
            RuntimeError: If the task is not in PENDING state, or if it is already cancelled.
            BaseException: With pipe_dependencies=True, whatever a dependency's result() raises
                (i.e. the exception of a FAILED dependency).
        """
        with self._state_lock:
            # Check cancellation first: a cancelled task is also "not PENDING",
            # so the generic check would otherwise shadow this message.
            if self.cancelled():
                raise RuntimeError('Cannot submit a cancelled task.')
            if self._state != _State.PENDING:
                raise RuntimeError('Task already submitted or terminal.')
            self._manager_ref = weakref.ref(manager)

        task_fn = self.task

        func: Callable[..., Any]
        if self.pipe_dependencies:
            # Evaluate dep results only here (scheduler should call _submit only when is_ready()).
            dep_args = tuple(d.result() for d in self.dependencies)

            # Bind the callable and its arguments as defaults so the wrapper
            # carries everything it needs.
            @wraps(task_fn)
            def bound(task_fn=task_fn, dep_args=dep_args) -> Any:
                return task_fn(*dep_args)

            func = bound
        else:
            func = task_fn

        # The lock is not held across the manager call; the state is
        # re-checked afterwards instead.
        jid = manager.submit(func, **manager_kwargs)

        with self._state_lock:
            self._job_id = jid
            # Only move PENDING -> SUBMITTED. If the task became terminal while
            # the submit was in flight, that state must not be overwritten.
            if self._state == _State.PENDING:
                self._state = _State.SUBMITTED
            cancelled_in_flight = self._state == _State.CANCELLED

        if cancelled_in_flight:
            # cancel() could not reach the manager because no job ID existed
            # yet; forward the cancellation now, best-effort.
            with contextlib.suppress(Exception):
                manager.cancel(jid)

        return jid