Source code for qlauncher.launcher.aql.aql_task
1import contextlib
2import time
3import weakref
4from collections.abc import Callable
5from dataclasses import dataclass
6from functools import wraps
7from threading import Event, Lock
8from typing import Any
9
10from qlauncher.base.base import Result
11from qlauncher.workflow.base_job_manager import BaseJobManager
12
13
[docs]
14def get_timeout(max_timeout: int | float | None, start: int | float) -> float | None:
15 """
16 Compute remaining time budget when multiple waits share a single overall timeout.
17
18 Args:
19 max_timeout: Total allowed timeout in seconds. If None, wait indefinitely.
20 start: Start timestamp from time.time().
21
22 Returns:
23 Remaining timeout in seconds, or None if max_timeout was None.
24 """
25 if max_timeout is None:
26 return None
27 return max_timeout - (time.time() - start)
28
29
30@dataclass(frozen=True)
31class _State:
32 PENDING: str = 'PENDING'
33 SUBMITTED: str = 'SUBMITTED'
34 DONE: str = 'DONE'
35 CANCELLED: str = 'CANCELLED'
36 FAILED: str = 'FAILED'
37
38
[docs]
class ManagerBackedTask:
    """
    Async-like task that is executed by a BaseJobManager.
    Returned to users. Supports dependencies, callbacks, cancellation, and result() waiting.
    """

    def __init__(
        self,
        task: Callable,
        dependencies: list['ManagerBackedTask'] | None = None,
        callbacks: list[Callable] | None = None,
        pipe_dependencies: bool = False,
    ) -> None:
        """
        Create a task executed by a BaseJobManager.

        Args:
            task: Callable to execute. If pipe_dependencies=True, it will be invoked as task(*dep_results).
            dependencies: Tasks that must be terminal before this task can be submitted.
            callbacks: Optional callables invoked after completion with either (result) or (exception).
            pipe_dependencies: If True, results of dependencies are passed as positional arguments to task.

        Returns:
            None.
        """
        self.task = task
        self.dependencies = dependencies if dependencies is not None else []
        self.callbacks = callbacks if callbacks is not None else []
        self.pipe_dependencies = pipe_dependencies

        # Every state transition happens while holding _state_lock.
        self._state = _State.PENDING
        self._state_lock = Lock()

        # Filled in by _submit(); the manager is held weakly so the task does
        # not keep it alive.
        self._job_id: str | None = None
        self._manager_ref: weakref.ReferenceType[BaseJobManager] | None = None

        self._result: Result | None = None
        self._exception: BaseException | None = None

        # Set once the task reaches a terminal state; result() blocks on it.
        self._done_event = Event()

    def is_ready(self) -> bool:
        """
        Check whether the task can be submitted.

        Args:
            None.

        Returns:
            True if all dependency tasks are terminal (DONE/FAILED/CANCELLED), otherwise False.
        """
        return all(dep.done() for dep in self.dependencies)

    def job_id(self) -> str | None:
        """Return the job ID recorded by _submit(), or None if none was recorded yet."""
        return self._job_id

    def cancelled(self) -> bool:
        """Return True if the task is in the CANCELLED state."""
        return self._state == _State.CANCELLED

    def done(self) -> bool:
        """Return True if the task is terminal (DONE, CANCELLED or FAILED)."""
        return self._state in (_State.DONE, _State.CANCELLED, _State.FAILED)

    def running(self) -> bool:
        """Return True if the task is SUBMITTED and has not been marked finished."""
        return self._state == _State.SUBMITTED and not self._done_event.is_set()

    def cancel(self) -> bool:
        """
        Attempt to cancel the task.

        If the task has already been submitted, this forwards the cancellation to the manager
        best-effort (manager.cancel(job_id)). The task is marked CANCELLED and becomes terminal.

        Args:
            None.

        Returns:
            True if the task is now cancelled (or was already cancelled).
            False if the task was already terminal (DONE/FAILED) and cannot be cancelled.
        """
        with self._state_lock:
            # already terminal
            if self._state in (_State.DONE, _State.FAILED):
                return False
            if self._state == _State.CANCELLED:
                return True

            # mark cancelled locally and wake up anyone blocked in result()
            self._state = _State.CANCELLED
            self._done_event.set()

            job_id = self._job_id
            mref = self._manager_ref

        # if submitted, best-effort cancel via manager (outside the lock so a
        # slow manager call does not block other state transitions)
        mgr = mref() if mref is not None else None
        if mgr is not None and job_id is not None:
            with contextlib.suppress(Exception):
                mgr.cancel(job_id)

        return True

    def _set_result(self, res: Result) -> None:
        """
        Mark the task as successfully completed and run callbacks.

        Does nothing if the task is already terminal. Exceptions raised by
        callbacks are suppressed.

        Args:
            res: Result value produced by the job.

        Returns:
            None.
        """
        with self._state_lock:
            if self.done():
                return
            self._result = res
            self._state = _State.DONE
            self._done_event.set()

        for cb in self.callbacks:
            with contextlib.suppress(Exception):
                cb(res)

    def _set_exception(self, e: BaseException) -> None:
        """
        Mark the task as failed and run callbacks.

        Does nothing if the task is already terminal. Exceptions raised by
        callbacks are suppressed.

        Args:
            e: Exception raised by the job.

        Returns:
            None.
        """
        with self._state_lock:
            if self.done():
                return
            self._exception = e
            self._state = _State.FAILED
            self._done_event.set()

        for cb in self.callbacks:
            with contextlib.suppress(Exception):
                cb(e)

    def result(self, timeout: float | int | None = None) -> Result | None:
        """
        Wait for the task to reach a terminal state and return its outcome.

        Args:
            timeout: Maximum time to wait in seconds. If None, wait indefinitely.

        Returns:
            The task result if completed successfully.
            None if the task was cancelled.

        Raises:
            TimeoutError: If timeout expires before the task becomes terminal (task is cancelled).
            BaseException: Re-raises the exception produced by the job when the task is FAILED.
        """
        start = time.time()
        if not self._done_event.wait(timeout=get_timeout(timeout, start)):
            # Timed out: give up on the task before reporting the timeout.
            self.cancel()
            raise TimeoutError

        if self._state == _State.CANCELLED:
            return None
        if self._exception is not None:
            raise self._exception
        return self._result

    def _submit(self, manager: BaseJobManager, **manager_kwargs) -> str:
        """
        Submit this task to a job manager and transition it to SUBMITTED.

        This method is intended to be called by a scheduler (e.g., AQL) once is_ready() is True.
        If pipe_dependencies=True, dependency results are collected and passed as positional args.

        If the task is cancelled while manager.submit(...) is in progress, the CANCELLED
        state is kept and the freshly created job is cancelled in the manager best-effort.
        Likewise, a DONE/FAILED state set during submission is not overwritten.

        Args:
            manager: Job manager used to execute the task.
            **manager_kwargs: Manager-specific arguments forwarded to manager.submit(...).

        Returns:
            The job ID returned by manager.submit(...).

        Raises:
            RuntimeError: If the task is not in PENDING state, or if it is already cancelled.
            BaseException: With pipe_dependencies=True, whatever a dependency's result() raises
                (i.e. the exception of a FAILED dependency).
        """
        with self._state_lock:
            # Check cancellation first; otherwise the generic "not PENDING"
            # check below would always win and this message could never appear.
            if self._state == _State.CANCELLED:
                raise RuntimeError('Cannot submit a cancelled task.')
            if self._state != _State.PENDING:
                raise RuntimeError('Task already submitted or terminal.')
            self._manager_ref = weakref.ref(manager)

        task_fn = self.task

        func: Callable[..., Any]
        if self.pipe_dependencies:
            # Evaluate dep results only here (scheduler should call _submit only when is_ready()).
            dep_args = tuple(d.result() for d in self.dependencies)

            # Bind the callable and its arguments as defaults so the wrapper
            # carries them itself and the manager can call it with no arguments.
            @wraps(task_fn)
            def bound(task_fn=task_fn, dep_args=dep_args) -> Any:
                return task_fn(*dep_args)

            func = bound
        else:
            func = task_fn

        jid = manager.submit(func, **manager_kwargs)

        with self._state_lock:
            self._job_id = jid
            # Only move forward from PENDING: a cancel() or completion that
            # happened while manager.submit() was running must not be clobbered.
            if self._state == _State.PENDING:
                self._state = _State.SUBMITTED
            cancelled_meanwhile = self._state == _State.CANCELLED

        if cancelled_meanwhile:
            # cancel() ran before the job ID was known and so could not reach
            # the manager; forward the cancellation now, best-effort.
            with contextlib.suppress(Exception):
                manager.cancel(jid)

        return jid