Source code for qlauncher.routines.qiskit.algorithms.qiskit_native

  1""" Algorithms for Qiskit routines """
  2from datetime import datetime
  3from collections.abc import Callable, Iterable
  4from typing import Any, Literal
  5import statistics
  6import numpy as np
  7from scipy.optimize import minimize
  8
  9import qiskit_algorithms
 10from qiskit_algorithms import optimizers
 11from qiskit_algorithms.minimum_eigensolvers.diagonal_estimator import _evaluate_sparsepauli as evaluate_energy
 12
 13from qiskit import QuantumCircuit
 14from qiskit.circuit.library import PauliEvolutionGate, QAOAAnsatz, efficient_su2
 15
 16from qiskit.primitives import PrimitiveResult, SamplerPubResult, BaseSamplerV1, BaseEstimatorV1
 17from qiskit.primitives.containers import BitArray
 18from qiskit.primitives.base.base_primitive import BasePrimitive
 19
 20from qiskit.quantum_info import SparsePauliOp
 21
 22from qiskit_nature.second_q.algorithms.ground_state_solvers import GroundStateEigensolver
 23from qiskit_nature.second_q.problems import EigenstateResult
 24
 25from qlauncher.base import Problem, Algorithm, Result
 26from qlauncher.base.base import Backend
 27from qlauncher.routines.cirq import CirqBackend
 28from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
 29from qlauncher.problems import Molecule
 30
 31
