1""" Algorithms for Qiskit routines """
2import json
3from datetime import datetime
4from collections.abc import Callable
5
6import numpy as np
7
8from qiskit import qpy, QuantumCircuit
9from qiskit.circuit.library import PauliEvolutionGate
10
11from qiskit.primitives import PrimitiveResult, SamplerPubResult, BaseEstimatorV1, BaseSamplerV1
12from qiskit.primitives.containers import BitArray
13from qiskit.primitives.base.base_primitive import BasePrimitive
14
15from qiskit.quantum_info import SparsePauliOp
16
17from qiskit_algorithms.minimum_eigensolvers import QAOA as QiskitQAOA
18from qiskit_algorithms.minimum_eigensolvers import SamplingVQEResult
19from qiskit_algorithms.optimizers import Optimizer, COBYLA
20
21from qlauncher.base import Problem, Algorithm, Result
22from qlauncher.base.base import Backend
23from qlauncher.routines.cirq import CirqBackend
24from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
25
26
[docs]
27class QiskitOptimizationAlgorithm(Algorithm):
28 """ Abstract class for Qiskit optimization algorithms """
29
[docs]
30 def make_tag(self, problem: Problem, backend: QiskitBackend) -> str:
31 tag = problem.__class__.__name__ + '-' + \
32 backend.__class__.__name__ + '-' + \
33 self.__class__.__name__ + '-' + \
34 datetime.today().strftime('%Y-%m-%d')
35 return tag
36
[docs]
37 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]:
38 timestamps = []
39 usages = []
40 qpu_time = 0
41 if hasattr(primitive, 'session'):
42 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag])
43 for job in jobs:
44 m = job.metrics()
45 timestamps.append(m['timestamps'])
46 usages.append(m['usage'])
47 qpu_time += m['usage']['quantum_seconds']
48 return timestamps, usages, qpu_time
49
50
[docs]
51def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp:
52 """ Commutator """
53 return op_a @ op_b - op_b @ op_a
54
55
[docs]
56class QAOA(QiskitOptimizationAlgorithm):
57 """Algorithm class with QAOA.
58
59 Args:
60 p (int): The number of QAOA steps. Defaults to 1.
61 optimizer (Optimizer | None): Optimizer used during algorithm runtime. If set to `None` turns into COBYLA. Defaults to None,
62 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs.
63 aux: Auxiliary input for the QAOA algorithm.
64 **alg_kwargs: Additional keyword arguments for the base class.
65
66 Attributes:
67 name (str): The name of the algorithm.
68 aux: Auxiliary input for the QAOA algorithm.
69 p (int): The number of QAOA steps.
70 optimizer (Optimizer): Optimizer used during algorithm runtime.
71 alternating_ansatz (bool): Whether to use an alternating ansatz.
72 parameters (list): List of parameters for the algorithm.
73 mixer_h (SparsePauliOp | None): The mixer Hamiltonian.
74
75 """
76 _algorithm_format = 'hamiltonian'
77
78 def __init__(self, p: int = 1, optimizer: Optimizer | None = None, alternating_ansatz: bool = False, aux=None, **alg_kwargs):
79 super().__init__(**alg_kwargs)
80 self.name: str = 'qaoa'
81 self.aux = aux
82 self.p: int = p
83 self.optimizer: Optimizer = optimizer if optimizer is not None else COBYLA()
84 self.alternating_ansatz: bool = alternating_ansatz
85 self.parameters = ['p']
86 self.mixer_h: SparsePauliOp | None = None
87 self.initial_state: QuantumCircuit | None = None
88
89 @property
90 def setup(self) -> dict:
91 return {
92 'aux': self.aux,
93 'p': self.p,
94 'parameters': self.parameters,
95 'arg_kwargs': self.alg_kwargs
96 }
97
[docs]
98 def parse_samplingVQEResult(self, res: SamplingVQEResult, res_path) -> dict:
99 res_dict = {}
100 for k, v in vars(res).items():
101 if k[0] == "_":
102 key = k[1:]
103 else:
104 key = k
105 try:
106 res_dict = {**res_dict, **json.loads(json.dumps({key: v}))}
107 except TypeError as ex:
108 if str(ex) == 'Object of type complex128 is not JSON serializable':
109 res_dict = {**res_dict, **
110 json.loads(json.dumps({key: v}, default=repr))}
111 elif str(ex) == 'Object of type ndarray is not JSON serializable':
112 res_dict = {**res_dict, **
113 json.loads(json.dumps({key: v}, default=repr))}
114 elif str(ex) == 'keys must be str, int, float, bool or None, not ParameterVectorElement':
115 res_dict = {**res_dict, **
116 json.loads(json.dumps({key: repr(v)}))}
117 elif str(ex) == 'Object of type OptimizerResult is not JSON serializable':
118 # recursion ftw
119 new_v = self.parse_samplingVQEResult(v, res_path)
120 res_dict = {**res_dict, **
121 json.loads(json.dumps({key: new_v}))}
122 elif str(ex) == 'Object of type QuantumCircuit is not JSON serializable':
123 path = res_path + '.qpy'
124 with open(path, 'wb') as f:
125 qpy.dump(v, f)
126 res_dict = {**res_dict, **{key: path}}
127 return res_dict
128
[docs]
129 def run(self, problem: Problem, backend: Backend, formatter: Callable) -> Result:
130 """ Runs the QAOA algorithm """
131 if not (isinstance(backend, QiskitBackend) or isinstance(backend, CirqBackend)):
132 raise ValueError('Backend should be CirqBackend, QiskitBackend or subclass.')
133 hamiltonian: SparsePauliOp = formatter(problem)
134 energies = []
135
136 def qaoa_callback(evaluation_count, params, mean, std):
137 energies.append(mean)
138
139 tag = self.make_tag(problem, backend)
140 sampler = backend.samplerV1
141 # sampler.set_options(job_tags=[tag])
142
143 if self.alternating_ansatz:
144 if self.mixer_h is None:
145 self.mixer_h = formatter.get_mixer_hamiltonian(problem)
146 if self.initial_state is None:
147 self.initial_state = formatter.get_QAOAAnsatz_initial_state(
148 problem)
149
150 qaoa = QiskitQAOA(sampler, self.optimizer, reps=self.p, callback=qaoa_callback,
151 mixer=self.mixer_h, initial_state=self.initial_state, **self.alg_kwargs)
152 qaoa_result = qaoa.compute_minimum_eigenvalue(hamiltonian, self.aux)
153 depth = qaoa.ansatz.decompose(reps=10).depth()
154 if 'cx' in qaoa.ansatz.decompose(reps=10).count_ops():
155 cx_count = qaoa.ansatz.decompose(reps=10).count_ops()['cx']
156 else:
157 cx_count = 0
158 timestamps, usages, qpu_time = self.get_processing_times(tag, sampler)
159 return self.construct_result({'energy': qaoa_result.eigenvalue,
160 'depth': depth,
161 'cx_count': cx_count,
162 'qpu_time': qpu_time,
163 'energies': energies,
164 'SamplingVQEResult': qaoa_result,
165 'usages': usages,
166 'timestamps': timestamps})
167
[docs]
168 def construct_result(self, result: dict) -> Result:
169
170 best_bitstring = self.get_bitstring(result)
171 best_energy = result['energy']
172
173 distribution = dict(result['SamplingVQEResult'].eigenstate.items())
174 most_common_value = max(
175 distribution, key=distribution.get)
176 most_common_bitstring = bin(most_common_value)[2:].zfill(
177 len(best_bitstring))
178 most_common_bitstring_energy = distribution[most_common_value]
179 num_of_samples = 0 # TODO: implement
180 average_energy = np.mean(result['energies'])
181 energy_std = np.std(result['energies'])
182 return Result(best_bitstring, best_energy, most_common_bitstring, most_common_bitstring_energy, distribution, result['energies'], num_of_samples, average_energy, energy_std, result)
183
[docs]
184 def get_bitstring(self, result) -> str:
185 return result['SamplingVQEResult'].best_measurement['bitstring']
186
187
[docs]
188class FALQON(QiskitOptimizationAlgorithm):
189 """
190 Algorithm class with FALQON.
191
192 Args:
193 driver_h (Operator | None): The driver Hamiltonian for the problem.
194 delta_t (float): The time step for the evolution operators.
195 beta_0 (float): The initial value of beta.
196 n (int): The number of iterations to run the algorithm.
197 **alg_kwargs: Additional keyword arguments for the base class.
198
199 Attributes:
200 driver_h (Operator | None): The driver Hamiltonian for the problem.
201 delta_t (float): The time step for the evolution operators.
202 beta_0 (float): The initial value of beta.
203 n (int): The number of iterations to run the algorithm.
204 cost_h (Operator | None): The cost Hamiltonian for the problem.
205 n_qubits (int): The number of qubits in the problem.
206 parameters (list[str]): The list of algorithm parameters.
207
208 """
209 _algorithm_format = 'hamiltonian'
210
211 def __init__(
212 self,
213 driver_h: SparsePauliOp | None = None,
214 delta_t: float = 0.03,
215 beta_0: float = 0.0,
216 max_reps: int = 20
217 ) -> None:
218 super().__init__()
219 self.driver_h = driver_h
220 self.cost_h = None
221 self.delta_t = delta_t
222 self.beta_0 = beta_0
223 self.max_reps = max_reps
224 self.n_qubits: int = 0
225 self.parameters = ['n', 'delta_t', 'beta_0']
226
227 @property
228 def setup(self) -> dict:
229 return {
230 'driver_h': self.driver_h,
231 'delta_t': self.delta_t,
232 'beta_0': self.beta_0,
233 'n': self.max_reps,
234 'cost_h': self.cost_h,
235 'n_qubits': self.n_qubits,
236 'parameters': self.parameters,
237 'arg_kwargs': self.alg_kwargs
238 }
239
240 def _get_path(self) -> str:
241 return f'{self.name}@{self.max_reps}@{self.delta_t}@{self.beta_0}'
242
[docs]
243 def run(self, problem: Problem, backend: QiskitBackend, formatter: Callable) -> Result:
244 """ Runs the FALQON algorithm """
245
246 if isinstance(backend.sampler, BaseSamplerV1) or isinstance(backend.estimator, BaseEstimatorV1):
247 raise ValueError("FALQON works only on V2 samplers and estimators, consider using a different backend.")
248
249 cost_h = formatter(problem)
250
251 if cost_h is None:
252 raise ValueError("Formatter returned None")
253
254 self.n_qubits = cost_h.num_qubits
255
256 best_sample, betas, energies, depths, cnot_counts = self._falqon_subroutine(cost_h, backend)
257
258 best_data: BitArray = best_sample[0].data.meas
259 counts: dict = best_data.get_counts()
260 shots = best_data.num_shots
261
262 result = {'betas': betas,
263 'energies': energies,
264 'depths': depths,
265 'cxs': cnot_counts,
266 'n': self.max_reps,
267 'delta_t': self.delta_t,
268 'beta_0': self.beta_0,
269 'energy': min(energies),
270 }
271
272 return Result(
273 best_bitstring=max(counts, key=counts.get),
274 most_common_bitstring=max(counts, key=counts.get),
275 distribution={k: v/shots for k, v in counts.items()},
276 energies=energies,
277 energy_std=np.std(energies),
278 best_energy=min(energies),
279 num_of_samples=shots,
280 average_energy=np.mean(energies),
281 most_common_bitstring_energy=0,
282 result=result
283 )
284
285 def _add_ansatz_part(
286 self,
287 cost_hamiltonian: SparsePauliOp,
288 driver_hamiltonian: SparsePauliOp,
289 beta: float,
290 circuit: QuantumCircuit
291 ) -> None:
292 """Adds a single FALQON ansatz 'building block' with the specified beta to the circuit"""
293 circ_part = QuantumCircuit(circuit.num_qubits)
294
295 circ_part.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ_part.qubits)
296 circ_part.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ_part.qubits)
297
298 circuit.compose(circ_part, circ_part.qubits, inplace=True)
299
300 def _build_ansatz(self, cost_hamiltonian, driver_hamiltonian, betas):
301 """Build the FALQON circuit for the given betas"""
302
303 circ = QuantumCircuit(self.n_qubits)
304 circ.h(range(self.n_qubits))
305
306 for beta in betas:
307 circ.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ.qubits)
308 circ.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ.qubits)
309 return circ
310
311 def _falqon_subroutine(
312 self,
313 cost_hamiltonian: SparsePauliOp,
314 backend: QiskitBackend
315 ) -> tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]:
316 """
317 Run the 'meat' of the algorithm.
318
319 Args:
320 cost_hamiltonian (SparsePauliOp): Cost hamiltonian from the formatter.
321 backend (QiskitBackend): Backend
322
323 Returns:
324 tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]:
325 Sampler result from best betas, list of betas, list of energies, list of depths, list of cnot counts
326 """
327
328 if self.driver_h is None:
329 self.driver_h = SparsePauliOp.from_sparse_list([("X", [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits)
330 driver_hamiltonian = self.driver_h
331 else:
332 driver_hamiltonian = self.driver_h
333
334 hamiltonian_commutator = complex(0, 1) * commutator(driver_hamiltonian, cost_hamiltonian)
335
336 betas = [self.beta_0]
337 energies = []
338 cnot_counts = []
339 circuit_depths = []
340
341 circuit = QuantumCircuit(self.n_qubits)
342 circuit.h(circuit.qubits)
343
344 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, self.beta_0, circuit)
345
346 for i in range(self.max_reps):
347
348 beta = -1 * backend.estimator.run([(circuit, hamiltonian_commutator)]).result()[0].data.evs
349 betas.append(beta)
350
351 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, beta, circuit)
352
353 energy = backend.estimator.run([(circuit, cost_hamiltonian)]).result()[0].data.evs
354 # print(i, energy)
355 energies.append(energy)
356 circuit_depths.append(circuit.depth())
357 cnot_counts.append(circuit.count_ops().get('cx', 0))
358
359 argmin = np.argmin(np.asarray(energies))
360
361 sampling_circuit = self._build_ansatz(cost_hamiltonian, driver_hamiltonian, betas[:argmin])
362 sampling_circuit.measure_all()
363
364 best_sample = backend.sampler.run([(sampling_circuit)]).result()
365
366 return best_sample, betas, energies, circuit_depths, cnot_counts