1""" Algorithms for Qiskit routines """
2from datetime import datetime
3from collections.abc import Callable, Iterable
4from typing import Any, Literal
5import statistics
6import numpy as np
7from scipy.optimize import minimize
8
9import qiskit_algorithms
10from qiskit_algorithms import optimizers
11from qiskit_algorithms.minimum_eigensolvers.diagonal_estimator import _evaluate_sparsepauli as evaluate_energy
12
13from qiskit import QuantumCircuit
14from qiskit.circuit.library import PauliEvolutionGate, QAOAAnsatz, efficient_su2
15
16from qiskit.primitives import PrimitiveResult, SamplerPubResult, BaseSamplerV1, BaseEstimatorV1
17from qiskit.primitives.containers import BitArray
18from qiskit.primitives.base.base_primitive import BasePrimitive
19
20from qiskit.quantum_info import SparsePauliOp
21
22from qiskit_nature.second_q.algorithms.ground_state_solvers import GroundStateEigensolver
23from qiskit_nature.second_q.problems import EigenstateResult
24
25from qlauncher.base import Problem, Algorithm, Result
26from qlauncher.base.base import Backend
27from qlauncher.routines.cirq import CirqBackend
28from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
29from qlauncher.problems import Molecule
30
31
[docs]
32class QiskitOptimizationAlgorithm(Algorithm):
33 """ Abstract class for Qiskit optimization algorithms """
34
[docs]
35 def make_tag(self, problem: Problem, backend: QiskitBackend) -> str:
36 tag = problem.__class__.__name__ + '-' + \
37 backend.__class__.__name__ + '-' + \
38 self.__class__.__name__ + '-' + \
39 datetime.today().strftime('%Y-%m-%d')
40 return tag
41
[docs]
42 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]:
43 timestamps = []
44 usages = []
45 qpu_time = 0
46 if hasattr(primitive, 'session'):
47 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag])
48 for job in jobs:
49 m = job.metrics()
50 timestamps.append(m['timestamps'])
51 usages.append(m['usage'])
52 qpu_time += m['usage']['quantum_seconds']
53 return timestamps, usages, qpu_time
54
55
[docs]
56def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp:
57 """ Commutator """
58 return op_a @ op_b - op_b @ op_a
59
60
[docs]
61def int_to_bitstring(number: int, total_bits: int):
62 return np.binary_repr(number, total_bits)[::-1]
63
64
[docs]
65def cvar_cost(probs_values: Iterable[tuple[float, float]], alpha: float):
66 """CVar cost function to be used instead of mean for qaoa training"""
67 if not (alpha > 0 and alpha <= 1):
68 raise ValueError("Alpha must be in range (0,1]")
69 cvar, acc = 0.0, 0.0
70 for p, v in sorted(probs_values, key=lambda x: x[1]):
71 if acc >= alpha:
72 break
73 acc += p
74 cvar += v * min(p, alpha-acc)
75 return cvar / alpha
76
77
[docs]
78class QAOA(QiskitOptimizationAlgorithm):
79 """Algorithm class with QAOA.
80
81 Args:
82 p (int): The number of QAOA steps. Defaults to 1.
83 optimizer (Optimizer | None): Optimizer used during algorithm runtime. If set to `None` turns into COBYLA. Defaults to None,
84 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs.
85 aux: Auxiliary input for the QAOA algorithm.
86 **alg_kwargs: Additional keyword arguments for the base class.
87
88 Attributes:
89 name (str): The name of the algorithm.
90 aux: Auxiliary input for the QAOA algorithm.
91 p (int): The number of QAOA steps.
92 optimizer (Optimizer): Optimizer used during algorithm runtime.
93 alternating_ansatz (bool): Whether to use an alternating ansatz.
94 parameters (list): List of parameters for the algorithm.
95 mixer_h (SparsePauliOp | None): The mixer Hamiltonian.
96
97 """
98 _algorithm_format = 'hamiltonian'
99
100 def __init__(
101 self,
102 p: int = 1,
103 optimization_method: Literal['COBYLA'] = "COBYLA",
104 max_evaluations: int = 100,
105 training_aggregation_method: Literal['mean', 'cvar'] = 'mean',
106 cvar_alpha: float = 1,
107 alternating_ansatz: bool = False,
108 aux=None,
109 **alg_kwargs):
110 super().__init__(**alg_kwargs)
111 self.name: str = 'qaoa'
112 self.aux = aux
113 self.p: int = p
114
115 self.optimization_method = optimization_method
116 self.max_evaluations = max_evaluations
117 self.training_aggregation_method = training_aggregation_method
118 self.cvar_alpha = cvar_alpha
119
120 self.alternating_ansatz: bool = alternating_ansatz
121 self.parameters = ['p']
122 self.mixer_h: SparsePauliOp | None = None
123 self.initial_state: QuantumCircuit | None = None
124
125 @property
126 def setup(self) -> dict:
127 return {
128 'aux': self.aux,
129 'p': self.p,
130 'parameters': self.parameters,
131 'arg_kwargs': self.alg_kwargs
132 }
133
134 def _get_optimized_circuit_params(self, circuit: QuantumCircuit, hamiltonian: SparsePauliOp, backend: QiskitBackend | CirqBackend) -> tuple[np.ndarray, list[float]]:
135 """
136 Optimize circuit params
137
138 Args:
139 circuit (QuantumCircuit): QAOA circuit to be optimized
140 backend (Backend): Backend containing sampler
141
142 Returns:
143 tuple[QuantumCircuit,list[float]]: Circuit with optimal param values applied, energy history
144 """
145 costs = []
146
147 def cost_fn(params: np.ndarray):
148 job = backend.sampler.run([(circuit, params)])
149 results = job.result()[0].data.meas.get_int_counts()
150 shots = sum(results.values())
151
152 probs_with_costs = {state: (count/shots, np.real(evaluate_energy(state, hamiltonian)))
153 for state, count in results.items()}
154
155 cost = (sum(prob*cost for prob, cost in probs_with_costs.values())
156 if self.training_aggregation_method == 'mean' else
157 cvar_cost(probs_with_costs.values(), self.cvar_alpha))
158 costs.append(cost)
159 return cost
160
161 res = minimize(
162 cost_fn,
163 np.array([np.pi]*(len(circuit.parameters)//2) + [np.pi / 2]*(len(circuit.parameters)//2)),
164 method=self.optimization_method,
165 tol=1e-2,
166 options={
167 'maxiter': self.max_evaluations
168 }
169 )
170
171 return res.x, costs
172
[docs]
173 def run(self, problem: Problem, backend: Backend, formatter: Callable) -> Result:
174 """ Runs the QAOA algorithm """
175
176 if not (isinstance(backend, QiskitBackend) or isinstance(backend, CirqBackend)):
177 raise ValueError('Backend should be CirqBackend, QiskitBackend or subclass.')
178
179 hamiltonian: SparsePauliOp = formatter(problem)
180
181 if self.alternating_ansatz:
182 if self.mixer_h is None:
183 self.mixer_h = formatter.get_mixer_hamiltonian(problem)
184 if self.initial_state is None:
185 self.initial_state = formatter.get_QAOAAnsatz_initial_state(
186 problem)
187
188 # Cirq translation issues if we use QAOAAnsatz() by itself without appending it to a QuantumCircuit
189 circuit = QuantumCircuit(hamiltonian.num_qubits)
190 circuit.append(QAOAAnsatz(cost_operator=hamiltonian, reps=self.p).to_instruction(), range(hamiltonian.num_qubits))
191
192 circuit.measure_all()
193
194 opt_params, costs = self._get_optimized_circuit_params(circuit, hamiltonian, backend)
195
196 job = backend.sampler.run([(circuit, opt_params)])
197 results = job.result()[0].data.meas.get_int_counts()
198
199 final_energies = {
200 int_to_bitstring(k, circuit.num_qubits): np.real(evaluate_energy(k, hamiltonian))
201 for k in results.keys()
202 }
203 final_counts = {int_to_bitstring(k, circuit.num_qubits): v for k, v in results.items()}
204
205 depth = circuit.decompose(reps=10).depth()
206 if 'cx' in circuit.decompose(reps=10).count_ops():
207 cx_count = circuit.decompose(reps=10).count_ops()['cx']
208 else:
209 cx_count = 0
210
211 return self.construct_result(
212 {
213 'energy': min(costs),
214 'depth': depth,
215 'cx_count': cx_count,
216 'qpu_time': 0,
217 'training_costs': costs,
218 'final_sample_energies': final_energies,
219 'final_sample_counts': final_counts,
220 'optimal_point': opt_params # needed for educated_guess
221 })
222
[docs]
223 def construct_result(self, result: dict) -> Result:
224 counts, energies = result['final_sample_counts'], result['final_sample_energies']
225 num_of_samples = sum(counts.values())
226 average_energy = statistics.mean(energies.values())
227 energy_std = statistics.stdev(energies.values()) if len(energies) > 1 else 0
228
229 best_bs = min(energies, key=energies.get)
230 most_common_bs = max(counts, key=counts.get)
231
232 return Result(
233 best_bitstring=best_bs,
234 best_energy=min(energies.values()),
235 most_common_bitstring=most_common_bs,
236 most_common_bitstring_energy=energies[most_common_bs],
237 distribution={k: v/num_of_samples for k, v in counts.items()},
238 energies=energies,
239 num_of_samples=num_of_samples,
240 average_energy=average_energy,
241 energy_std=energy_std,
242 result=result
243 )
244
245
[docs]
246class FALQON(QiskitOptimizationAlgorithm):
247 """
248 Algorithm class with FALQON.
249
250 Args:
251 driver_h (Operator | None): The driver Hamiltonian for the problem.
252 delta_t (float): The time step for the evolution operators.
253 beta_0 (float): The initial value of beta.
254 n (int): The number of iterations to run the algorithm.
255 **alg_kwargs: Additional keyword arguments for the base class.
256
257 Attributes:
258 driver_h (Operator | None): The driver Hamiltonian for the problem.
259 delta_t (float): The time step for the evolution operators.
260 beta_0 (float): The initial value of beta.
261 n (int): The number of iterations to run the algorithm.
262 cost_h (Operator | None): The cost Hamiltonian for the problem.
263 n_qubits (int): The number of qubits in the problem.
264 parameters (list[str]): The list of algorithm parameters.
265
266 """
267 _algorithm_format = 'hamiltonian'
268
269 def __init__(
270 self,
271 driver_h: SparsePauliOp | None = None,
272 delta_t: float = 0.03,
273 beta_0: float = 0.0,
274 max_reps: int = 20
275 ) -> None:
276 super().__init__()
277 self.driver_h = driver_h
278 self.cost_h = None
279 self.delta_t = delta_t
280 self.beta_0 = beta_0
281 self.max_reps = max_reps
282 self.n_qubits: int = 0
283 self.parameters = ['n', 'delta_t', 'beta_0']
284
285 @property
286 def setup(self) -> dict:
287 return {
288 'driver_h': self.driver_h,
289 'delta_t': self.delta_t,
290 'beta_0': self.beta_0,
291 'n': self.max_reps,
292 'cost_h': self.cost_h,
293 'n_qubits': self.n_qubits,
294 'parameters': self.parameters,
295 'arg_kwargs': self.alg_kwargs
296 }
297
298 def _get_path(self) -> str:
299 return f'{self.name}@{self.max_reps}@{self.delta_t}@{self.beta_0}'
300
[docs]
301 def run(self, problem: Problem, backend: QiskitBackend, formatter: Callable) -> Result:
302 """ Runs the FALQON algorithm """
303
304 if isinstance(backend.sampler, BaseSamplerV1) or isinstance(backend.estimator, BaseEstimatorV1):
305 raise ValueError("FALQON works only on V2 samplers and estimators, consider using a different backend.")
306
307 cost_h = formatter(problem)
308
309 if cost_h is None:
310 raise ValueError("Formatter returned None")
311
312 self.n_qubits = cost_h.num_qubits
313
314 best_sample, betas, energies, depths, cnot_counts = self._falqon_subroutine(cost_h, backend)
315
316 best_data: BitArray = best_sample[0].data.meas
317 counts: dict = best_data.get_counts()
318 shots = best_data.num_shots
319
320 result = {'betas': betas,
321 'energies': energies,
322 'depths': depths,
323 'cxs': cnot_counts,
324 'n': self.max_reps,
325 'delta_t': self.delta_t,
326 'beta_0': self.beta_0,
327 'energy': min(energies),
328 }
329
330 return Result(
331 best_bitstring=max(counts, key=counts.get),
332 most_common_bitstring=max(counts, key=counts.get),
333 distribution={k: v/shots for k, v in counts.items()},
334 energies=energies,
335 energy_std=np.std(energies),
336 best_energy=min(energies),
337 num_of_samples=shots,
338 average_energy=np.mean(energies),
339 most_common_bitstring_energy=0,
340 result=result
341 )
342
343 def _add_ansatz_part(
344 self,
345 cost_hamiltonian: SparsePauliOp,
346 driver_hamiltonian: SparsePauliOp,
347 beta: float,
348 circuit: QuantumCircuit
349 ) -> None:
350 """Adds a single FALQON ansatz 'building block' with the specified beta to the circuit"""
351 circ_part = QuantumCircuit(circuit.num_qubits)
352
353 circ_part.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ_part.qubits)
354 circ_part.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ_part.qubits)
355
356 circuit.compose(circ_part, circ_part.qubits, inplace=True)
357
358 def _build_ansatz(self, cost_hamiltonian, driver_hamiltonian, betas):
359 """Build the FALQON circuit for the given betas"""
360
361 circ = QuantumCircuit(self.n_qubits)
362 circ.h(range(self.n_qubits))
363
364 for beta in betas:
365 circ.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ.qubits)
366 circ.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ.qubits)
367 return circ
368
369 def _falqon_subroutine(
370 self,
371 cost_hamiltonian: SparsePauliOp,
372 backend: QiskitBackend
373 ) -> tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]:
374 """
375 Run the 'meat' of the algorithm.
376
377 Args:
378 cost_hamiltonian (SparsePauliOp): Cost hamiltonian from the formatter.
379 backend (QiskitBackend): Backend
380
381 Returns:
382 tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]:
383 Sampler result from best betas, list of betas, list of energies, list of depths, list of cnot counts
384 """
385
386 if self.driver_h is None:
387 self.driver_h = SparsePauliOp.from_sparse_list([("X", [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits)
388 driver_hamiltonian = self.driver_h
389 else:
390 driver_hamiltonian = self.driver_h
391
392 hamiltonian_commutator = complex(0, 1) * commutator(driver_hamiltonian, cost_hamiltonian)
393
394 betas = [self.beta_0]
395 energies = []
396 cnot_counts = []
397 circuit_depths = []
398
399 circuit = QuantumCircuit(self.n_qubits)
400 circuit.h(circuit.qubits)
401
402 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, self.beta_0, circuit)
403
404 for i in range(self.max_reps):
405
406 beta = -1 * backend.estimator.run([(circuit, hamiltonian_commutator)]).result()[0].data.evs
407 betas.append(beta)
408
409 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, beta, circuit)
410
411 energy = backend.estimator.run([(circuit, cost_hamiltonian)]).result()[0].data.evs
412 # print(i, energy)
413 energies.append(energy)
414 circuit_depths.append(circuit.depth())
415 cnot_counts.append(circuit.count_ops().get('cx', 0))
416
417 argmin = np.argmin(np.asarray(energies))
418
419 sampling_circuit = self._build_ansatz(cost_hamiltonian, driver_hamiltonian, betas[:argmin])
420 sampling_circuit.measure_all()
421
422 best_sample = backend.sampler.run([(sampling_circuit)]).result()
423
424 return best_sample, betas, energies, circuit_depths, cnot_counts
425
426
[docs]
427class VQE(QiskitOptimizationAlgorithm):
428 """Variational Quantum EigenSolver - qiskit-algorithm implementation wrapper.
429
430 Args:
431 optimizer (optimizers.Optimizer | None, optional): Optimizer for VQE. Defaults to None.
432 ansatz (QuantumCircuit | None, optional): VQE's ansatz. Defaults to None.
433 with_numpy (bool, optional): Ignores ansatz parameter and backend, and changes solver to Numpy based. Defaults to False.
434 """
435 # pip install git+https://github.com/qiskit-community/qiskit-nature.git
436 # pyscf
437
438 def __init__(self, optimizer: optimizers.Optimizer | None = None,
439 ansatz: QuantumCircuit | None = None, with_numpy: bool = False) -> None:
440 self.optimizer = optimizers.COBYLA() if optimizer is None else optimizer
441 self.ansatz = ansatz
442 self.num_qubits: int = 0
443 self.with_numpy: bool = with_numpy
444 super().__init__()
445
446 @property
447 def ansatz(self) -> QuantumCircuit:
448 if self._ansatz is None:
449 return efficient_su2(self.num_qubits)
450 return self._ansatz
451
452 @ansatz.setter
453 def ansatz(self, custom_ansatz):
454 self._ansatz = custom_ansatz
455
[docs]
456 def run(self, problem: Problem, backend: Backend, formatter: Callable[..., Any]) -> Result:
457 if not isinstance(backend, QiskitBackend):
458 raise ValueError('Backend should be QiskitBackend or subclass.')
459 if not isinstance(problem, Molecule):
460 raise ValueError('The problem for this algorithm should be Molecule problem')
461 if not isinstance(problem.operator.num_qubits, int):
462 raise ValueError('num_qubits from problem operator is expected to be int')
463
464 estimator = backend.estimator
465 self.num_qubits = problem.operator.num_qubits
466 if self.with_numpy:
467 solver = qiskit_algorithms.NumPyMinimumEigensolver()
468 else:
469 solver = qiskit_algorithms.VQE(estimator, self.ansatz, self.optimizer)
470 vqe_gss = GroundStateEigensolver(problem.mapper, solver)
471 vqe_results = vqe_gss.solve(problem.problem)
472 return self.construct_result(vqe_results)
473
[docs]
474 def construct_result(self, result: EigenstateResult) -> Result:
475 energy = result.total_energies[0]
476 # Not the cleanest way
477 return Result(
478 '',
479 energy,
480 '',
481 energy,
482 {'': 1},
483 {'': energy},
484 1,
485 energy,
486 0,
487 None
488 )