Source code for qlauncher.routines.qiskit.algorithms.qiskit_native

  1""" Algorithms for Qiskit routines """
  2import json
  3from datetime import datetime
  4from collections.abc import Callable
  5
  6import numpy as np
  7
  8from qiskit import qpy, QuantumCircuit
  9from qiskit.circuit.library import PauliEvolutionGate
 10
 11from qiskit.primitives import PrimitiveResult, SamplerPubResult, BaseEstimatorV1, BaseSamplerV1
 12from qiskit.primitives.containers import BitArray
 13from qiskit.primitives.base.base_primitive import BasePrimitive
 14
 15from qiskit.quantum_info import SparsePauliOp
 16
 17from qiskit_algorithms.minimum_eigensolvers import QAOA as QiskitQAOA
 18from qiskit_algorithms.minimum_eigensolvers import SamplingVQEResult
 19from qiskit_algorithms.optimizers import Optimizer, COBYLA
 20
 21from qlauncher.base import Problem, Algorithm, Result
 22from qlauncher.base.base import Backend
 23from qlauncher.routines.cirq import CirqBackend
 24from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
 25
 26
[docs] 27class QiskitOptimizationAlgorithm(Algorithm): 28 """ Abstract class for Qiskit optimization algorithms """ 29
[docs] 30 def make_tag(self, problem: Problem, backend: QiskitBackend) -> str: 31 tag = problem.__class__.__name__ + '-' + \ 32 backend.__class__.__name__ + '-' + \ 33 self.__class__.__name__ + '-' + \ 34 datetime.today().strftime('%Y-%m-%d') 35 return tag
36
[docs] 37 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]: 38 timestamps = [] 39 usages = [] 40 qpu_time = 0 41 if hasattr(primitive, 'session'): 42 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag]) 43 for job in jobs: 44 m = job.metrics() 45 timestamps.append(m['timestamps']) 46 usages.append(m['usage']) 47 qpu_time += m['usage']['quantum_seconds'] 48 return timestamps, usages, qpu_time
49 50
[docs] 51def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp: 52 """ Commutator """ 53 return op_a @ op_b - op_b @ op_a
54 55
[docs] 56class QAOA(QiskitOptimizationAlgorithm): 57 """Algorithm class with QAOA. 58 59 Args: 60 p (int): The number of QAOA steps. Defaults to 1. 61 optimizer (Optimizer | None): Optimizer used during algorithm runtime. If set to `None` turns into COBYLA. Defaults to None, 62 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs. 63 aux: Auxiliary input for the QAOA algorithm. 64 **alg_kwargs: Additional keyword arguments for the base class. 65 66 Attributes: 67 name (str): The name of the algorithm. 68 aux: Auxiliary input for the QAOA algorithm. 69 p (int): The number of QAOA steps. 70 optimizer (Optimizer): Optimizer used during algorithm runtime. 71 alternating_ansatz (bool): Whether to use an alternating ansatz. 72 parameters (list): List of parameters for the algorithm. 73 mixer_h (SparsePauliOp | None): The mixer Hamiltonian. 74 75 """ 76 _algorithm_format = 'hamiltonian' 77 78 def __init__(self, p: int = 1, optimizer: Optimizer | None = None, alternating_ansatz: bool = False, aux=None, **alg_kwargs): 79 super().__init__(**alg_kwargs) 80 self.name: str = 'qaoa' 81 self.aux = aux 82 self.p: int = p 83 self.optimizer: Optimizer = optimizer if optimizer is not None else COBYLA() 84 self.alternating_ansatz: bool = alternating_ansatz 85 self.parameters = ['p'] 86 self.mixer_h: SparsePauliOp | None = None 87 self.initial_state: QuantumCircuit | None = None 88 89 @property 90 def setup(self) -> dict: 91 return { 92 'aux': self.aux, 93 'p': self.p, 94 'parameters': self.parameters, 95 'arg_kwargs': self.alg_kwargs 96 } 97
[docs] 98 def parse_samplingVQEResult(self, res: SamplingVQEResult, res_path) -> dict: 99 res_dict = {} 100 for k, v in vars(res).items(): 101 if k[0] == "_": 102 key = k[1:] 103 else: 104 key = k 105 try: 106 res_dict = {**res_dict, **json.loads(json.dumps({key: v}))} 107 except TypeError as ex: 108 if str(ex) == 'Object of type complex128 is not JSON serializable': 109 res_dict = {**res_dict, ** 110 json.loads(json.dumps({key: v}, default=repr))} 111 elif str(ex) == 'Object of type ndarray is not JSON serializable': 112 res_dict = {**res_dict, ** 113 json.loads(json.dumps({key: v}, default=repr))} 114 elif str(ex) == 'keys must be str, int, float, bool or None, not ParameterVectorElement': 115 res_dict = {**res_dict, ** 116 json.loads(json.dumps({key: repr(v)}))} 117 elif str(ex) == 'Object of type OptimizerResult is not JSON serializable': 118 # recursion ftw 119 new_v = self.parse_samplingVQEResult(v, res_path) 120 res_dict = {**res_dict, ** 121 json.loads(json.dumps({key: new_v}))} 122 elif str(ex) == 'Object of type QuantumCircuit is not JSON serializable': 123 path = res_path + '.qpy' 124 with open(path, 'wb') as f: 125 qpy.dump(v, f) 126 res_dict = {**res_dict, **{key: path}} 127 return res_dict
128
[docs] 129 def run(self, problem: Problem, backend: Backend, formatter: Callable) -> Result: 130 """ Runs the QAOA algorithm """ 131 if not (isinstance(backend, QiskitBackend) or isinstance(backend, CirqBackend)): 132 raise ValueError('Backend should be CirqBackend, QiskitBackend or subclass.') 133 hamiltonian: SparsePauliOp = formatter(problem) 134 energies = [] 135 136 def qaoa_callback(evaluation_count, params, mean, std): 137 energies.append(mean) 138 139 tag = self.make_tag(problem, backend) 140 sampler = backend.samplerV1 141 # sampler.set_options(job_tags=[tag]) 142 143 if self.alternating_ansatz: 144 if self.mixer_h is None: 145 self.mixer_h = formatter.get_mixer_hamiltonian(problem) 146 if self.initial_state is None: 147 self.initial_state = formatter.get_QAOAAnsatz_initial_state( 148 problem) 149 150 qaoa = QiskitQAOA(sampler, self.optimizer, reps=self.p, callback=qaoa_callback, 151 mixer=self.mixer_h, initial_state=self.initial_state, **self.alg_kwargs) 152 qaoa_result = qaoa.compute_minimum_eigenvalue(hamiltonian, self.aux) 153 depth = qaoa.ansatz.decompose(reps=10).depth() 154 if 'cx' in qaoa.ansatz.decompose(reps=10).count_ops(): 155 cx_count = qaoa.ansatz.decompose(reps=10).count_ops()['cx'] 156 else: 157 cx_count = 0 158 timestamps, usages, qpu_time = self.get_processing_times(tag, sampler) 159 return self.construct_result({'energy': qaoa_result.eigenvalue, 160 'depth': depth, 161 'cx_count': cx_count, 162 'qpu_time': qpu_time, 163 'energies': energies, 164 'SamplingVQEResult': qaoa_result, 165 'usages': usages, 166 'timestamps': timestamps})
167
[docs] 168 def construct_result(self, result: dict) -> Result: 169 170 best_bitstring = self.get_bitstring(result) 171 best_energy = result['energy'] 172 173 distribution = dict(result['SamplingVQEResult'].eigenstate.items()) 174 most_common_value = max( 175 distribution, key=distribution.get) 176 most_common_bitstring = bin(most_common_value)[2:].zfill( 177 len(best_bitstring)) 178 most_common_bitstring_energy = distribution[most_common_value] 179 num_of_samples = 0 # TODO: implement 180 average_energy = np.mean(result['energies']) 181 energy_std = np.std(result['energies']) 182 return Result(best_bitstring, best_energy, most_common_bitstring, most_common_bitstring_energy, distribution, result['energies'], num_of_samples, average_energy, energy_std, result)
183
[docs] 184 def get_bitstring(self, result) -> str: 185 return result['SamplingVQEResult'].best_measurement['bitstring']
186 187
[docs] 188class FALQON(QiskitOptimizationAlgorithm): 189 """ 190 Algorithm class with FALQON. 191 192 Args: 193 driver_h (Operator | None): The driver Hamiltonian for the problem. 194 delta_t (float): The time step for the evolution operators. 195 beta_0 (float): The initial value of beta. 196 n (int): The number of iterations to run the algorithm. 197 **alg_kwargs: Additional keyword arguments for the base class. 198 199 Attributes: 200 driver_h (Operator | None): The driver Hamiltonian for the problem. 201 delta_t (float): The time step for the evolution operators. 202 beta_0 (float): The initial value of beta. 203 n (int): The number of iterations to run the algorithm. 204 cost_h (Operator | None): The cost Hamiltonian for the problem. 205 n_qubits (int): The number of qubits in the problem. 206 parameters (list[str]): The list of algorithm parameters. 207 208 """ 209 _algorithm_format = 'hamiltonian' 210 211 def __init__( 212 self, 213 driver_h: SparsePauliOp | None = None, 214 delta_t: float = 0.03, 215 beta_0: float = 0.0, 216 max_reps: int = 20 217 ) -> None: 218 super().__init__() 219 self.driver_h = driver_h 220 self.cost_h = None 221 self.delta_t = delta_t 222 self.beta_0 = beta_0 223 self.max_reps = max_reps 224 self.n_qubits: int = 0 225 self.parameters = ['n', 'delta_t', 'beta_0'] 226 227 @property 228 def setup(self) -> dict: 229 return { 230 'driver_h': self.driver_h, 231 'delta_t': self.delta_t, 232 'beta_0': self.beta_0, 233 'n': self.max_reps, 234 'cost_h': self.cost_h, 235 'n_qubits': self.n_qubits, 236 'parameters': self.parameters, 237 'arg_kwargs': self.alg_kwargs 238 } 239 240 def _get_path(self) -> str: 241 return f'{self.name}@{self.max_reps}@{self.delta_t}@{self.beta_0}' 242
[docs] 243 def run(self, problem: Problem, backend: QiskitBackend, formatter: Callable) -> Result: 244 """ Runs the FALQON algorithm """ 245 246 if isinstance(backend.sampler, BaseSamplerV1) or isinstance(backend.estimator, BaseEstimatorV1): 247 raise ValueError("FALQON works only on V2 samplers and estimators, consider using a different backend.") 248 249 cost_h = formatter(problem) 250 251 if cost_h is None: 252 raise ValueError("Formatter returned None") 253 254 self.n_qubits = cost_h.num_qubits 255 256 best_sample, betas, energies, depths, cnot_counts = self._falqon_subroutine(cost_h, backend) 257 258 best_data: BitArray = best_sample[0].data.meas 259 counts: dict = best_data.get_counts() 260 shots = best_data.num_shots 261 262 result = {'betas': betas, 263 'energies': energies, 264 'depths': depths, 265 'cxs': cnot_counts, 266 'n': self.max_reps, 267 'delta_t': self.delta_t, 268 'beta_0': self.beta_0, 269 'energy': min(energies), 270 } 271 272 return Result( 273 best_bitstring=max(counts, key=counts.get), 274 most_common_bitstring=max(counts, key=counts.get), 275 distribution={k: v/shots for k, v in counts.items()}, 276 energies=energies, 277 energy_std=np.std(energies), 278 best_energy=min(energies), 279 num_of_samples=shots, 280 average_energy=np.mean(energies), 281 most_common_bitstring_energy=0, 282 result=result 283 )
284 285 def _add_ansatz_part( 286 self, 287 cost_hamiltonian: SparsePauliOp, 288 driver_hamiltonian: SparsePauliOp, 289 beta: float, 290 circuit: QuantumCircuit 291 ) -> None: 292 """Adds a single FALQON ansatz 'building block' with the specified beta to the circuit""" 293 circ_part = QuantumCircuit(circuit.num_qubits) 294 295 circ_part.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ_part.qubits) 296 circ_part.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ_part.qubits) 297 298 circuit.compose(circ_part, circ_part.qubits, inplace=True) 299 300 def _build_ansatz(self, cost_hamiltonian, driver_hamiltonian, betas): 301 """Build the FALQON circuit for the given betas""" 302 303 circ = QuantumCircuit(self.n_qubits) 304 circ.h(range(self.n_qubits)) 305 306 for beta in betas: 307 circ.append(PauliEvolutionGate(cost_hamiltonian, time=self.delta_t), circ.qubits) 308 circ.append(PauliEvolutionGate(driver_hamiltonian, time=self.delta_t * beta), circ.qubits) 309 return circ 310 311 def _falqon_subroutine( 312 self, 313 cost_hamiltonian: SparsePauliOp, 314 backend: QiskitBackend 315 ) -> tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]: 316 """ 317 Run the 'meat' of the algorithm. 318 319 Args: 320 cost_hamiltonian (SparsePauliOp): Cost hamiltonian from the formatter. 321 backend (QiskitBackend): Backend 322 323 Returns: 324 tuple[PrimitiveResult[SamplerPubResult], list[float], list[float], list[int], list[int]]: 325 Sampler result from best betas, list of betas, list of energies, list of depths, list of cnot counts 326 """ 327 328 if self.driver_h is None: 329 self.driver_h = SparsePauliOp.from_sparse_list([("X", [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits) 330 driver_hamiltonian = self.driver_h 331 else: 332 driver_hamiltonian = self.driver_h 333 334 hamiltonian_commutator = complex(0, 1) * commutator(driver_hamiltonian, cost_hamiltonian) 335 336 betas = [self.beta_0] 337 energies = [] 338 cnot_counts = [] 339 circuit_depths = [] 340 341 circuit = QuantumCircuit(self.n_qubits) 342 circuit.h(circuit.qubits) 343 344 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, self.beta_0, circuit) 345 346 for i in range(self.max_reps): 347 348 beta = -1 * backend.estimator.run([(circuit, hamiltonian_commutator)]).result()[0].data.evs 349 betas.append(beta) 350 351 self._add_ansatz_part(cost_hamiltonian, driver_hamiltonian, beta, circuit) 352 353 energy = backend.estimator.run([(circuit, cost_hamiltonian)]).result()[0].data.evs 354 # print(i, energy) 355 energies.append(energy) 356 circuit_depths.append(circuit.depth()) 357 cnot_counts.append(circuit.count_ops().get('cx', 0)) 358 359 argmin = np.argmin(np.asarray(energies)) 360 361 sampling_circuit = self._build_ansatz(cost_hamiltonian, driver_hamiltonian, betas[:argmin]) 362 sampling_circuit.measure_all() 363 364 best_sample = backend.sampler.run([(sampling_circuit)]).result() 365 366 return best_sample, betas, energies, circuit_depths, cnot_counts