1""" Algorithms for Qiskit routines """
2import json
3from datetime import datetime
4from typing import Callable
5
6import numpy as np
7
8from qiskit import qpy, QuantumCircuit
9from qiskit.circuit import ParameterVector
10from qiskit.circuit.library import PauliEvolutionGate
11# from qiskit.opflow import H
12from qiskit.primitives.base.base_primitive import BasePrimitive
13from qiskit.quantum_info import SparsePauliOp
14from qiskit_algorithms.minimum_eigensolvers import QAOA as QiskitQAOA
15from qiskit_algorithms.minimum_eigensolvers import SamplingVQEResult
16
17from quantum_launcher.base import Problem, Algorithm, Result
18from quantum_launcher.routines.qiskit_routines.backends.ibm_backend import IBMBackend
19
20
[docs]
21class QiskitOptimizationAlgorithm(Algorithm):
22 """ Abstract class for Qiskit optimization algorithms """
23
[docs]
24 def make_tag(self, problem: Problem, backend: IBMBackend) -> str:
25 tag = problem.__class__.__name__ + '-' + \
26 backend.__class__.__name__ + '-' + \
27 self.__class__.__name__ + '-' + \
28 datetime.today().strftime('%Y-%m-%d')
29 return tag
30
[docs]
31 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]:
32 timestamps = []
33 usages = []
34 qpu_time = 0
35 if hasattr(primitive, 'session'):
36 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag])
37 for job in jobs:
38 m = job.metrics()
39 timestamps.append(m['timestamps'])
40 usages.append(m['usage'])
41 qpu_time += m['usage']['quantum_seconds']
42 return timestamps, usages, qpu_time
43
44
[docs]
45def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp:
46 """ Commutator """
47 return op_a @ op_b - op_b @ op_a
48
49
[docs]
50class QAOA(QiskitOptimizationAlgorithm):
51 """Algorithm class with QAOA.
52
53 Args:
54 p (int): The number of QAOA steps. Defaults to 1.
55 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs.
56 aux: Auxiliary input for the QAOA algorithm.
57 **alg_kwargs: Additional keyword arguments for the base class.
58
59 Attributes:
60 name (str): The name of the algorithm.
61 aux: Auxiliary input for the QAOA algorithm.
62 p (int): The number of QAOA steps.
63 alternating_ansatz (bool): Whether to use an alternating ansatz.
64 parameters (list): List of parameters for the algorithm.
65 mixer_h (SparsePauliOp | None): The mixer Hamiltonian.
66
67 """
68 _algorithm_format = 'hamiltonian'
69
70 def __init__(self, p: int = 1, alternating_ansatz: bool = False, aux=None, **alg_kwargs):
71 super().__init__(**alg_kwargs)
72 self.name: str = 'qaoa'
73 self.aux = aux
74 self.p: int = p
75 self.alternating_ansatz: bool = alternating_ansatz
76 self.parameters = ['p']
77 self.mixer_h: SparsePauliOp | None = None
78 self.initial_state: QuantumCircuit | None = None
79
80 @property
81 def setup(self) -> dict:
82 return {
83 'aux': self.aux,
84 'p': self.p,
85 'parameters': self.parameters,
86 'arg_kwargs': self.alg_kwargs
87 }
88
[docs]
89 def parse_samplingVQEResult(self, res: SamplingVQEResult, res_path) -> dict:
90 res_dict = {}
91 for k, v in vars(res).items():
92 if k[0] == "_":
93 key = k[1:]
94 else:
95 key = k
96 try:
97 res_dict = {**res_dict, **json.loads(json.dumps({key: v}))}
98 except TypeError as ex:
99 if str(ex) == 'Object of type complex128 is not JSON serializable':
100 res_dict = {**res_dict, **
101 json.loads(json.dumps({key: v}, default=repr))}
102 elif str(ex) == 'Object of type ndarray is not JSON serializable':
103 res_dict = {**res_dict, **
104 json.loads(json.dumps({key: v}, default=repr))}
105 elif str(ex) == 'keys must be str, int, float, bool or None, not ParameterVectorElement':
106 res_dict = {**res_dict, **
107 json.loads(json.dumps({key: repr(v)}))}
108 elif str(ex) == 'Object of type OptimizerResult is not JSON serializable':
109 # recursion ftw
110 new_v = self.parse_samplingVQEResult(v, res_path)
111 res_dict = {**res_dict, **
112 json.loads(json.dumps({key: new_v}))}
113 elif str(ex) == 'Object of type QuantumCircuit is not JSON serializable':
114 path = res_path + '.qpy'
115 with open(path, 'wb') as f:
116 qpy.dump(v, f)
117 res_dict = {**res_dict, **{key: path}}
118 return res_dict
119
[docs]
120 def run(self, problem: Problem, backend: IBMBackend, formatter=Callable) -> Result:
121 """ Runs the QAOA algorithm """
122 hamiltonian: SparsePauliOp = formatter(problem)
123 energies = []
124
125 def qaoa_callback(evaluation_count, params, mean, std):
126 energies.append(mean)
127
128 tag = self.make_tag(problem, backend)
129 sampler = backend.samplerV1
130 # sampler.set_options(job_tags=[tag])
131 optimizer = backend.optimizer
132
133 if self.alternating_ansatz:
134 if self.mixer_h is None:
135 self.mixer_h = formatter.get_mixer_hamiltonian(problem)
136 if self.initial_state is None:
137 self.initial_state = formatter.get_QAOAAnsatz_initial_state(
138 problem)
139
140 qaoa = QiskitQAOA(sampler, optimizer, reps=self.p, callback=qaoa_callback,
141 mixer=self.mixer_h, initial_state=self.initial_state, **self.alg_kwargs)
142 qaoa_result = qaoa.compute_minimum_eigenvalue(hamiltonian, self.aux)
143 depth = qaoa.ansatz.decompose(reps=10).depth()
144 if 'cx' in qaoa.ansatz.decompose(reps=10).count_ops():
145 cx_count = qaoa.ansatz.decompose(reps=10).count_ops()['cx']
146 else:
147 cx_count = 0
148 timestamps, usages, qpu_time = self.get_processing_times(tag, sampler)
149 return self.construct_result({'energy': qaoa_result.eigenvalue,
150 'depth': depth,
151 'cx_count': cx_count,
152 'qpu_time': qpu_time,
153 'energies': energies,
154 'SamplingVQEResult': qaoa_result,
155 'usages': usages,
156 'timestamps': timestamps})
157
[docs]
158 def construct_result(self, result: dict) -> Result:
159
160 best_bitstring = self.get_bitstring(result)
161 best_energy = result['energy']
162
163 distribution = dict(result['SamplingVQEResult'].eigenstate.items())
164 most_common_value = max(
165 distribution, key=distribution.get)
166 most_common_bitstring = bin(most_common_value)[2:].zfill(
167 len(best_bitstring))
168 most_common_bitstring_energy = distribution[most_common_value]
169 num_of_samples = 0 # TODO: implement
170 average_energy = np.mean(result['energies'])
171 energy_std = np.std(result['energies'])
172 return Result(best_bitstring, best_energy, most_common_bitstring, most_common_bitstring_energy, distribution, result['energies'], num_of_samples, average_energy, energy_std, result)
173
[docs]
174 def get_bitstring(self, result) -> str:
175 return result['SamplingVQEResult'].best_measurement['bitstring']
176
177
[docs]
178class FALQON(QiskitOptimizationAlgorithm):
179 """
180 Algorithm class with FALQON.
181
182 Args:
183 driver_h (Optional[Operator]): The driver Hamiltonian for the problem.
184 delta_t (float): The time step for the evolution operators.
185 beta_0 (float): The initial value of beta.
186 n (int): The number of iterations to run the algorithm.
187 **alg_kwargs: Additional keyword arguments for the base class.
188
189 Attributes:
190 driver_h (Optional[Operator]): The driver Hamiltonian for the problem.
191 delta_t (float): The time step for the evolution operators.
192 beta_0 (float): The initial value of beta.
193 n (int): The number of iterations to run the algorithm.
194 cost_h (Optional[Operator]): The cost Hamiltonian for the problem.
195 n_qubits (int): The number of qubits in the problem.
196 parameters (List[str]): The list of algorithm parameters.
197
198 """
199
200 def __init__(self, driver_h=None, delta_t=0, beta_0=0, n=1):
201 super().__init__()
202 self.driver_h = driver_h
203 self.delta_t = delta_t
204 self.beta_0 = beta_0
205 self.n = n
206 self.cost_h = None
207 self.n_qubits: int = 0
208 self.parameters = ['n', 'delta_t', 'beta_0']
209 raise NotImplementedError('FALQON is not implemented yet')
210
211 @property
212 def setup(self) -> dict:
213 return {
214 'driver_h': self.driver_h,
215 'delta_t': self.delta_t,
216 'beta_0': self.beta_0,
217 'n': self.n,
218 'cost_h': self.cost_h,
219 'n_qubits': self.n_qubits,
220 'parameters': self.parameters,
221 'arg_kwargs': self.alg_kwargs
222 }
223
224 def _get_path(self) -> str:
225 return f'{self.name}@{self.n}@{self.delta_t}@{self.beta_0}'
226
[docs]
227 def run(self, problem: Problem, backend: IBMBackend):
228 """ Runs the FALQON algorithm """
229 # TODO implement aux operator
230 hamiltonian = problem.get_qiskit_hamiltonian()
231 self.cost_h = hamiltonian
232 self.n_qubits = hamiltonian.num_qubits
233 if self.driver_h is None:
234 self.driver_h = SparsePauliOp.from_sparse_list(
235 [("X", [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits)
236
237 betas = [self.beta_0]
238 energies = []
239 circuit_depths = []
240 cxs = []
241
242 tag = self.make_tag(problem, backend)
243 estimator = backend.estimator
244 sampler = backend.sampler
245 sampler.set_options(job_tags=[tag])
246 estimator.set_options(job_tags=[tag])
247
248 best_sample, last_sample = self._falqon_subroutine(estimator,
249 sampler, energies, betas, circuit_depths, cxs)
250
251 timestamps, usages, qpu_time = self.get_processing_times(tag, sampler)
252 result = {'betas': betas,
253 'energies': energies,
254 'depths': circuit_depths,
255 'cxs': cxs,
256 'n': self.n,
257 'delta_t': self.delta_t,
258 'beta_0': self.beta_0,
259 'energy': min(energies),
260 'qpu_time': qpu_time,
261 'best_sample': best_sample,
262 'last_sample': last_sample,
263 'usages': usages,
264 'timestamps': timestamps}
265
266 return result
267
268 def _build_ansatz(self, betas):
269 """ building ansatz circuit """
270 H = None # TODO: implement H
271 circ = (H ^ self.cost_h.num_qubits).to_circuit()
272 params = ParameterVector("beta", length=len(betas))
273 for param in params:
274 circ.append(PauliEvolutionGate(
275 self.cost_h, time=self.delta_t), circ.qubits)
276 circ.append(PauliEvolutionGate(self.driver_h,
277 time=self.delta_t * param), circ.qubits)
278 return circ
279
280 def _falqon_subroutine(self, estimator,
281 sampler, energies, betas, circuit_depths, cxs):
282 """ subroutine for falqon """
283 for i in range(self.n):
284 betas, energy, depth, cx_count = self._run_falqon(betas, estimator)
285 print(i, energy)
286 energies.append(energy)
287 circuit_depths.append(depth)
288 cxs.append(cx_count)
289 argmin = np.argmin(np.asarray(energies))
290 best_sample = self._sample_at(betas[:argmin], sampler)
291 last_sample = self._sample_at(betas, sampler)
292 return best_sample, last_sample
293
294 def _run_falqon(self, betas, estimator):
295 """ Method to run FALQON algorithm """
296 ansatz = self._build_ansatz(betas)
297 comm_h = complex(0, 1) * commutator(self.driver_h, self.cost_h)
298 beta = -1 * estimator.run(ansatz, comm_h, betas).result().values[0]
299 betas.append(beta)
300
301 ansatz = self._build_ansatz(betas)
302 energy = estimator.run(ansatz, self.cost_h, betas).result().values[0]
303
304 depth = ansatz.decompose(reps=10).depth()
305 if 'cx' in ansatz.decompose(reps=10).count_ops():
306 cx_count = ansatz.decompose(reps=10).count_ops()['cx']
307 else:
308 cx_count = 0
309
310 return betas, energy, depth, cx_count
311
312 def _sample_at(self, betas, sampler):
313 """ Not sure yet """
314 ansatz = self._build_ansatz(betas)
315 ansatz.measure_all()
316 res = sampler.run(ansatz, betas).result()
317 return res