1# TODO update to new QL version
2from typing import Tuple
3import asyncio
4from ..base import Backend, Algorithm, Problem
5from ..base.adapter_structure import get_formatter
6from .qlauncher import QuantumLauncher
7from typing import List
8
9
[docs]
10class asyncQuantumLauncher(QuantumLauncher):
11
[docs]
12 def start(self, times: int = 1, real_backend: Backend = None, debugging=False) -> None:
13 self.real_backend = self.backend if real_backend is None else real_backend
14 self._async_running = 1
15 self._results = []
16 self.debugging: bool = debugging
17 self._prepare_problem()
18 asyncio.run(self.run_async(times))
19 return self._results
20
[docs]
21 async def run_async_task(self, pool: asyncio.BaseEventLoop):
22 if self.debugging:
23 print('cloud task started')
24 result = await pool.run_in_executor(None, self.algorithm.run, self.problem, self.backend)
25 self._results.append(result)
26 if self.debugging:
27 print('cloud task finished')
28 return result
29
[docs]
30 async def run_async_fake_task(self, pool: asyncio.BaseEventLoop):
31 if self.debugging:
32 print('local task started')
33 result = await pool.run_in_executor(None, self.algorithm.run, self.problem, self.backend)
34 self._results.append(result)
35 if self.debugging:
36 print('local task finished')
37 return result
38
[docs]
39 async def run_async(self, times: int):
40 if self.debugging:
41 print('creating tasks started')
42 pool = asyncio.get_event_loop()
43 tasks = [self.run_async_task(pool) for _ in range(times)]
44 tasks += [self.run_async_fake_task(pool)]
45 await asyncio.gather(*tasks)
46 if self.debugging:
47 print('all tasks finished')
48
49
[docs]
50class AQL:
51 def __init__(self, backends: List[Tuple[Backend, int]], algorithms: List[Tuple[Algorithm, int]], problems: List[Tuple[Problem, int]],
52 debugging: bool = False):
53 self.backends = backends
54 self.algorithms = algorithms
55 self.problems = problems
56 self._results = []
57 self._results_bitstring = []
58 self._async_running = 0
59 self.debugging: bool = debugging
60
[docs]
61 def start(self) -> List[any]:
62 self._async_running = 1
63 self._results = []
64 self._results_bitstring = []
65
66 asyncio.run(self.run_async())
67 return self._results, self._results_bitstring
68
[docs]
69 async def run_async_task(self, pool: asyncio.BaseEventLoop, backend: Backend, algorithm: Algorithm, problem: Problem):
70 # print('Task Started')
71 if self.debugging:
72 print('cloud task started')
73 formatter = get_formatter(
74 problem._problem_id, algorithm._algorithm_format)
75 result = await pool.run_in_executor(None, algorithm.run, problem, backend, formatter)
76 self._results.append(result)
77 self._results_bitstring.append(result.best_bitstring)
78 # print('Task Done')
79
80 if self.debugging:
81 print('cloud task finished')
82 return result
83
[docs]
84 async def run_async(self):
85 if self.debugging:
86 print('creating tasks started')
87 pool = asyncio.get_event_loop()
88 tasks = []
89 for backend, b_times in self.backends:
90 for algorithm, a_times in self.algorithms:
91 for problem, p_times in self.problems:
92 times = b_times * a_times * p_times
93 for _ in range(times):
94 tasks.append(self.run_async_task(
95 pool, backend, algorithm, problem))
96 # print(len(tasks), tasks)
97 await asyncio.gather(*tasks)
98 if self.debugging:
99 print('all tasks finished')
100
101
[docs]
102class AQLManager:
103 """
104 Context manager for asyncQuantumLauncher
105 Simplified high-level context manager to support asynchronous flow of asyncQuantumLauncher.
106
107 Inside is only initialization and whole processing is done at the end.
108
109 To save the results it's recommended to assign manager's variables to local ones, so they don't get destroyed.
110
111
112 Usage Example
113 -------------
114 ::
115
116 with AQLManager('my_path') as launcher:
117 launcher.add()
118 launcher.add()
119 launcher.add()
120 result = aql.result
121 print(result)
122
123 """
124
125 def __init__(self, path: str = None):
126 self.aql: asyncQuantumLauncher | None = None
127 self.path = path
128 self.result = []
129 self.result_bitstring = []
130 self._backends: List[Backend] = []
131 self._algorithms: List[Algorithm] = []
132 self._problems: List[Problem] = []
133
134 def __enter__(self):
135 # self.aql: asyncQuantumLauncher = asyncQuantumLauncher(None, None, None)
136 return self
137
[docs]
138 def add_backend(self, backend: Backend, times: int = 1):
139 self._backends.append((backend, times))
140
[docs]
141 def add_algorithm(self, algorithm, times: int = 1):
142 self._algorithms.append((algorithm, times))
143
[docs]
144 def add_problem(self, problem, times: int = 1):
145 self._problems.append((problem, times))
146
[docs]
147 def add(self, backend: Backend = None, algorithm: Algorithm = None, problem: Problem = None, times: int = 1):
148 self._backends.append((backend, times))
149 self._algorithms.append((algorithm, 1))
150 self._problems.append((problem, 1))
151
152 def __exit__(self, exc_type, exc_val, exc_tb):
153 if exc_type is not None:
154 raise exc_type(exc_val).with_traceback(exc_tb)
155 aql = AQL(self._backends, self._algorithms, self._problems)
156 result, result_bitstring = aql.start()
157 self.result.extend(result)
158 self.result_bitstring.extend(result_bitstring)
159
160
161if __name__ == '__main__':
162 from problems import MaxCut, EC
163 from ..routines.qiskit_routines import QAOA, IBMBackend
164
165 with AQLManager('test') as launcher:
166 launcher.add(backend=IBMBackend('local_simulator'),
167 algorithm=QAOA(p=1), problem=EC('exact', instance_name='toy'))
168 for i in range(2, 3):
169 launcher.add_algorithm(QAOA(p=i))
170 result = launcher.result
171 result_bitstring = launcher.result_bitstring
172 print(len(result))
173 print(result_bitstring)
174
175 for ind, i in enumerate(result):
176 print(ind, i['SamplingVQEResult'].best_measurement)