Source code for qlauncher.routines.qiskit.algorithms.qiskit_native

  1"""Algorithms for Qiskit routines"""
  2
  3import statistics
  4from collections.abc import Iterable
  5from datetime import datetime
  6from typing import Generic, Literal
  7
  8import numpy as np
  9import qiskit_algorithms
 10from qiskit import QuantumCircuit
 11from qiskit.circuit.library import PauliEvolutionGate, QAOAAnsatz, efficient_su2
 12from qiskit.primitives import BaseEstimatorV1, BaseSamplerV1
 13from qiskit.primitives.base.base_primitive import BasePrimitive
 14from qiskit.quantum_info import SparsePauliOp
 15from qiskit_algorithms import optimizers
 16from qiskit_algorithms.minimum_eigensolvers.diagonal_estimator import _evaluate_sparsepauli as evaluate_energy
 17from qiskit_nature.second_q.algorithms.ground_state_solvers import GroundStateEigensolver
 18from qiskit_nature.second_q.problems import EigenstateResult
 19from scipy.optimize import minimize
 20
 21from qlauncher.base import Algorithm, Result
 22from qlauncher.base.base import _Model
 23from qlauncher.base.models import Hamiltonian, Molecule
 24from qlauncher.routines.qiskit.backends.gate_circuit_backend import GateCircuitBackend
 25from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
 26
 27
