Source code for qlauncher.launcher.aql.aql

  1"""Wrapper for QLauncher that enables the user to launch tasks asynchronously (futures + multiprocessing)"""
  2from typing import Literal
  3from collections.abc import Callable
  4import time
  5
  6from qlauncher.problems import Raw
  7from qlauncher.base.base import Backend, Algorithm, Problem, Result
  8from qlauncher.launcher.qlauncher import QLauncher
  9from qlauncher.launcher.aql.aql_task import AQLTask, get_timeout
 10
 11
class AQL:
    """
    Launches QLauncher tasks asynchronously.

    Attributes:
        tasks (list[AQLTask]): Tasks handed back to the user by :meth:`add_task`, in submission order.
        mode (Literal['default', 'optimize_session']): Execution mode.

    Usage Example
    -------------
    ::

        aql = AQL(mode='optimize_session')

        t_real = aql.add_task(
            (
                GraphColoring.from_preset('small'),
                QAOA(),
                AQTBackend(token='valid_token', name='device')
            ),
            constraints_weight=5,
            costs_weight=1
        )

        aql.add_task(
            (
                GraphColoring.from_preset('small'),
                QAOA(),
                QiskitBackend('local_simulator')
            ),
            dependencies=[t_real],
            constraints_weight=5,
            costs_weight=1
        )

        aql.start()
        result_real, result_sim = aql.results(timeout=15)


    """

    def __init__(
        self,
        mode: Literal['default', 'optimize_session'] = 'default'
    ) -> None:
        """
        Args:
            mode (Literal['default', 'optimize_session'], optional):
                Task execution mode.
                If 'optimize_session' all tasks running on a real quantum device get split into separate generation and run subtasks,
                then the quantum tasks are run in one shorter block.
                Any other value behaves like 'default' (see the mode check in ``add_task``).
                Defaults to 'default'.
        """

        self.tasks: list[AQLTask] = []
        self.mode: Literal['default', 'optimize_session'] = mode

        # Internal bookkeeping. Every task created by this instance lives in exactly one
        # of these two lists; `self.tasks` only holds the tasks returned to the user.
        self._classical_tasks: list[AQLTask] = []
        self._quantum_tasks: list[AQLTask] = []

    def running_task_count(self) -> int:
        """
        Returns:
            int: Number of user-visible tasks (``self.tasks``) that are currently executing.
        """
        return sum(1 for t in self.tasks if t.running())

    def cancel_running_tasks(self) -> None:
        """
        Cancel the tasks assigned to this AQL instance.

        ``cancel()`` is called on every classical and quantum task, including the
        internal generation/gateway subtasks, without checking their state first.
        """
        for t in self._classical_tasks + self._quantum_tasks:
            t.cancel()

    def results(self, timeout: float | int | None = None, cancel_tasks_on_timeout: bool = True) -> list[Result | None]:
        """
        Get a list of results from tasks.
        Results are ordered in the same way the tasks were added.
        Blocks the thread until all tasks are finished.

        Args:
            timeout (float | int | None, optional):
                The maximum amount to wait for execution to finish.
                If None, wait forever. If not None and time runs out, raises TimeoutError.
                Defaults to None.
            cancel_tasks_on_timeout (bool): Whether to cancel all other tasks when one task raises a TimeoutError.

        Returns:
            list[Result | None]: Task results. Cancelled tasks contribute ``None``.

        Raises:
            TimeoutError: Re-raised from a task; tasks are cancelled first when
                ``cancel_tasks_on_timeout`` is True.
            Exception: Any other error raised while collecting a result is re-raised
                after all tasks have been cancelled.
        """
        # One shared start time for all tasks; it is passed to `get_timeout` together with
        # the overall timeout (presumably to derive the remaining time per task -- confirm
        # against `get_timeout`).
        start = time.time()
        try:
            return [
                None if t.cancelled() else t.result(timeout=get_timeout(timeout, start))
                for t in self.tasks
            ]
        except TimeoutError:
            if cancel_tasks_on_timeout:
                self.cancel_running_tasks()
            raise  # bare raise keeps the original traceback intact
        except Exception:
            self.cancel_running_tasks()
            raise

    def add_task(
        self,
        launcher: QLauncher | tuple[Problem, Algorithm, Backend],
        dependencies: list[AQLTask] | None = None,
        callbacks: list[Callable] | None = None,
        **kwargs
    ) -> AQLTask:
        """
        Add a QLauncher task to the execution queue.

        Args:
            launcher (QLauncher | tuple[Problem, Algorithm, Backend]): Launcher instance that will be run.
                A tuple is unpacked into the ``QLauncher`` constructor.
            dependencies (list[AQLTask] | None, optional): Tasks that must finish first before this task. Defaults to None.
            callbacks (list[Callable] | None, optional):
                Functions to run when the task finishes.
                The task will be passed to the function as the only parameter.
                Defaults to None.
            **kwargs: Run parameters. Forwarded to ``launcher.run`` or, for split device
                tasks, to ``launcher.formatter.set_run_params``.

        Returns:
            AQLTask: Pointer to the submitted task.
        """
        if isinstance(launcher, tuple):
            launcher = QLauncher(*launcher)

        # Normalise once and work on a private copy: `_run_async` appends gateway tasks to
        # `task.dependencies`, which must be a real list and should not alias the caller's
        # own list (assuming AQLTask keeps the list it is given -- not visible here).
        dependencies_list = list(dependencies) if dependencies is not None else []
        callbacks_list = callbacks if callbacks is not None else []

        if self.mode != 'optimize_session' or not launcher.backend.is_device:
            # Regular task: the whole launcher runs as a single classical task.
            task = AQLTask(
                lambda: launcher.run(**kwargs),
                dependencies=dependencies_list,
                callbacks=callbacks_list
            )
            self.tasks.append(task)
            self._classical_tasks.append(task)
            return task

        def gen_task():
            launcher.formatter.set_run_params(kwargs)
            return launcher.formatter(launcher.problem)

        # Split real device task into generation and actual run on a QC.
        # The generation step only waits for the non-quantum dependencies ...
        t_gen = AQLTask(
            gen_task,
            dependencies=[dep for dep in dependencies_list if dep not in self._quantum_tasks]
        )

        def quantum_task(formatted, *_):
            # `formatted` is the piped output of `t_gen`; outputs of any further
            # dependencies are ignored.
            ql = QLauncher(
                Raw(formatted, launcher.problem.instance_name),
                launcher.algorithm,
                launcher.backend
            )
            return ql.run()

        # ... while the device run waits for the generation step and the quantum dependencies.
        # `t_gen` must stay first so its output is the first piped argument.
        t_quant = AQLTask(
            quantum_task,
            dependencies=[t_gen] + [dep for dep in dependencies_list if dep in self._quantum_tasks],
            callbacks=callbacks_list,
            pipe_dependencies=True  # Receive output from formatter
        )

        self._classical_tasks.append(t_gen)
        self._quantum_tasks.append(t_quant)
        self.tasks.append(t_quant)

        return t_quant

    def start(self) -> None:
        """
        Start tasks execution.

        Raises:
            ValueError: If any task was already cancelled, finished or is running.
        """
        for t in self._classical_tasks + self._quantum_tasks:
            if t.cancelled() or t.done() or t.running():
                raise ValueError("Cannot start again, some tasks were already ran or are currently running.")

        self._run_async()

    def _run_async(self) -> None:
        """Insert the gateway tasks that order classical/quantum execution, then start every task."""
        # Collect everything the quantum tasks depend on, directly or transitively.
        # The visited set makes sure each task is expanded once, so shared dependencies
        # are not re-walked and a dependency cycle cannot loop forever.
        quantum_tasks = set(self._quantum_tasks)
        visited = set()
        pending = list(self._quantum_tasks)
        while pending:
            t = pending.pop()
            if t in visited:
                continue
            visited.add(t)
            pending.extend(t.dependencies)

        quantum_dependencies = visited - quantum_tasks

        # The gateway tasks will ensure execution order of
        # (all classical tasks that quantum tasks depend on) - (all quantum tasks) - (rest of the classical tasks)
        gateway_task_classical = AQLTask(
            lambda: True,
            dependencies=list(quantum_dependencies)
        )

        for qt in self._quantum_tasks:
            qt.dependencies.append(gateway_task_classical)

        gateway_task_quantum = AQLTask(
            lambda: 42,
            dependencies=self._quantum_tasks.copy()
        )

        for ct in self._classical_tasks:
            if ct not in quantum_dependencies:
                ct.dependencies.append(gateway_task_quantum)

        # Register the gateways only now, so that they do not receive gateway
        # dependencies themselves in the loops above.
        self._classical_tasks.append(gateway_task_classical)
        self._quantum_tasks.append(gateway_task_quantum)

        # Start all tasks
        for t in self._classical_tasks:
            t.start()

        for qt in self._quantum_tasks:
            qt.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Cancel whatever is still executing when the `with` block is left.
        # NOTE(review): only the user-visible tasks are inspected here; internal generation
        # subtasks are covered by `cancel_running_tasks` but not by this loop -- confirm
        # that this is intended.
        for t in self.tasks:
            if t.running():
                t.cancel()