"""Asynchronous QLauncher orchestration layer (AQL).

AQL is a lightweight orchestration utility that lets you submit one or more
`QLauncher` executions to a pluggable job manager (`BaseJobManager`) and collect the results later.

Conceptually, AQL builds a dependency graph of tasks and runs a scheduler thread that:
- submits tasks whose dependencies are satisfied,
- waits for any job to finish via the manager,
- reads results, surfaces exceptions, and triggers callbacks.

Execution modes:
- ``default``: tasks are submitted as soon as they are ready.
- ``optimize_session``: for *device* backends (``backend.is_device == True``), a task is split into a
  formatter step (problem conversion) and a quantum execution step. Two barrier "gateway" tasks are then
  inserted so that the global ordering becomes:
  (classical prerequisites of quantum) -> (all quantum tasks) -> (remaining classical tasks).

Example:
    from qlauncher.workflow.new_aql import AQL
    from qlauncher.launcher.qlauncher import QLauncher
    from qlauncher.routines.qiskit import FALQON, QiskitBackend

    problem = ...  # Problem / Hamiltonian
    algo = FALQON(max_reps=1)
    backend = QiskitBackend("local_simulator")
    launcher = QLauncher(problem, algo, backend)

    with AQL(mode="default") as aql:
        aql.add_task(launcher, shots=128)
        aql.start()
        result = aql.results(timeout=60)[0]
"""
33
import contextlib
import inspect
import time
import weakref
from collections import deque
from collections.abc import Callable
from threading import Event, Thread
from typing import Any, Literal