[docs] 28class QiskitOptimizationAlgorithm(Algorithm[_Model, QiskitBackend], Generic[_Model]): 29 """Abstract class for Qiskit optimization algorithms""" 30
[docs] 31 def make_tag(self, problem: _Model, backend: QiskitBackend) -> str: 32 return ( 33 problem.__class__.__name__ 34 + '-' 35 + backend.__class__.__name__ 36 + '-' 37 + self.__class__.__name__ 38 + '-' 39 + datetime.today().strftime('%Y-%m-%d') 40 )
41
[docs] 42 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]: 43 timestamps = [] 44 usages = [] 45 qpu_time = 0 46 if hasattr(primitive, 'session'): 47 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag]) 48 for job in jobs: 49 m = job.metrics() 50 timestamps.append(m['timestamps']) 51 usages.append(m['usage']) 52 qpu_time += m['usage']['quantum_seconds'] 53 return timestamps, usages, qpu_time
54 55
[docs] 56def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp: 57 """Commutator""" 58 return op_a @ op_b - op_b @ op_a
59 60
[docs] 61def cvar_cost(probs_values: Iterable[tuple[float, float]], alpha: float) -> float: 62 """CVar cost function to be used instead of mean for qaoa training""" 63 if not (alpha > 0 and alpha <= 1): 64 raise ValueError('Alpha must be in range (0,1]') 65 cvar, acc = 0.0, 0.0 66 for p, v in sorted(probs_values, key=lambda x: x[1]): 67 if acc >= alpha: 68 break 69 acc += p 70 cvar += v * min(p, alpha - acc) 71 return cvar / alpha
72 73
[docs] 74class QAOA(QiskitOptimizationAlgorithm[Hamiltonian]): 75 """Algorithm class with QAOA. 76 77 Args: 78 p (int): The number of QAOA steps. Defaults to 1. 79 optimizer (Optimizer | None): Optimizer used during algorithm runtime. If set to `None` turns into COBYLA. Defaults to None, 80 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs. 81 aux: Auxiliary input for the QAOA algorithm. 82 **alg_kwargs: Additional keyword arguments for the base class. 83 84 Attributes: 85 name (str): The name of the algorithm. 86 aux: Auxiliary input for the QAOA algorithm. 87 p (int): The number of QAOA steps. 88 optimizer (Optimizer): Optimizer used during algorithm runtime. 89 alternating_ansatz (bool): Whether to use an alternating ansatz. 90 parameters (list): List of parameters for the algorithm. 91 mixer_h (SparsePauliOp | None): The mixer Hamiltonian. 92 93 """ 94 95 def __init__( 96 self, 97 p: int = 1, 98 optimization_method: Literal['COBYLA'] = 'COBYLA', 99 max_evaluations: int = 100, 100 training_aggregation_method: Literal['mean', 'cvar'] = 'mean', 101 cvar_alpha: float = 1, 102 alternating_ansatz: bool = False, 103 aux=None, 104 **alg_kwargs, 105 ): 106 super().__init__(**alg_kwargs) 107 self.name: str = 'qaoa' 108 self.aux = aux 109 self.p: int = p 110 111 self.optimization_method = optimization_method 112 self.max_evaluations = max_evaluations 113 self.training_aggregation_method = training_aggregation_method 114 self.cvar_alpha = cvar_alpha 115 116 self.alternating_ansatz: bool = alternating_ansatz 117 self.parameters = ['p'] 118 self.mixer_h: SparsePauliOp | None = None 119 self.initial_state: QuantumCircuit | None = None 120 121 @property 122 def setup(self) -> dict: 123 return {'aux': self.aux, 'p': self.p, 'parameters': self.parameters, 'arg_kwargs': self.alg_kwargs} 124 125 def _get_optimized_circuit_params( 126 self, circuit: QuantumCircuit, hamiltonian: SparsePauliOp, backend: GateCircuitBackend 127 ) -> tuple[np.ndarray, list[float]]: 128 """ 129 Optimize circuit params 130 131 Args: 132 circuit (QuantumCircuit): QAOA circuit to be optimized 133 backend (Backend): Backend containing sampler 134 135 Returns: 136 tuple[QuantumCircuit,list[float]]: Circuit with optimal param values applied, energy history 137 """ 138 costs = [] 139 140 def cost_fn(params: np.ndarray): 141 results = backend.sample_circuit(circuit.assign_parameters(params)) 142 shots = sum(results.values()) 143 144 probs_with_costs = { 145 state: (count / shots, np.real(evaluate_energy(int(state, 2), hamiltonian))) for state, count in results.items() 146 } 147 148 cost = ( 149 sum(prob * cost for prob, cost in probs_with_costs.values()) 150 if self.training_aggregation_method == 'mean' 151 else cvar_cost(probs_with_costs.values(), self.cvar_alpha) 152 ) 153 costs.append(cost) 154 return cost 155 156 res = minimize( 157 cost_fn, 158 np.array([np.pi] * (len(circuit.parameters) // 2) + [np.pi / 2] * (len(circuit.parameters) // 2)), 159 method=self.optimization_method, 160 tol=1e-2, 161 options={'maxiter': self.max_evaluations}, 162 ) 163 164 return res.x, costs 165
[docs] 166 def run(self, problem: Hamiltonian, backend: GateCircuitBackend) -> Result: 167 """Runs the QAOA algorithm""" 168 169 # problem.hamiltonian: SparsePauliOp = formatter(problem) 170 171 if self.alternating_ansatz: 172 if self.mixer_h is None: 173 self.mixer_h = problem.get_mixer_hamiltonian() 174 if self.initial_state is None: 175 self.initial_state = problem.get_QAOAAnsatz_initial_state() 176 177 # Cirq translation issues if we use QAOAAnsatz() by itself without appending it to a QuantumCircuit 178 circuit = QuantumCircuit(problem.hamiltonian.num_qubits) 179 circuit.append(QAOAAnsatz(cost_operator=problem.hamiltonian, reps=self.p).to_instruction(), range(problem.hamiltonian.num_qubits)) 180 181 circuit.measure_all() 182 183 opt_params, costs = self._get_optimized_circuit_params(circuit, problem.hamiltonian, backend) 184 185 results = backend.sample_circuit(circuit.assign_parameters(opt_params)) 186 187 final_energies = {k: np.real(evaluate_energy(int(k, 2), problem.hamiltonian)) for k in results.keys()} 188 189 depth = circuit.decompose(reps=10).depth() 190 cx_count = circuit.decompose(reps=10).count_ops().get('cx', 0) 191 192 return self.construct_result( 193 { 194 'energy': min(costs), 195 'depth': depth, 196 'cx_count': cx_count, 197 'qpu_time': 0, 198 'training_costs': costs, 199 'final_sample_energies': final_energies, 200 'final_sample_counts': results, 201 'optimal_point': opt_params, # needed for educated_guess 202 } 203 )
204
[docs] 205 def construct_result(self, result: dict) -> Result: 206 counts, energies = result['final_sample_counts'], result['final_sample_energies'] 207 num_of_samples = sum(counts.values()) 208 average_energy = statistics.mean(energies.values()) 209 energy_std = statistics.stdev(energies.values()) if len(energies) > 1 else 0 210 211 best_bs = min(energies, key=energies.get) 212 most_common_bs = max(counts, key=counts.get) 213 214 return Result( 215 best_bitstring=best_bs, 216 best_energy=min(energies.values()), 217 most_common_bitstring=most_common_bs, 218 most_common_bitstring_energy=energies[most_common_bs], 219 distribution={k: v / num_of_samples for k, v in counts.items()}, 220 energies=energies, 221 num_of_samples=num_of_samples, 222 average_energy=average_energy, 223 energy_std=energy_std, 224 result=result, 225 )
226 227
[docs] 228class FALQON(QiskitOptimizationAlgorithm[Hamiltonian]): 229 """ 230 Algorithm class with FALQON. 231 232 Args: 233 driver_h (Operator | None): The driver Hamiltonian for the problem. 234 delta_t (float): The time step for the evolution operators. 235 beta_0 (float): The initial value of beta. 236 n (int): The number of iterations to run the algorithm. 237 **alg_kwargs: Additional keyword arguments for the base class. 238 239 Attributes: 240 driver_h (Operator | None): The driver Hamiltonian for the problem. 241 delta_t (float): The time step for the evolution operators. 242 beta_0 (float): The initial value of beta. 243 n (int): The number of iterations to run the algorithm. 244 cost_h (Operator | None): The cost Hamiltonian for the problem. 245 n_qubits (int): The number of qubits in the problem. 246 parameters (list[str]): The list of algorithm parameters. 247 248 """ 249 250 _algorithm_format = 'hamiltonian' 251 252 def __init__(self, driver_h: SparsePauliOp | None = None, delta_t: float = 0.03, beta_0: float = 0.0, max_reps: int = 20) -> None: 253 super().__init__() 254 self.driver_h = driver_h 255 self.cost_h = None 256 self.delta_t = delta_t 257 self.beta_0 = beta_0 258 self.max_reps = max_reps 259 self.n_qubits: int = 0 260 self.parameters = ['n', 'delta_t', 'beta_0'] 261 262 @property 263 def setup(self) -> dict: 264 return { 265 'driver_h': self.driver_h, 266 'delta_t': self.delta_t, 267 'beta_0': self.beta_0, 268 'n': self.max_reps, 269 'cost_h': self.cost_h, 270 'n_qubits': self.n_qubits, 271 'parameters': self.parameters, 272 'arg_kwargs': self.alg_kwargs, 273 } 274 275 def _get_path(self) -> str: 276 return f'{self.name}@{self.max_reps}@{self.delta_t}@{self.beta_0}' 277
[docs] 278 def run(self, problem: Hamiltonian, backend: QiskitBackend) -> Result: 279 """Runs the FALQON algorithm""" 280 281 if isinstance(backend.sampler, BaseSamplerV1) or isinstance(backend.estimator, BaseEstimatorV1): 282 raise ValueError('FALQON works only on V2 samplers and estimators, consider using a different backend.') 283 284 if problem.hamiltonian is None: 285 raise ValueError('Formatter returned None') 286 287 problem.hamiltonian.num_qubits 288 289 self.n_qubits = problem.hamiltonian.num_qubits 290 291 best_sample, betas, energies, depths, cnot_counts = self._falqon_subroutine(problem.hamiltonian, backend) 292 293 shots = sum(best_sample.values()) 294 295 result = { 296 'betas': betas, 297 'energies': energies, 298 'depths': depths, 299 'cxs': cnot_counts, 300 'n': self.max_reps, 301 'delta_t': self.delta_t, 302 'beta_0': self.beta_0, 303 'energy': min(energies), 304 } 305 306 return Result( 307 best_bitstring=max(best_sample, key=lambda x: best_sample.get(x, 0)), 308 most_common_bitstring=max(best_sample, key=lambda x: best_sample.get(x, 0)), 309 distribution={k: v / shots for k, v in best_sample.items()}, 310 energies=energies, 311 energy_std=float(np.std(energies)), 312 best_energy=min(energies), 313 num_of_samples=shots, 314 average_energy=float(np.mean(energies)), 315 most_common_bitstring_energy=0, 316 result=result, 317 )
318 319 def _add_ansatz_part( 320 self, cost_hamiltonian: SparsePauliOp, driver_hamiltonian: SparsePauliOp, beta: float, circuit: QuantumCircuit 321 ) -> None: 322 """Adds a single FALQON ansatz 'building block' with the specified beta to the circuit""" 323 circ_part = QuantumCircuit(circuit.num_qubits) 324 325 circ_part.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ_part.qubits) 326 circ_part.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ_part.qubits) 327 328 circuit.compose(circ_part, circ_part.qubits, inplace=True) 329 330 def _build_ansatz(self, cost_hamiltonian, driver_hamiltonian, betas): 331 """Build the FALQON circuit for the given betas""" 332 333 circ = QuantumCircuit(self.n_qubits) 334 circ.h(range(self.n_qubits)) 335 336 for beta in betas: 337 circ.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ.qubits) 338 circ.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ.qubits) 339 return circ 340 341 def _falqon_subroutine( 342 self, cost_hamiltonian: SparsePauliOp, backend: QiskitBackend 343 ) -> tuple[dict[str, int], list[float], list[float], list[int], list[int]]: 344 """ 345 Run the 'meat' of the algorithm. 346 347 Args: 348 cost_hamiltonian (SparsePauliOp): Cost hamiltonian from the formatter. 349 backend (QiskitBackend): Backend 350 351 Returns: 352 tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]: 353 Sampler result from best betas, list of betas, list of energies, list of depths, list of cnot counts 354 """ 355 356 if self.driver_h is None: 357 driver_hamiltonian = SparsePauliOp.from_sparse_list([('X', [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits) 358 else: 359 driver_hamiltonian = self.driver_h 360 361 hamiltonian_commutator = complex(0, 1) * commutator(driver_hamiltonian, cost_hamiltonian) 362 363 betas = [self.beta_0] 364 energies = [] 365 cnot_counts = [] 366 circuit_depths = [] 367 368 circuit = QuantumCircuit(self.n_qubits) 369 circuit.h(circuit.qubits) 370 371 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, self.beta_0, circuit) 372 373 for _ in range(self.max_reps): 374 beta = -1 * backend.estimate_energy(circuit, hamiltonian_commutator) 375 betas.append(beta) 376 377 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, beta, circuit) 378 379 energy = backend.estimate_energy(circuit, cost_hamiltonian) 380 # print(i, energy) 381 energies.append(energy) 382 circuit_depths.append(circuit.depth()) 383 cnot_counts.append(circuit.count_ops().get('cx', 0)) 384 385 argmin = np.argmin(np.asarray(energies)) 386 387 sampling_circuit = self._build_ansatz(cost_hamiltonian, driver_hamiltonian, betas[:argmin]) 388 sampling_circuit.measure_all() 389 390 best_sample = backend.sample_circuit(sampling_circuit) 391 392 return best_sample, betas, energies, circuit_depths, cnot_counts
393 394
[docs] 395class VQE(QiskitOptimizationAlgorithm[Molecule]): 396 """Variational Quantum EigenSolver - qiskit-algorithm implementation wrapper. 397 398 Args: 399 optimizer (optimizers.Optimizer | None, optional): Optimizer for VQE. Defaults to None. 400 ansatz (QuantumCircuit | None, optional): VQE's ansatz. Defaults to None. 401 with_numpy (bool, optional): Ignores ansatz parameter and backend, and changes solver to Numpy based. Defaults to False. 402 """ 403 404 # pip install git+https://github.com/qiskit-community/qiskit-nature.git 405 # pyscf 406 407 def __init__( 408 self, optimizer: optimizers.Optimizer | None = None, ansatz: QuantumCircuit | None = None, with_numpy: bool = False 409 ) -> None: 410 self.optimizer = optimizers.COBYLA() if optimizer is None else optimizer 411 self.ansatz = ansatz 412 self.num_qubits: int = 0 413 self.with_numpy: bool = with_numpy 414 super().__init__() 415 416 @property 417 def ansatz(self) -> QuantumCircuit: 418 if self._ansatz is None: 419 return efficient_su2(self.num_qubits) 420 return self._ansatz 421 422 @ansatz.setter 423 def ansatz(self, custom_ansatz) -> None: 424 self._ansatz = custom_ansatz 425
[docs] 426 def run(self, problem: Molecule, backend: QiskitBackend) -> Result: 427 if not isinstance(problem.operator.num_qubits, int): 428 raise ValueError('num_qubits from problem operator is expected to be int') 429 430 estimator = backend.estimator 431 self.num_qubits = problem.operator.num_qubits 432 if self.with_numpy: 433 solver = qiskit_algorithms.NumPyMinimumEigensolver() 434 else: 435 solver = qiskit_algorithms.VQE(estimator, self.ansatz, self.optimizer) 436 vqe_gss = GroundStateEigensolver(problem.mapper, solver) 437 vqe_results = vqe_gss.solve(problem.problem) 438 return self.construct_result(vqe_results)
439
[docs] 440 def construct_result(self, result: EigenstateResult) -> Result: 441 energy = result.total_energies[0] 442 # Not the cleanest way 443 return Result('', energy, '', energy, {'': 1}, {'': energy}, 1, energy, 0, None)