Source code for quantum_launcher.launcher.aql

  1# TODO update to new QL version
  2from typing import Tuple
  3import asyncio
  4from ..base import Backend, Algorithm, Problem
  5from ..base.adapter_structure import get_formatter
  6from .qlauncher import QuantumLauncher
  7from typing import List
  8
  9
[docs] 10class asyncQuantumLauncher(QuantumLauncher): 11
[docs] 12 def start(self, times: int = 1, real_backend: Backend = None, debugging=False) -> None: 13 self.real_backend = self.backend if real_backend is None else real_backend 14 self._async_running = 1 15 self._results = [] 16 self.debugging: bool = debugging 17 self._prepare_problem() 18 asyncio.run(self.run_async(times)) 19 return self._results
20
[docs] 21 async def run_async_task(self, pool: asyncio.BaseEventLoop): 22 if self.debugging: 23 print('cloud task started') 24 result = await pool.run_in_executor(None, self.algorithm.run, self.problem, self.backend) 25 self._results.append(result) 26 if self.debugging: 27 print('cloud task finished') 28 return result
29
[docs] 30 async def run_async_fake_task(self, pool: asyncio.BaseEventLoop): 31 if self.debugging: 32 print('local task started') 33 result = await pool.run_in_executor(None, self.algorithm.run, self.problem, self.backend) 34 self._results.append(result) 35 if self.debugging: 36 print('local task finished') 37 return result
38
[docs] 39 async def run_async(self, times: int): 40 if self.debugging: 41 print('creating tasks started') 42 pool = asyncio.get_event_loop() 43 tasks = [self.run_async_task(pool) for _ in range(times)] 44 tasks += [self.run_async_fake_task(pool)] 45 await asyncio.gather(*tasks) 46 if self.debugging: 47 print('all tasks finished')
48 49
[docs] 50class AQL: 51 def __init__(self, backends: List[Tuple[Backend, int]], algorithms: List[Tuple[Algorithm, int]], problems: List[Tuple[Problem, int]], 52 debugging: bool = False): 53 self.backends = backends 54 self.algorithms = algorithms 55 self.problems = problems 56 self._results = [] 57 self._results_bitstring = [] 58 self._async_running = 0 59 self.debugging: bool = debugging 60
[docs] 61 def start(self) -> List[any]: 62 self._async_running = 1 63 self._results = [] 64 self._results_bitstring = [] 65 66 asyncio.run(self.run_async()) 67 return self._results, self._results_bitstring
68
[docs] 69 async def run_async_task(self, pool: asyncio.BaseEventLoop, backend: Backend, algorithm: Algorithm, problem: Problem): 70 # print('Task Started') 71 if self.debugging: 72 print('cloud task started') 73 formatter = get_formatter( 74 problem._problem_id, algorithm._algorithm_format) 75 result = await pool.run_in_executor(None, algorithm.run, problem, backend, formatter) 76 self._results.append(result) 77 self._results_bitstring.append(result.best_bitstring) 78 # print('Task Done') 79 80 if self.debugging: 81 print('cloud task finished') 82 return result
83
[docs] 84 async def run_async(self): 85 if self.debugging: 86 print('creating tasks started') 87 pool = asyncio.get_event_loop() 88 tasks = [] 89 for backend, b_times in self.backends: 90 for algorithm, a_times in self.algorithms: 91 for problem, p_times in self.problems: 92 times = b_times * a_times * p_times 93 for _ in range(times): 94 tasks.append(self.run_async_task( 95 pool, backend, algorithm, problem)) 96 # print(len(tasks), tasks) 97 await asyncio.gather(*tasks) 98 if self.debugging: 99 print('all tasks finished')
100 101
[docs] 102class AQLManager: 103 """ 104 Context manager for asyncQuantumLauncher 105 Simplified high-level context manager to support asynchronous flow of asyncQuantumLauncher. 106 107 Inside is only initialization and whole processing is done at the end. 108 109 To save the results it's recommended to assign manager's variables to local ones, so they don't get destroyed. 110 111 112 Usage Example 113 ------------- 114 :: 115 116 with AQLManager('my_path') as launcher: 117 launcher.add() 118 launcher.add() 119 launcher.add() 120 result = aql.result 121 print(result) 122 123 """ 124 125 def __init__(self, path: str = None): 126 self.aql: asyncQuantumLauncher | None = None 127 self.path = path 128 self.result = [] 129 self.result_bitstring = [] 130 self._backends: List[Backend] = [] 131 self._algorithms: List[Algorithm] = [] 132 self._problems: List[Problem] = [] 133 134 def __enter__(self): 135 # self.aql: asyncQuantumLauncher = asyncQuantumLauncher(None, None, None) 136 return self 137
[docs] 138 def add_backend(self, backend: Backend, times: int = 1): 139 self._backends.append((backend, times))
140
[docs] 141 def add_algorithm(self, algorithm, times: int = 1): 142 self._algorithms.append((algorithm, times))
143
[docs] 144 def add_problem(self, problem, times: int = 1): 145 self._problems.append((problem, times))
146
[docs] 147 def add(self, backend: Backend = None, algorithm: Algorithm = None, problem: Problem = None, times: int = 1): 148 self._backends.append((backend, times)) 149 self._algorithms.append((algorithm, 1)) 150 self._problems.append((problem, 1))
151 152 def __exit__(self, exc_type, exc_val, exc_tb): 153 if exc_type is not None: 154 raise exc_type(exc_val).with_traceback(exc_tb) 155 aql = AQL(self._backends, self._algorithms, self._problems) 156 result, result_bitstring = aql.start() 157 self.result.extend(result) 158 self.result_bitstring.extend(result_bitstring)
159 160 161if __name__ == '__main__': 162 from problems import MaxCut, EC 163 from ..routines.qiskit_routines import QAOA, IBMBackend 164 165 with AQLManager('test') as launcher: 166 launcher.add(backend=IBMBackend('local_simulator'), 167 algorithm=QAOA(p=1), problem=EC('exact', instance_name='toy')) 168 for i in range(2, 3): 169 launcher.add_algorithm(QAOA(p=i)) 170 result = launcher.result 171 result_bitstring = launcher.result_bitstring 172 print(len(result)) 173 print(result_bitstring) 174 175 for ind, i in enumerate(result): 176 print(ind, i['SamplingVQEResult'].best_measurement)