1"""Algorithms for Qiskit routines"""
2
3import statistics
4from collections.abc import Iterable
5from datetime import datetime
6from typing import Generic, Literal
7
8import numpy as np
9import qiskit_algorithms
10from qiskit import QuantumCircuit
11from qiskit.circuit.library import PauliEvolutionGate, QAOAAnsatz, efficient_su2
12from qiskit.primitives import BaseEstimatorV1, BaseSamplerV1
13from qiskit.primitives.base.base_primitive import BasePrimitive
14from qiskit.quantum_info import SparsePauliOp
15from qiskit_algorithms import optimizers
16from qiskit_algorithms.minimum_eigensolvers.diagonal_estimator import _evaluate_sparsepauli as evaluate_energy
17from qiskit_nature.second_q.algorithms.ground_state_solvers import GroundStateEigensolver
18from qiskit_nature.second_q.problems import EigenstateResult
19from scipy.optimize import minimize
20
21from qlauncher.base import Algorithm, Result
22from qlauncher.base.base import _Model
23from qlauncher.base.models import Hamiltonian, Molecule
24from qlauncher.routines.qiskit.backends.gate_circuit_backend import GateCircuitBackend
25from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
26
27
[docs]
28class QiskitOptimizationAlgorithm(Algorithm[_Model, QiskitBackend], Generic[_Model]):
29 """Abstract class for Qiskit optimization algorithms"""
30
[docs]
31 def make_tag(self, problem: _Model, backend: QiskitBackend) -> str:
32 return (
33 problem.__class__.__name__
34 + '-'
35 + backend.__class__.__name__
36 + '-'
37 + self.__class__.__name__
38 + '-'
39 + datetime.today().strftime('%Y-%m-%d')
40 )
41
[docs]
42 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]:
43 timestamps = []
44 usages = []
45 qpu_time = 0
46 if hasattr(primitive, 'session'):
47 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag])
48 for job in jobs:
49 m = job.metrics()
50 timestamps.append(m['timestamps'])
51 usages.append(m['usage'])
52 qpu_time += m['usage']['quantum_seconds']
53 return timestamps, usages, qpu_time
54
55
[docs]
56def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp:
57 """Commutator"""
58 return op_a @ op_b - op_b @ op_a
59
60
[docs]
61def cvar_cost(probs_values: Iterable[tuple[float, float]], alpha: float) -> float:
62 """CVar cost function to be used instead of mean for qaoa training"""
63 if not (alpha > 0 and alpha <= 1):
64 raise ValueError('Alpha must be in range (0,1]')
65 cvar, acc = 0.0, 0.0
66 for p, v in sorted(probs_values, key=lambda x: x[1]):
67 if acc >= alpha:
68 break
69 acc += p
70 cvar += v * min(p, alpha - acc)
71 return cvar / alpha
72
73
[docs]
74class QAOA(QiskitOptimizationAlgorithm[Hamiltonian]):
75 """Algorithm class with QAOA.
76
77 Args:
78 p (int): The number of QAOA steps. Defaults to 1.
79 optimizer (Optimizer | None): Optimizer used during algorithm runtime. If set to `None` turns into COBYLA. Defaults to None,
80 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs.
81 aux: Auxiliary input for the QAOA algorithm.
82 **alg_kwargs: Additional keyword arguments for the base class.
83
84 Attributes:
85 name (str): The name of the algorithm.
86 aux: Auxiliary input for the QAOA algorithm.
87 p (int): The number of QAOA steps.
88 optimizer (Optimizer): Optimizer used during algorithm runtime.
89 alternating_ansatz (bool): Whether to use an alternating ansatz.
90 parameters (list): List of parameters for the algorithm.
91 mixer_h (SparsePauliOp | None): The mixer Hamiltonian.
92
93 """
94
95 def __init__(
96 self,
97 p: int = 1,
98 optimization_method: Literal['COBYLA'] = 'COBYLA',
99 max_evaluations: int = 100,
100 training_aggregation_method: Literal['mean', 'cvar'] = 'mean',
101 cvar_alpha: float = 1,
102 alternating_ansatz: bool = False,
103 aux=None,
104 **alg_kwargs,
105 ):
106 super().__init__(**alg_kwargs)
107 self.name: str = 'qaoa'
108 self.aux = aux
109 self.p: int = p
110
111 self.optimization_method = optimization_method
112 self.max_evaluations = max_evaluations
113 self.training_aggregation_method = training_aggregation_method
114 self.cvar_alpha = cvar_alpha
115
116 self.alternating_ansatz: bool = alternating_ansatz
117 self.parameters = ['p']
118 self.mixer_h: SparsePauliOp | None = None
119 self.initial_state: QuantumCircuit | None = None
120
121 @property
122 def setup(self) -> dict:
123 return {'aux': self.aux, 'p': self.p, 'parameters': self.parameters, 'arg_kwargs': self.alg_kwargs}
124
125 def _get_optimized_circuit_params(
126 self, circuit: QuantumCircuit, hamiltonian: SparsePauliOp, backend: GateCircuitBackend
127 ) -> tuple[np.ndarray, list[float]]:
128 """
129 Optimize circuit params
130
131 Args:
132 circuit (QuantumCircuit): QAOA circuit to be optimized
133 backend (Backend): Backend containing sampler
134
135 Returns:
136 tuple[QuantumCircuit,list[float]]: Circuit with optimal param values applied, energy history
137 """
138 costs = []
139
140 def cost_fn(params: np.ndarray):
141 results = backend.sample_circuit(circuit.assign_parameters(params))
142 shots = sum(results.values())
143
144 probs_with_costs = {
145 state: (count / shots, np.real(evaluate_energy(int(state, 2), hamiltonian))) for state, count in results.items()
146 }
147
148 cost = (
149 sum(prob * cost for prob, cost in probs_with_costs.values())
150 if self.training_aggregation_method == 'mean'
151 else cvar_cost(probs_with_costs.values(), self.cvar_alpha)
152 )
153 costs.append(cost)
154 return cost
155
156 res = minimize(
157 cost_fn,
158 np.array([np.pi] * (len(circuit.parameters) // 2) + [np.pi / 2] * (len(circuit.parameters) // 2)),
159 method=self.optimization_method,
160 tol=1e-2,
161 options={'maxiter': self.max_evaluations},
162 )
163
164 return res.x, costs
165
[docs]
166 def run(self, problem: Hamiltonian, backend: GateCircuitBackend) -> Result:
167 """Runs the QAOA algorithm"""
168
169 # problem.hamiltonian: SparsePauliOp = formatter(problem)
170
171 if self.alternating_ansatz:
172 if self.mixer_h is None:
173 self.mixer_h = problem.get_mixer_hamiltonian()
174 if self.initial_state is None:
175 self.initial_state = problem.get_QAOAAnsatz_initial_state()
176
177 # Cirq translation issues if we use QAOAAnsatz() by itself without appending it to a QuantumCircuit
178 circuit = QuantumCircuit(problem.hamiltonian.num_qubits)
179 circuit.append(QAOAAnsatz(cost_operator=problem.hamiltonian, reps=self.p).to_instruction(), range(problem.hamiltonian.num_qubits))
180
181 circuit.measure_all()
182
183 opt_params, costs = self._get_optimized_circuit_params(circuit, problem.hamiltonian, backend)
184
185 results = backend.sample_circuit(circuit.assign_parameters(opt_params))
186
187 final_energies = {k: np.real(evaluate_energy(int(k, 2), problem.hamiltonian)) for k in results.keys()}
188
189 depth = circuit.decompose(reps=10).depth()
190 cx_count = circuit.decompose(reps=10).count_ops().get('cx', 0)
191
192 return self.construct_result(
193 {
194 'energy': min(costs),
195 'depth': depth,
196 'cx_count': cx_count,
197 'qpu_time': 0,
198 'training_costs': costs,
199 'final_sample_energies': final_energies,
200 'final_sample_counts': results,
201 'optimal_point': opt_params, # needed for educated_guess
202 }
203 )
204
[docs]
205 def construct_result(self, result: dict) -> Result:
206 counts, energies = result['final_sample_counts'], result['final_sample_energies']
207 num_of_samples = sum(counts.values())
208 average_energy = statistics.mean(energies.values())
209 energy_std = statistics.stdev(energies.values()) if len(energies) > 1 else 0
210
211 best_bs = min(energies, key=energies.get)
212 most_common_bs = max(counts, key=counts.get)
213
214 return Result(
215 best_bitstring=best_bs,
216 best_energy=min(energies.values()),
217 most_common_bitstring=most_common_bs,
218 most_common_bitstring_energy=energies[most_common_bs],
219 distribution={k: v / num_of_samples for k, v in counts.items()},
220 energies=energies,
221 num_of_samples=num_of_samples,
222 average_energy=average_energy,
223 energy_std=energy_std,
224 result=result,
225 )
226
227
[docs]
228class FALQON(QiskitOptimizationAlgorithm[Hamiltonian]):
229 """
230 Algorithm class with FALQON.
231
232 Args:
233 driver_h (Operator | None): The driver Hamiltonian for the problem.
234 delta_t (float): The time step for the evolution operators.
235 beta_0 (float): The initial value of beta.
236 n (int): The number of iterations to run the algorithm.
237 **alg_kwargs: Additional keyword arguments for the base class.
238
239 Attributes:
240 driver_h (Operator | None): The driver Hamiltonian for the problem.
241 delta_t (float): The time step for the evolution operators.
242 beta_0 (float): The initial value of beta.
243 n (int): The number of iterations to run the algorithm.
244 cost_h (Operator | None): The cost Hamiltonian for the problem.
245 n_qubits (int): The number of qubits in the problem.
246 parameters (list[str]): The list of algorithm parameters.
247
248 """
249
250 _algorithm_format = 'hamiltonian'
251
252 def __init__(self, driver_h: SparsePauliOp | None = None, delta_t: float = 0.03, beta_0: float = 0.0, max_reps: int = 20) -> None:
253 super().__init__()
254 self.driver_h = driver_h
255 self.cost_h = None
256 self.delta_t = delta_t
257 self.beta_0 = beta_0
258 self.max_reps = max_reps
259 self.n_qubits: int = 0
260 self.parameters = ['n', 'delta_t', 'beta_0']
261
262 @property
263 def setup(self) -> dict:
264 return {
265 'driver_h': self.driver_h,
266 'delta_t': self.delta_t,
267 'beta_0': self.beta_0,
268 'n': self.max_reps,
269 'cost_h': self.cost_h,
270 'n_qubits': self.n_qubits,
271 'parameters': self.parameters,
272 'arg_kwargs': self.alg_kwargs,
273 }
274
275 def _get_path(self) -> str:
276 return f'{self.name}@{self.max_reps}@{self.delta_t}@{self.beta_0}'
277
[docs]
278 def run(self, problem: Hamiltonian, backend: QiskitBackend) -> Result:
279 """Runs the FALQON algorithm"""
280
281 if isinstance(backend.sampler, BaseSamplerV1) or isinstance(backend.estimator, BaseEstimatorV1):
282 raise ValueError('FALQON works only on V2 samplers and estimators, consider using a different backend.')
283
284 if problem.hamiltonian is None:
285 raise ValueError('Formatter returned None')
286
287 problem.hamiltonian.num_qubits
288
289 self.n_qubits = problem.hamiltonian.num_qubits
290
291 best_sample, betas, energies, depths, cnot_counts = self._falqon_subroutine(problem.hamiltonian, backend)
292
293 shots = sum(best_sample.values())
294
295 result = {
296 'betas': betas,
297 'energies': energies,
298 'depths': depths,
299 'cxs': cnot_counts,
300 'n': self.max_reps,
301 'delta_t': self.delta_t,
302 'beta_0': self.beta_0,
303 'energy': min(energies),
304 }
305
306 return Result(
307 best_bitstring=max(best_sample, key=lambda x: best_sample.get(x, 0)),
308 most_common_bitstring=max(best_sample, key=lambda x: best_sample.get(x, 0)),
309 distribution={k: v / shots for k, v in best_sample.items()},
310 energies=energies,
311 energy_std=float(np.std(energies)),
312 best_energy=min(energies),
313 num_of_samples=shots,
314 average_energy=float(np.mean(energies)),
315 most_common_bitstring_energy=0,
316 result=result,
317 )
318
319 def _add_ansatz_part(
320 self, cost_hamiltonian: SparsePauliOp, driver_hamiltonian: SparsePauliOp, beta: float, circuit: QuantumCircuit
321 ) -> None:
322 """Adds a single FALQON ansatz 'building block' with the specified beta to the circuit"""
323 circ_part = QuantumCircuit(circuit.num_qubits)
324
325 circ_part.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ_part.qubits)
326 circ_part.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ_part.qubits)
327
328 circuit.compose(circ_part, circ_part.qubits, inplace=True)
329
330 def _build_ansatz(self, cost_hamiltonian, driver_hamiltonian, betas):
331 """Build the FALQON circuit for the given betas"""
332
333 circ = QuantumCircuit(self.n_qubits)
334 circ.h(range(self.n_qubits))
335
336 for beta in betas:
337 circ.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ.qubits)
338 circ.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ.qubits)
339 return circ
340
341 def _falqon_subroutine(
342 self, cost_hamiltonian: SparsePauliOp, backend: QiskitBackend
343 ) -> tuple[dict[str, int], list[float], list[float], list[int], list[int]]:
344 """
345 Run the 'meat' of the algorithm.
346
347 Args:
348 cost_hamiltonian (SparsePauliOp): Cost hamiltonian from the formatter.
349 backend (QiskitBackend): Backend
350
351 Returns:
352 tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]:
353 Sampler result from best betas, list of betas, list of energies, list of depths, list of cnot counts
354 """
355
356 if self.driver_h is None:
357 driver_hamiltonian = SparsePauliOp.from_sparse_list([('X', [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits)
358 else:
359 driver_hamiltonian = self.driver_h
360
361 hamiltonian_commutator = complex(0, 1) * commutator(driver_hamiltonian, cost_hamiltonian)
362
363 betas = [self.beta_0]
364 energies = []
365 cnot_counts = []
366 circuit_depths = []
367
368 circuit = QuantumCircuit(self.n_qubits)
369 circuit.h(circuit.qubits)
370
371 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, self.beta_0, circuit)
372
373 for _ in range(self.max_reps):
374 beta = -1 * backend.estimate_energy(circuit, hamiltonian_commutator)
375 betas.append(beta)
376
377 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, beta, circuit)
378
379 energy = backend.estimate_energy(circuit, cost_hamiltonian)
380 # print(i, energy)
381 energies.append(energy)
382 circuit_depths.append(circuit.depth())
383 cnot_counts.append(circuit.count_ops().get('cx', 0))
384
385 argmin = np.argmin(np.asarray(energies))
386
387 sampling_circuit = self._build_ansatz(cost_hamiltonian, driver_hamiltonian, betas[:argmin])
388 sampling_circuit.measure_all()
389
390 best_sample = backend.sample_circuit(sampling_circuit)
391
392 return best_sample, betas, energies, circuit_depths, cnot_counts
393
394
[docs]
395class VQE(QiskitOptimizationAlgorithm[Molecule]):
396 """Variational Quantum EigenSolver - qiskit-algorithm implementation wrapper.
397
398 Args:
399 optimizer (optimizers.Optimizer | None, optional): Optimizer for VQE. Defaults to None.
400 ansatz (QuantumCircuit | None, optional): VQE's ansatz. Defaults to None.
401 with_numpy (bool, optional): Ignores ansatz parameter and backend, and changes solver to Numpy based. Defaults to False.
402 """
403
404 # pip install git+https://github.com/qiskit-community/qiskit-nature.git
405 # pyscf
406
407 def __init__(
408 self, optimizer: optimizers.Optimizer | None = None, ansatz: QuantumCircuit | None = None, with_numpy: bool = False
409 ) -> None:
410 self.optimizer = optimizers.COBYLA() if optimizer is None else optimizer
411 self.ansatz = ansatz
412 self.num_qubits: int = 0
413 self.with_numpy: bool = with_numpy
414 super().__init__()
415
416 @property
417 def ansatz(self) -> QuantumCircuit:
418 if self._ansatz is None:
419 return efficient_su2(self.num_qubits)
420 return self._ansatz
421
422 @ansatz.setter
423 def ansatz(self, custom_ansatz) -> None:
424 self._ansatz = custom_ansatz
425
[docs]
426 def run(self, problem: Molecule, backend: QiskitBackend) -> Result:
427 if not isinstance(problem.operator.num_qubits, int):
428 raise ValueError('num_qubits from problem operator is expected to be int')
429
430 estimator = backend.estimator
431 self.num_qubits = problem.operator.num_qubits
432 if self.with_numpy:
433 solver = qiskit_algorithms.NumPyMinimumEigensolver()
434 else:
435 solver = qiskit_algorithms.VQE(estimator, self.ansatz, self.optimizer)
436 vqe_gss = GroundStateEigensolver(problem.mapper, solver)
437 vqe_results = vqe_gss.solve(problem.problem)
438 return self.construct_result(vqe_results)
439
[docs]
440 def construct_result(self, result: EigenstateResult) -> Result:
441 energy = result.total_energies[0]
442 # Not the cleanest way
443 return Result('', energy, '', energy, {'': 1}, {'': energy}, 1, energy, 0, None)