1"""Wrapper for QLauncher that enables the user to launch tasks asynchronously (futures + multiprocessing)"""
2from typing import Literal
3from collections.abc import Callable
4import time
5
6from qlauncher.problems import Raw
7from qlauncher.base.base import Backend, Algorithm, Problem, Result
8from qlauncher.launcher.qlauncher import QLauncher
9from qlauncher.launcher.aql.aql_task import AQLTask, get_timeout
10
11
[docs]
class AQL:
    """
    Launches QLauncher tasks asynchronously.

    Tasks are registered with :meth:`add_task`, scheduled with :meth:`start`
    and collected with :meth:`results`. The instance can also be used as a
    context manager; leaving the ``with`` block cancels the submitted tasks
    that are still running.

    Attributes:
        tasks (list[AQLTask]): list of submitted tasks.
        mode (Literal['default', 'optimize_session']): Execution mode.

    Usage Example
    -------------
    ::

        aql = AQL(mode='optimize_session')

        t_real = aql.add_task(
            (
                GraphColoring.from_preset('small'),
                QAOA(),
                AQTBackend(token='valid_token', name='device')
            ),
            constraints_weight=5,
            costs_weight=1
        )

        aql.add_task(
            (
                GraphColoring.from_preset('small'),
                QAOA(),
                QiskitBackend('local_simulator')
            ),
            dependencies=[t_real],
            constraints_weight=5,
            costs_weight=1
        )

        aql.start()
        result_real, result_sim = aql.results(timeout=15)

    """

    def __init__(
        self,
        mode: Literal['default', 'optimize_session'] = 'default'
    ) -> None:
        """
        Args:
            mode (Literal['default', 'optimize_session'], optional):
                Task execution mode.
                If 'optimize_session' all tasks running on a real quantum device get split
                into separate generation and run subtasks, then the quantum tasks are run
                in one shorter block.
                Defaults to 'default'.
        """
        # Tasks handed back to the user, in submission order (drives ``results``).
        self.tasks: list[AQLTask] = []
        self.mode: Literal['default', 'optimize_session'] = mode

        # Internal scheduling buckets. Besides user tasks they also hold the helper
        # tasks created by ``add_task`` (generation step) and ``_run_async`` (gateways).
        self._classical_tasks: list[AQLTask] = []
        self._quantum_tasks: list[AQLTask] = []

    def running_task_count(self) -> int:
        """
        Returns:
            int: Amount of submitted tasks (``self.tasks``) that are currently executing.
        """
        return sum(1 for t in self.tasks if t.running())

    def cancel_running_tasks(self):
        """
        Cancel the tasks assigned to this AQL instance.

        ``cancel()`` is called on every internally tracked task (classical and
        quantum buckets, helper tasks included), not only on those in ``self.tasks``.
        """
        for t in self._classical_tasks + self._quantum_tasks:
            t.cancel()

    def results(self, timeout: float | int | None = None, cancel_tasks_on_timeout: bool = True) -> list[Result | None]:
        """
        Get a list of results from tasks.
        Results are ordered in the same way the tasks were added.
        Blocks the thread until all tasks are finished.

        Args:
            timeout (float | int | None, optional):
                The maximum amount to wait for execution to finish.
                If None, wait forever. If not None and time runs out, raises TimeoutError.
                Defaults to None.
            cancel_tasks_on_timeout (bool): Whether to cancel all other tasks when one task raises a TimeoutError.

        Returns:
            list[Result | None]: Task results. A task that was cancelled contributes ``None``.

        Raises:
            TimeoutError: Re-raised from a task's ``result`` call; tasks are cancelled
                first only if ``cancel_tasks_on_timeout`` is True.
            Exception: Any other exception raised while collecting results is re-raised
                after all tasks have been cancelled.
        """
        # Before Python 3.11 ``concurrent.futures.TimeoutError`` is a different class from
        # the builtin ``TimeoutError``. If ``AQLTask.result`` raises the futures flavour
        # (its API looks Future-like -- TODO confirm), catching only the builtin would send
        # the timeout into the generic handler below and ignore ``cancel_tasks_on_timeout``.
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        # One shared start stamp for every task; ``get_timeout`` derives each wait from it
        # (presumably so ``timeout`` bounds the whole call, not each task -- verify in aql_task).
        start = time.time()
        try:
            return [
                None if t.cancelled() else t.result(timeout=get_timeout(timeout, start))
                for t in self.tasks
            ]
        except (TimeoutError, FuturesTimeoutError):
            if cancel_tasks_on_timeout:
                self.cancel_running_tasks()
            raise
        except Exception:
            # A failing task invalidates the batch: stop everything, then propagate.
            self.cancel_running_tasks()
            raise

    def add_task(
        self,
        launcher: QLauncher | tuple[Problem, Algorithm, Backend],
        dependencies: list[AQLTask] | None = None,
        callbacks: list[Callable] | None = None,
        **kwargs
    ) -> AQLTask:
        """
        Add a QLauncher task to the execution queue.

        Args:
            launcher (QLauncher | tuple[Problem, Algorithm, Backend]): Launcher instance that will be run.
                A tuple is unpacked into ``QLauncher(*launcher)``.
            dependencies (list[AQLTask] | None, optional): Tasks that must finish first before this task.
                The list is copied, the caller's list is never modified. Defaults to None.
            callbacks (list[Callable] | None, optional):
                Functions to run when the task finishes.
                The task will be passed to the function as the only parameter.
                Defaults to None.
            **kwargs: Run parameters. Forwarded to ``launcher.run`` for a regular task, or to
                ``launcher.formatter.set_run_params`` when the task is split in
                'optimize_session' mode.

        Returns:
            AQLTask: Pointer to the submitted task.
        """
        if isinstance(launcher, tuple):
            launcher = QLauncher(*launcher)

        # Private copy: ``_run_async`` appends gateway tasks to ``task.dependencies``, so the
        # caller's list (which may be shared between several ``add_task`` calls) must not be
        # handed over directly.
        dependencies_list = list(dependencies) if dependencies is not None else []
        callbacks_list = callbacks if callbacks is not None else []

        if self.mode != 'optimize_session' or not launcher.backend.is_device:
            # Regular task: a single unit of work, scheduled with the classical tasks.
            task = AQLTask(
                lambda: launcher.run(**kwargs),
                dependencies=dependencies_list,
                callbacks=callbacks_list
            )
            self.tasks.append(task)
            self._classical_tasks.append(task)
            return task

        def gen_task():
            launcher.formatter.set_run_params(kwargs)
            return launcher.formatter(launcher.problem)

        # Split real device task into generation and actual run on a QC.
        # The generation step waits only for the non-quantum dependencies.
        t_gen = AQLTask(
            gen_task,
            dependencies=[dep for dep in dependencies_list if dep not in self._quantum_tasks]
        )

        def quantum_task(formatted, *_):
            # ``formatted`` is the piped output of ``t_gen`` (first dependency); outputs of
            # any further dependencies are ignored.
            ql = QLauncher(
                Raw(formatted, launcher.problem.instance_name),
                launcher.algorithm,
                launcher.backend
            )
            return ql.run()

        t_quant = AQLTask(
            quantum_task,
            dependencies=[t_gen] + [dep for dep in dependencies_list if dep in self._quantum_tasks],
            callbacks=callbacks_list,
            pipe_dependencies=True  # Receive output from formatter
        )

        self._classical_tasks.append(t_gen)
        self._quantum_tasks.append(t_quant)
        # Only the quantum half is user-visible; its result is the task's result.
        self.tasks.append(t_quant)

        return t_quant

    def start(self):
        """
        Start tasks execution.

        Raises:
            ValueError: If any tracked task is already cancelled, done or running.
        """
        for t in self._classical_tasks + self._quantum_tasks:
            if t.cancelled() or t.done() or t.running():
                raise ValueError("Cannot start again, some tasks were already ran or are currently running.")

        self._run_async()

    def _run_async(self):
        """
        Wire up the gateway tasks and start every tracked task.

        Two helper tasks are inserted so that execution proceeds as
        (all classical tasks that quantum tasks depend on) - (all quantum tasks) -
        (rest of the classical tasks).
        """
        # Collect every task reachable from the quantum tasks through ``dependencies``.
        # The visited set keeps shared dependencies from being walked more than once
        # (and keeps the walk finite even if the graph were cyclic).
        quantum_dependencies = set()
        pending = list(self._quantum_tasks)
        while pending:
            current = pending.pop()
            for dep in current.dependencies:
                if dep not in quantum_dependencies:
                    quantum_dependencies.add(dep)
                    pending.append(dep)

        # Quantum tasks that other quantum tasks depend on are not classical prerequisites.
        quantum_dependencies = quantum_dependencies.difference(self._quantum_tasks)

        # Completes once every classical prerequisite of the quantum block has finished.
        gateway_task_classical = AQLTask(
            lambda: True,
            dependencies=list(quantum_dependencies)
        )

        for qt in self._quantum_tasks:
            qt.dependencies.append(gateway_task_classical)

        # Completes once every quantum task has finished.
        gateway_task_quantum = AQLTask(
            lambda: 42,
            dependencies=self._quantum_tasks.copy()
        )

        # Remaining classical tasks are held back until the quantum block is over.
        # This must run before the classical gateway is registered below, otherwise the
        # two gateways would wait on each other.
        for ct in self._classical_tasks:
            if ct not in quantum_dependencies:
                ct.dependencies.append(gateway_task_quantum)

        self._classical_tasks.append(gateway_task_classical)
        self._quantum_tasks.append(gateway_task_quantum)

        # Start all tasks
        for t in self._classical_tasks:
            t.start()

        for qt in self._quantum_tasks:
            qt.start()

    def __enter__(self):
        """Return this instance so it can be bound by ``with AQL(...) as aql``."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """
        Cancel every submitted task (``self.tasks``) that is still running.

        Returns None, so an exception raised inside the ``with`` block propagates.
        """
        for t in self.tasks:
            if t.running():
                t.cancel()