from qlauncher.base import Algorithm, Backend, Model, Problem, Result
from qlauncher.launcher.aql.aql_task import ManagerBackedTask, get_timeout
from qlauncher.launcher.qlauncher import QLauncher
from qlauncher.workflow.base_job_manager import BaseJobManager
from qlauncher.workflow.local_scheduler import LocalJobManager
46
47
48def _gateway_true() -> bool:
49 """
50 A small barrier helper used by AQL in ``optimize_session`` mode.
51
52 This task always returns ``True`` and is used only to enforce dependency ordering
53 between groups of tasks (classical prerequisites, quantum tasks, remaining classical).
54
55 Args:
56 None.
57
58 Returns:
59 Always ``True``.
60 """
61 return True
62
63
64def _filter_run_kwargs_for_callable(fn: Callable[..., Any], run_kwargs: dict[str, Any]) -> dict[str, Any]:
65 """
66 Filter ``run_kwargs`` so they are safe to forward to a callable.
67
68 If the callable accepts ``**kwargs`` then ``run_kwargs`` is returned unchanged.
69 If the callable does *not* accept ``**kwargs``, only
70 keyword arguments that match the callable signature are forwarded.
71
72 This is mainly a guardrail for calling real ``QLauncher.run(...)`` across processes,
73 where tests may pass extra kwargs and where the launcher implementation may have a
74 narrow signature.
75
76 Args:
77 fn: Target callable (e.g., ``QLauncher.run``).
78 run_kwargs: Candidate keyword arguments to forward.
79
80 Returns:
81 A dictionary of keyword arguments that can be safely passed to ``fn``.
82 """
83 try:
84 import inspect
85
86 sig = inspect.signature(fn)
87 if any(p.kind == p.VAR_KEYWORD for p in sig.parameters.values()):
88 return run_kwargs
89 allowed = set(sig.parameters.keys())
90 return {k: v for k, v in run_kwargs.items() if k in allowed}
91 except Exception:
92 return run_kwargs
93
94
[docs]
class AQL:
    """
    Launches QLauncher tasks asynchronously, using a provided BaseJobManager.

    - In 'default' mode, tasks are submitted as they become ready.
    - In 'optimize_session' mode, real-device tasks are split (format + quantum run) and
      a dependency barrier is inserted so that:
      (all classical prereqs of quantum) -> (all quantum tasks) -> (remaining classical tasks).

    Notes:
        - `**run_kwargs` are forwarded to `QLauncher.run(**run_kwargs)` (after filtering
          against the signature of ``run``).
        - `manager_kwargs` are passed along with the manager when a task is submitted
          (``task._submit(manager, **manager_kwargs)``).
    """

    def __init__(
        self,
        mode: Literal['default', 'optimize_session'] = 'default',
        manager: BaseJobManager | None = None,
    ) -> None:
        """
        Create a new AQL scheduler instance.

        Args:
            mode: Scheduling mode. ``"default"`` submits tasks as soon as they are ready.
                ``"optimize_session"`` groups device-backed quantum tasks into a single
                block by inserting barrier tasks.
            manager: Job manager used to execute tasks. If ``None``, a ``LocalJobManager`` is
                created and used.
        """
        self.tasks: list[ManagerBackedTask] = []  # user-visible tasks (results order)
        self.mode: Literal['default', 'optimize_session'] = mode

        # Internal task sets (includes helper/gateway tasks)
        self._classical_tasks: list[ManagerBackedTask] = []
        self._quantum_tasks: list[ManagerBackedTask] = []

        # Per-task manager submission kwargs
        self._manager_kwargs: dict[ManagerBackedTask, dict[str, Any]] = {}

        # Scheduler state
        self._manager: BaseJobManager = manager if manager is not None else LocalJobManager()
        self._scheduler_thread: Thread | None = None
        self._scheduler_done = Event()  # set when the scheduler loop has exited
        self._scheduler_exc: BaseException | None = None  # scheduler-level failure, if any
        self._prepared_optimize_session: bool = False  # barriers are inserted only once

    def running_task_count(self) -> int:
        """
        Return the number of currently running internal tasks.

        Returns:
            Number of tasks (classical + quantum, including gateway tasks) for which
            ``task.running()`` is truthy.
        """
        return sum(1 for t in (self._classical_tasks + self._quantum_tasks) if t.running())

    def cancel_running_tasks(self) -> None:
        """
        Best-effort cancellation of all internal tasks (classical + quantum).

        ``cancel()`` is invoked on every internal task; any ``Exception`` raised by an
        individual ``cancel()`` call is suppressed so the remaining tasks are still cancelled.
        """
        for t in self._classical_tasks + self._quantum_tasks:
            with contextlib.suppress(Exception):
                t.cancel()

    def results(self, timeout: float | int | None = None, cancel_tasks_on_timeout: bool = True) -> list[Result | None]:
        """
        Collect results from user-visible tasks (in the order they were added).

        Args:
            timeout: Total timeout budget (seconds) shared across all ``result()`` waits.
            cancel_tasks_on_timeout: If ``True``, cancel all internal tasks when a
                ``TimeoutError`` is raised.

        Returns:
            A list of results aligned with ``add_task`` order. Cancelled tasks yield ``None``.

        Raises:
            TimeoutError: If a ``TimeoutError`` is raised while waiting for results.
            BaseException: Re-raises any exception from the underlying task or scheduler.
                For any ``Exception`` other than ``TimeoutError`` all internal tasks are
                cancelled before re-raising.
        """
        try:
            start = time.time()
            collected: list[Result | None] = []
            for t in self.tasks:
                # The remaining budget is recomputed for each wait from the common start time.
                collected.append(None if t.cancelled() else t.result(timeout=get_timeout(timeout, start)))

            # If scheduler crashed, surface the error even if tasks returned None.
            if self._scheduler_exc is not None:
                raise self._scheduler_exc

            return collected
        except TimeoutError:
            if cancel_tasks_on_timeout:
                self.cancel_running_tasks()
            raise
        except Exception:
            self.cancel_running_tasks()
            raise

    def add_task(
        self,
        launcher: QLauncher | tuple[Problem | Model, Algorithm, Backend],
        dependencies: list[ManagerBackedTask] | None = None,
        callbacks: list[Callable] | None = None,
        manager_kwargs: dict[str, Any] | None = None,
        **run_kwargs: object,
    ) -> ManagerBackedTask:
        """
        Add a QLauncher task to the execution queue.

        In ``default`` mode (or when ``launcher.backend.is_device`` is falsy), this creates a single
        task that calls ``launcher.run(**run_kwargs)``.

        In ``optimize_session`` mode for device backends, the submission is split into:
            1) a formatter task (``launcher._get_compatible_problem``),
            2) a quantum execution task that builds a new launcher from the formatted problem
               and runs it.
        The returned (user-visible) task is the quantum execution task.

        Args:
            launcher: Either a ``QLauncher`` instance or a tuple ``(problem, algorithm, backend)`` used to
                construct one.
            dependencies: Optional list of tasks that must complete before this task can run.
                The list is copied; the caller's list is never modified.
            callbacks: Optional list of callables attached to the (user-visible) task.
            manager_kwargs: Keyword arguments passed along when the task is submitted to the manager.
            **run_kwargs: Keyword arguments forwarded to ``QLauncher.run(**run_kwargs)``.

        Returns:
            A ``ManagerBackedTask`` representing the submitted work. For device backends in
            ``optimize_session`` mode, this is the quantum execution task.
        """
        if isinstance(launcher, tuple):
            launcher = QLauncher(*launcher)

        # Copy the dependency list: barrier insertion later appends to ``task.dependencies``,
        # which must not leak into (or be duplicated through) a list owned by the caller.
        deps = list(dependencies) if dependencies is not None else []
        cb = callbacks if callbacks is not None else []
        mkwargs = manager_kwargs if manager_kwargs is not None else {}

        # Default mode (or non-device backend): single task
        if self.mode != 'optimize_session' or not launcher.backend.is_device:

            # Values are bound as defaults so the function carries everything it needs.
            def run_launcher(launcher=launcher, run_kwargs=run_kwargs) -> Result:
                kwargs = _filter_run_kwargs_for_callable(launcher.run, run_kwargs)
                return launcher.run(**kwargs)

            task = ManagerBackedTask(run_launcher, dependencies=deps, callbacks=cb, pipe_dependencies=False)
            self.tasks.append(task)
            self._classical_tasks.append(task)
            self._manager_kwargs[task] = dict(mkwargs)
            return task

        # optimize_session + device: split into (format) + (quantum run)
        # Formatter depends on classical deps only (to allow quantum block ordering)
        format_deps = [d for d in deps if d not in self._quantum_tasks]
        t_gen = ManagerBackedTask(launcher._get_compatible_problem, dependencies=format_deps, pipe_dependencies=False)
        self._classical_tasks.append(t_gen)
        self._manager_kwargs[t_gen] = dict(mkwargs)

        algo = launcher.algorithm
        backend = launcher.backend

        QLauncherCtor = QLauncher

        def quantum_run(
            formatted_problem: Problem | Model,
            *_: object,
            algo=algo,
            backend=backend,
            run_kwargs=run_kwargs,
            QLauncherCtor=QLauncherCtor,
        ) -> Result:
            # ``formatted_problem`` is the piped result of the first dependency (the formatter);
            # results piped from any further dependencies are accepted and ignored via ``*_``.
            ql = QLauncherCtor(formatted_problem, algo, backend)
            kwargs = _filter_run_kwargs_for_callable(ql.run, run_kwargs)
            return ql.run(**kwargs)

        # Quantum run depends on formatter + quantum deps (to serialize device session)
        quantum_deps = [d for d in deps if d in self._quantum_tasks]
        t_quant = ManagerBackedTask(
            quantum_run,
            dependencies=[t_gen] + quantum_deps,
            callbacks=cb,
            pipe_dependencies=True,
        )
        self._quantum_tasks.append(t_quant)
        self._manager_kwargs[t_quant] = dict(mkwargs)

        # User-visible task is the quantum run
        self.tasks.append(t_quant)
        return t_quant

    def start(self) -> None:
        """
        Start scheduling and execution in a background thread.

        This validates that no tasks were previously submitted, prepares session barriers for
        ``optimize_session`` mode (once), and then starts the scheduler loop in a daemon thread.

        Raises:
            ValueError: If the scheduler is already running or tasks were already submitted/finished.
        """
        if self._scheduler_thread is not None and self._scheduler_thread.is_alive():
            raise ValueError('Cannot start again, scheduler is already running.')

        for t in self._classical_tasks + self._quantum_tasks:
            if t.job_id() is not None or t.done() or t.running():
                raise ValueError('Cannot start again, some tasks were already submitted or finished.')

        if self.mode == 'optimize_session' and not self._prepared_optimize_session:
            self._prepare_optimize_session()
            self._prepared_optimize_session = True

        self._scheduler_done.clear()
        self._scheduler_exc = None

        # NOTE(review): attribute access on a weakref proxy is forwarded to the referent, so this
        # appears to produce an ordinary bound method (holding a strong reference to ``self``).
        # Kept as-is; confirm whether a weak reference was actually intended here.
        self._scheduler_thread = Thread(target=weakref.proxy(self)._scheduler_loop, daemon=True)
        self._scheduler_thread.start()

    def _prepare_optimize_session(self) -> None:
        """
        Prepare the internal dependency graph for ``optimize_session`` mode.

        This method inserts two "gateway" tasks to enforce the global ordering:
            (classical prerequisites of quantum) -> (all quantum tasks) -> (remaining classical).

        It does nothing when no quantum tasks were registered.
        """
        if not self._quantum_tasks:
            return

        # Collect transitive dependencies of quantum tasks (breadth-first walk).
        prerequisites: set[ManagerBackedTask] = set()
        visited: set[ManagerBackedTask] = set()
        frontier = deque(self._quantum_tasks)

        while frontier:
            current = frontier.popleft()
            if current in visited:
                continue
            visited.add(current)
            for dep in current.dependencies:
                prerequisites.add(dep)
                frontier.append(dep)

        # Classical deps are those deps that are not quantum tasks themselves
        prerequisites.difference_update(self._quantum_tasks)

        # Gateway: after all classical prereqs of quantum tasks
        gateway_classical = ManagerBackedTask(_gateway_true, dependencies=list(prerequisites))
        self._classical_tasks.append(gateway_classical)
        self._manager_kwargs[gateway_classical] = {}

        for qt in self._quantum_tasks:
            qt.dependencies.append(gateway_classical)

        # Gateway: after all quantum tasks (snapshot taken before the gateway itself is added)
        gateway_quantum = ManagerBackedTask(_gateway_true, dependencies=self._quantum_tasks.copy())
        self._quantum_tasks.append(gateway_quantum)
        self._manager_kwargs[gateway_quantum] = {}

        # Every classical task that is not a prerequisite of the quantum block waits for it.
        for ct in self._classical_tasks:
            if ct in prerequisites or ct is gateway_classical:
                continue
            ct.dependencies.append(gateway_quantum)

    def _scheduler_loop(self) -> None:
        """
        Run the AQL scheduler loop.

        The scheduler:
            - submits tasks that are ready,
            - waits for any job to finish via the manager,
            - reads results and finalizes tasks,
            - propagates scheduler-level failures to remaining tasks.

        On exit (normal or not) the manager's ``clean_up()`` is called (errors suppressed) and
        the internal "scheduler done" event is set.
        """
        # Ordered, de-duplicated view of every internal task.
        all_tasks: list[ManagerBackedTask] = list(dict.fromkeys(self._classical_tasks + self._quantum_tasks))

        job_to_task: dict[str, ManagerBackedTask] = {}

        try:
            # Exit condition: all tasks are terminal (DONE/CANCELLED/FAILED)
            while not all(t.done() for t in all_tasks):
                for t in all_tasks:
                    if t.cancelled() or t.done() or t.job_id() is not None:
                        continue
                    if not t.is_ready():
                        continue

                    # IMPORTANT: don't let a single submit exception kill the whole scheduler
                    try:
                        jid = t._submit(self._manager, **self._manager_kwargs.get(t, {}))
                        job_to_task[jid] = t
                    except BaseException as e:
                        t._set_exception(e)

                # Defensive reconciliation: make sure we track all in-flight jobs.
                for t in all_tasks:
                    jid = t.job_id()
                    if jid is None or t.done() or t.cancelled():
                        continue
                    job_to_task.setdefault(jid, t)

                # Drop entries whose task is finished or no longer maps to that job id.
                for jid, t in list(job_to_task.items()):
                    if t.done() or t.cancelled() or t.job_id() != jid:
                        job_to_task.pop(jid, None)

                # If no jobs are in-flight and tasks remain -> dependency deadlock
                if not job_to_task:
                    pending = [t for t in all_tasks if (not t.done()) and (not t.cancelled()) and (t.job_id() is None)]
                    if not pending:
                        break
                    raise RuntimeError('Deadlock: no runnable tasks (check dependency graph).')

                try:
                    wait_ret = self._manager.wait_for_a_job(None, timeout=0.5)
                except TimeoutError:
                    # Nothing finished within the polling interval; re-evaluate the task states.
                    continue

                # The manager may return either a job id or a non-empty tuple starting with one.
                jid = wait_ret[0] if isinstance(wait_ret, tuple) and wait_ret else wait_ret

                if jid is None:
                    continue

                task = job_to_task.pop(jid, None)
                if task is None:
                    task = next((t for t in all_tasks if t.job_id() == jid), None)
                if task is None:
                    # Job does not belong to any task tracked by this scheduler.
                    continue

                try:
                    res = self._manager.read_results(jid)
                    task._set_result(res)
                except BaseException as e:
                    task._set_exception(e)

        except BaseException as e:
            # Scheduler-level error: fail remaining tasks so result() doesn't hang
            self._scheduler_exc = e
            for t in all_tasks:
                with contextlib.suppress(Exception):
                    if not t.done() and not t.cancelled():
                        t._set_exception(e)
            with contextlib.suppress(Exception):
                self.cancel_running_tasks()

        finally:
            with contextlib.suppress(Exception):
                self._manager.clean_up()
            self._scheduler_done.set()

    def __enter__(self) -> 'AQL':
        """
        Enter a context-manager scope.

        Returns:
            The current ``AQL`` instance.
        """
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
        """
        Exit a context-manager scope and perform best-effort cleanup.

        All internal tasks are cancelled and the manager's ``clean_up()`` is called
        (errors from ``clean_up()`` are suppressed). Exceptions raised inside the
        ``with`` body are not suppressed.

        Args:
            exc_type: Exception type (if any).
            exc_value: Exception instance (if any).
            exc_traceback: Traceback (if any).
        """
        self.cancel_running_tasks()
        with contextlib.suppress(Exception):
            self._manager.clean_up()