[docs] 32class QiskitOptimizationAlgorithm(Algorithm): 33 """ Abstract class for Qiskit optimization algorithms """ 34
[docs] 35 def make_tag(self, problem: Problem, backend: QiskitBackend) -> str: 36 tag = problem.__class__.__name__ + '-' + \ 37 backend.__class__.__name__ + '-' + \ 38 self.__class__.__name__ + '-' + \ 39 datetime.today().strftime('%Y-%m-%d') 40 return tag
41
[docs] 42 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]: 43 timestamps = [] 44 usages = [] 45 qpu_time = 0 46 if hasattr(primitive, 'session'): 47 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag]) 48 for job in jobs: 49 m = job.metrics() 50 timestamps.append(m['timestamps']) 51 usages.append(m['usage']) 52 qpu_time += m['usage']['quantum_seconds'] 53 return timestamps, usages, qpu_time
54 55
[docs] 56def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp: 57 """ Commutator """ 58 return op_a @ op_b - op_b @ op_a
59 60
[docs] 61def int_to_bitstring(number: int, total_bits: int): 62 return np.binary_repr(number, total_bits)[::-1]
63 64
[docs] 65def cvar_cost(probs_values: Iterable[tuple[float, float]], alpha: float): 66 """CVar cost function to be used instead of mean for qaoa training""" 67 if not (alpha > 0 and alpha <= 1): 68 raise ValueError("Alpha must be in range (0,1]") 69 cvar, acc = 0.0, 0.0 70 for p, v in sorted(probs_values, key=lambda x: x[1]): 71 if acc >= alpha: 72 break 73 acc += p 74 cvar += v * min(p, alpha-acc) 75 return cvar / alpha
76 77
[docs] 78class QAOA(QiskitOptimizationAlgorithm): 79 """Algorithm class with QAOA. 80 81 Args: 82 p (int): The number of QAOA steps. Defaults to 1. 83 optimizer (Optimizer | None): Optimizer used during algorithm runtime. If set to `None` turns into COBYLA. Defaults to None, 84 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs. 85 aux: Auxiliary input for the QAOA algorithm. 86 **alg_kwargs: Additional keyword arguments for the base class. 87 88 Attributes: 89 name (str): The name of the algorithm. 90 aux: Auxiliary input for the QAOA algorithm. 91 p (int): The number of QAOA steps. 92 optimizer (Optimizer): Optimizer used during algorithm runtime. 93 alternating_ansatz (bool): Whether to use an alternating ansatz. 94 parameters (list): List of parameters for the algorithm. 95 mixer_h (SparsePauliOp | None): The mixer Hamiltonian. 96 97 """ 98 _algorithm_format = 'hamiltonian' 99 100 def __init__( 101 self, 102 p: int = 1, 103 optimization_method: Literal['COBYLA'] = "COBYLA", 104 max_evaluations: int = 100, 105 training_aggregation_method: Literal['mean', 'cvar'] = 'mean', 106 cvar_alpha: float = 1, 107 alternating_ansatz: bool = False, 108 aux=None, 109 **alg_kwargs): 110 super().__init__(**alg_kwargs) 111 self.name: str = 'qaoa' 112 self.aux = aux 113 self.p: int = p 114 115 self.optimization_method = optimization_method 116 self.max_evaluations = max_evaluations 117 self.training_aggregation_method = training_aggregation_method 118 self.cvar_alpha = cvar_alpha 119 120 self.alternating_ansatz: bool = alternating_ansatz 121 self.parameters = ['p'] 122 self.mixer_h: SparsePauliOp | None = None 123 self.initial_state: QuantumCircuit | None = None 124 125 @property 126 def setup(self) -> dict: 127 return { 128 'aux': self.aux, 129 'p': self.p, 130 'parameters': self.parameters, 131 'arg_kwargs': self.alg_kwargs 132 } 133 134 def _get_optimized_circuit_params(self, circuit: QuantumCircuit, hamiltonian: SparsePauliOp, backend: QiskitBackend | CirqBackend) -> tuple[np.ndarray, list[float]]: 135 """ 136 Optimize circuit params 137 138 Args: 139 circuit (QuantumCircuit): QAOA circuit to be optimized 140 backend (Backend): Backend containing sampler 141 142 Returns: 143 tuple[QuantumCircuit,list[float]]: Circuit with optimal param values applied, energy history 144 """ 145 costs = [] 146 147 def cost_fn(params: np.ndarray): 148 job = backend.sampler.run([(circuit, params)]) 149 results = job.result()[0].data.meas.get_int_counts() 150 shots = sum(results.values()) 151 152 probs_with_costs = {state: (count/shots, np.real(evaluate_energy(state, hamiltonian))) 153 for state, count in results.items()} 154 155 cost = (sum(prob*cost for prob, cost in probs_with_costs.values()) 156 if self.training_aggregation_method == 'mean' else 157 cvar_cost(probs_with_costs.values(), self.cvar_alpha)) 158 costs.append(cost) 159 return cost 160 161 res = minimize( 162 cost_fn, 163 np.array([np.pi]*(len(circuit.parameters)//2) + [np.pi / 2]*(len(circuit.parameters)//2)), 164 method=self.optimization_method, 165 tol=1e-2, 166 options={ 167 'maxiter': self.max_evaluations 168 } 169 ) 170 171 return res.x, costs 172
[docs] 173 def run(self, problem: Problem, backend: Backend, formatter: Callable) -> Result: 174 """ Runs the QAOA algorithm """ 175 176 if not (isinstance(backend, QiskitBackend) or isinstance(backend, CirqBackend)): 177 raise ValueError('Backend should be CirqBackend, QiskitBackend or subclass.') 178 179 hamiltonian: SparsePauliOp = formatter(problem) 180 181 if self.alternating_ansatz: 182 if self.mixer_h is None: 183 self.mixer_h = formatter.get_mixer_hamiltonian(problem) 184 if self.initial_state is None: 185 self.initial_state = formatter.get_QAOAAnsatz_initial_state( 186 problem) 187 188 # Cirq translation issues if we use QAOAAnsatz() by itself without appending it to a QuantumCircuit 189 circuit = QuantumCircuit(hamiltonian.num_qubits) 190 circuit.append(QAOAAnsatz(cost_operator=hamiltonian, reps=self.p).to_instruction(), range(hamiltonian.num_qubits)) 191 192 circuit.measure_all() 193 194 opt_params, costs = self._get_optimized_circuit_params(circuit, hamiltonian, backend) 195 196 job = backend.sampler.run([(circuit, opt_params)]) 197 results = job.result()[0].data.meas.get_int_counts() 198 199 final_energies = { 200 int_to_bitstring(k, circuit.num_qubits): np.real(evaluate_energy(k, hamiltonian)) 201 for k in results.keys() 202 } 203 final_counts = {int_to_bitstring(k, circuit.num_qubits): v for k, v in results.items()} 204 205 depth = circuit.decompose(reps=10).depth() 206 if 'cx' in circuit.decompose(reps=10).count_ops(): 207 cx_count = circuit.decompose(reps=10).count_ops()['cx'] 208 else: 209 cx_count = 0 210 211 return self.construct_result( 212 { 213 'energy': min(costs), 214 'depth': depth, 215 'cx_count': cx_count, 216 'qpu_time': 0, 217 'training_costs': costs, 218 'final_sample_energies': final_energies, 219 'final_sample_counts': final_counts, 220 'optimal_point': opt_params # needed for educated_guess 221 })
222
[docs] 223 def construct_result(self, result: dict) -> Result: 224 counts, energies = result['final_sample_counts'], result['final_sample_energies'] 225 num_of_samples = sum(counts.values()) 226 average_energy = statistics.mean(energies.values()) 227 energy_std = statistics.stdev(energies.values()) if len(energies) > 1 else 0 228 229 best_bs = min(energies, key=energies.get) 230 most_common_bs = max(counts, key=counts.get) 231 232 return Result( 233 best_bitstring=best_bs, 234 best_energy=min(energies.values()), 235 most_common_bitstring=most_common_bs, 236 most_common_bitstring_energy=energies[most_common_bs], 237 distribution={k: v/num_of_samples for k, v in counts.items()}, 238 energies=energies, 239 num_of_samples=num_of_samples, 240 average_energy=average_energy, 241 energy_std=energy_std, 242 result=result 243 )
244 245
[docs] 246class FALQON(QiskitOptimizationAlgorithm): 247 """ 248 Algorithm class with FALQON. 249 250 Args: 251 driver_h (Operator | None): The driver Hamiltonian for the problem. 252 delta_t (float): The time step for the evolution operators. 253 beta_0 (float): The initial value of beta. 254 n (int): The number of iterations to run the algorithm. 255 **alg_kwargs: Additional keyword arguments for the base class. 256 257 Attributes: 258 driver_h (Operator | None): The driver Hamiltonian for the problem. 259 delta_t (float): The time step for the evolution operators. 260 beta_0 (float): The initial value of beta. 261 n (int): The number of iterations to run the algorithm. 262 cost_h (Operator | None): The cost Hamiltonian for the problem. 263 n_qubits (int): The number of qubits in the problem. 264 parameters (list[str]): The list of algorithm parameters. 265 266 """ 267 _algorithm_format = 'hamiltonian' 268 269 def __init__( 270 self, 271 driver_h: SparsePauliOp | None = None, 272 delta_t: float = 0.03, 273 beta_0: float = 0.0, 274 max_reps: int = 20 275 ) -> None: 276 super().__init__() 277 self.driver_h = driver_h 278 self.cost_h = None 279 self.delta_t = delta_t 280 self.beta_0 = beta_0 281 self.max_reps = max_reps 282 self.n_qubits: int = 0 283 self.parameters = ['n', 'delta_t', 'beta_0'] 284 285 @property 286 def setup(self) -> dict: 287 return { 288 'driver_h': self.driver_h, 289 'delta_t': self.delta_t, 290 'beta_0': self.beta_0, 291 'n': self.max_reps, 292 'cost_h': self.cost_h, 293 'n_qubits': self.n_qubits, 294 'parameters': self.parameters, 295 'arg_kwargs': self.alg_kwargs 296 } 297 298 def _get_path(self) -> str: 299 return f'{self.name}@{self.max_reps}@{self.delta_t}@{self.beta_0}' 300
[docs] 301 def run(self, problem: Problem, backend: QiskitBackend, formatter: Callable) -> Result: 302 """ Runs the FALQON algorithm """ 303 304 if isinstance(backend.sampler, BaseSamplerV1) or isinstance(backend.estimator, BaseEstimatorV1): 305 raise ValueError("FALQON works only on V2 samplers and estimators, consider using a different backend.") 306 307 cost_h = formatter(problem) 308 309 if cost_h is None: 310 raise ValueError("Formatter returned None") 311 312 self.n_qubits = cost_h.num_qubits 313 314 best_sample, betas, energies, depths, cnot_counts = self._falqon_subroutine(cost_h, backend) 315 316 best_data: BitArray = best_sample[0].data.meas 317 counts: dict = best_data.get_counts() 318 shots = best_data.num_shots 319 320 result = {'betas': betas, 321 'energies': energies, 322 'depths': depths, 323 'cxs': cnot_counts, 324 'n': self.max_reps, 325 'delta_t': self.delta_t, 326 'beta_0': self.beta_0, 327 'energy': min(energies), 328 } 329 330 return Result( 331 best_bitstring=max(counts, key=counts.get), 332 most_common_bitstring=max(counts, key=counts.get), 333 distribution={k: v/shots for k, v in counts.items()}, 334 energies=energies, 335 energy_std=np.std(energies), 336 best_energy=min(energies), 337 num_of_samples=shots, 338 average_energy=np.mean(energies), 339 most_common_bitstring_energy=0, 340 result=result 341 )
342 343 def _add_ansatz_part( 344 self, 345 cost_hamiltonian: SparsePauliOp, 346 driver_hamiltonian: SparsePauliOp, 347 beta: float, 348 circuit: QuantumCircuit 349 ) -> None: 350 """Adds a single FALQON ansatz 'building block' with the specified beta to the circuit""" 351 circ_part = QuantumCircuit(circuit.num_qubits) 352 353 circ_part.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ_part.qubits) 354 circ_part.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ_part.qubits) 355 356 circuit.compose(circ_part, circ_part.qubits, inplace=True) 357 358 def _build_ansatz(self, cost_hamiltonian, driver_hamiltonian, betas): 359 """Build the FALQON circuit for the given betas""" 360 361 circ = QuantumCircuit(self.n_qubits) 362 circ.h(range(self.n_qubits)) 363 364 for beta in betas: 365 circ.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ.qubits) 366 circ.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ.qubits) 367 return circ 368 369 def _falqon_subroutine( 370 self, 371 cost_hamiltonian: SparsePauliOp, 372 backend: QiskitBackend 373 ) -> tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]: 374 """ 375 Run the 'meat' of the algorithm. 376 377 Args: 378 cost_hamiltonian (SparsePauliOp): Cost hamiltonian from the formatter. 379 backend (QiskitBackend): Backend 380 381 Returns: 382 tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]: 383 Sampler result from best betas, list of betas, list of energies, list of depths, list of cnot counts 384 """ 385 386 if self.driver_h is None: 387 self.driver_h = SparsePauliOp.from_sparse_list([("X", [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits) 388 driver_hamiltonian = self.driver_h 389 else: 390 driver_hamiltonian = self.driver_h 391 392 hamiltonian_commutator = complex(0, 1) * commutator(driver_hamiltonian, cost_hamiltonian) 393 394 betas = [self.beta_0] 395 energies = [] 396 cnot_counts = [] 397 circuit_depths = [] 398 399 circuit = QuantumCircuit(self.n_qubits) 400 circuit.h(circuit.qubits) 401 402 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, self.beta_0, circuit) 403 404 for i in range(self.max_reps): 405 406 beta = -1 * backend.estimator.run([(circuit, hamiltonian_commutator)]).result()[0].data.evs 407 betas.append(beta) 408 409 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, beta, circuit) 410 411 energy = backend.estimator.run([(circuit, cost_hamiltonian)]).result()[0].data.evs 412 # print(i, energy) 413 energies.append(energy) 414 circuit_depths.append(circuit.depth()) 415 cnot_counts.append(circuit.count_ops().get('cx', 0)) 416 417 argmin = np.argmin(np.asarray(energies)) 418 419 sampling_circuit = self._build_ansatz(cost_hamiltonian, driver_hamiltonian, betas[:argmin]) 420 sampling_circuit.measure_all() 421 422 best_sample = backend.sampler.run([(sampling_circuit)]).result() 423 424 return best_sample, betas, energies, circuit_depths, cnot_counts
425 426
[docs] 427class VQE(QiskitOptimizationAlgorithm): 428 """Variational Quantum EigenSolver - qiskit-algorithm implementation wrapper. 429 430 Args: 431 optimizer (optimizers.Optimizer | None, optional): Optimizer for VQE. Defaults to None. 432 ansatz (QuantumCircuit | None, optional): VQE's ansatz. Defaults to None. 433 with_numpy (bool, optional): Ignores ansatz parameter and backend, and changes solver to Numpy based. Defaults to False. 434 """ 435 # pip install git+https://github.com/qiskit-community/qiskit-nature.git 436 # pyscf 437 438 def __init__(self, optimizer: optimizers.Optimizer | None = None, 439 ansatz: QuantumCircuit | None = None, with_numpy: bool = False) -> None: 440 self.optimizer = optimizers.COBYLA() if optimizer is None else optimizer 441 self.ansatz = ansatz 442 self.num_qubits: int = 0 443 self.with_numpy: bool = with_numpy 444 super().__init__() 445 446 @property 447 def ansatz(self) -> QuantumCircuit: 448 if self._ansatz is None: 449 return efficient_su2(self.num_qubits) 450 return self._ansatz 451 452 @ansatz.setter 453 def ansatz(self, custom_ansatz): 454 self._ansatz = custom_ansatz 455
[docs] 456 def run(self, problem: Problem, backend: Backend, formatter: Callable[..., Any]) -> Result: 457 if not isinstance(backend, QiskitBackend): 458 raise ValueError('Backend should be QiskitBackend or subclass.') 459 if not isinstance(problem, Molecule): 460 raise ValueError('The problem for this algorithm should be Molecule problem') 461 if not isinstance(problem.operator.num_qubits, int): 462 raise ValueError('num_qubits from problem operator is expected to be int') 463 464 estimator = backend.estimator 465 self.num_qubits = problem.operator.num_qubits 466 if self.with_numpy: 467 solver = qiskit_algorithms.NumPyMinimumEigensolver() 468 else: 469 solver = qiskit_algorithms.VQE(estimator, self.ansatz, self.optimizer) 470 vqe_gss = GroundStateEigensolver(problem.mapper, solver) 471 vqe_results = vqe_gss.solve(problem.problem) 472 return self.construct_result(vqe_results)
473
[docs] 474 def construct_result(self, result: EigenstateResult) -> Result: 475 energy = result.total_energies[0] 476 # Not the cleanest way 477 return Result( 478 '', 479 energy, 480 '', 481 energy, 482 {'': 1}, 483 {'': energy}, 484 1, 485 energy, 486 0, 487 None 488 )