Source code for quantum_launcher.routines.qiskit_routines.algorithms.qiskit_native

  1""" Algorithms for Qiskit routines """
  2import json
  3from datetime import datetime
  4from typing import Callable
  5
  6import numpy as np
  7
  8from qiskit import qpy, QuantumCircuit
  9from qiskit.circuit import ParameterVector
 10from qiskit.circuit.library import PauliEvolutionGate
 11# from qiskit.opflow import H
 12from qiskit.primitives.base.base_primitive import BasePrimitive
 13from qiskit.quantum_info import SparsePauliOp
 14from qiskit_algorithms.minimum_eigensolvers import QAOA as QiskitQAOA
 15from qiskit_algorithms.minimum_eigensolvers import SamplingVQEResult
 16
 17from quantum_launcher.base import Problem, Algorithm, Result
 18from quantum_launcher.routines.qiskit_routines.backends.ibm_backend import IBMBackend
 19
 20
[docs] 21class QiskitOptimizationAlgorithm(Algorithm): 22 """ Abstract class for Qiskit optimization algorithms """ 23
[docs] 24 def make_tag(self, problem: Problem, backend: IBMBackend) -> str: 25 tag = problem.__class__.__name__ + '-' + \ 26 backend.__class__.__name__ + '-' + \ 27 self.__class__.__name__ + '-' + \ 28 datetime.today().strftime('%Y-%m-%d') 29 return tag
30
[docs] 31 def get_processing_times(self, tag: str, primitive: BasePrimitive) -> None | tuple[list, list, int]: 32 timestamps = [] 33 usages = [] 34 qpu_time = 0 35 if hasattr(primitive, 'session'): 36 jobs = primitive.session.service.jobs(limit=None, job_tags=[tag]) 37 for job in jobs: 38 m = job.metrics() 39 timestamps.append(m['timestamps']) 40 usages.append(m['usage']) 41 qpu_time += m['usage']['quantum_seconds'] 42 return timestamps, usages, qpu_time
43 44
[docs] 45def commutator(op_a: SparsePauliOp, op_b: SparsePauliOp) -> SparsePauliOp: 46 """ Commutator """ 47 return op_a @ op_b - op_b @ op_a
48 49
[docs] 50class QAOA(QiskitOptimizationAlgorithm): 51 """Algorithm class with QAOA. 52 53 Args: 54 p (int): The number of QAOA steps. Defaults to 1. 55 alternating_ansatz (bool): Whether to use an alternating ansatz. Defaults to False. If True, it's recommended to provide a mixer_h to alg_kwargs. 56 aux: Auxiliary input for the QAOA algorithm. 57 **alg_kwargs: Additional keyword arguments for the base class. 58 59 Attributes: 60 name (str): The name of the algorithm. 61 aux: Auxiliary input for the QAOA algorithm. 62 p (int): The number of QAOA steps. 63 alternating_ansatz (bool): Whether to use an alternating ansatz. 64 parameters (list): List of parameters for the algorithm. 65 mixer_h (SparsePauliOp | None): The mixer Hamiltonian. 66 67 """ 68 _algorithm_format = 'hamiltonian' 69 70 def __init__(self, p: int = 1, alternating_ansatz: bool = False, aux=None, **alg_kwargs): 71 super().__init__(**alg_kwargs) 72 self.name: str = 'qaoa' 73 self.aux = aux 74 self.p: int = p 75 self.alternating_ansatz: bool = alternating_ansatz 76 self.parameters = ['p'] 77 self.mixer_h: SparsePauliOp | None = None 78 self.initial_state: QuantumCircuit | None = None 79 80 @property 81 def setup(self) -> dict: 82 return { 83 'aux': self.aux, 84 'p': self.p, 85 'parameters': self.parameters, 86 'arg_kwargs': self.alg_kwargs 87 } 88
[docs] 89 def parse_samplingVQEResult(self, res: SamplingVQEResult, res_path) -> dict: 90 res_dict = {} 91 for k, v in vars(res).items(): 92 if k[0] == "_": 93 key = k[1:] 94 else: 95 key = k 96 try: 97 res_dict = {**res_dict, **json.loads(json.dumps({key: v}))} 98 except TypeError as ex: 99 if str(ex) == 'Object of type complex128 is not JSON serializable': 100 res_dict = {**res_dict, ** 101 json.loads(json.dumps({key: v}, default=repr))} 102 elif str(ex) == 'Object of type ndarray is not JSON serializable': 103 res_dict = {**res_dict, ** 104 json.loads(json.dumps({key: v}, default=repr))} 105 elif str(ex) == 'keys must be str, int, float, bool or None, not ParameterVectorElement': 106 res_dict = {**res_dict, ** 107 json.loads(json.dumps({key: repr(v)}))} 108 elif str(ex) == 'Object of type OptimizerResult is not JSON serializable': 109 # recursion ftw 110 new_v = self.parse_samplingVQEResult(v, res_path) 111 res_dict = {**res_dict, ** 112 json.loads(json.dumps({key: new_v}))} 113 elif str(ex) == 'Object of type QuantumCircuit is not JSON serializable': 114 path = res_path + '.qpy' 115 with open(path, 'wb') as f: 116 qpy.dump(v, f) 117 res_dict = {**res_dict, **{key: path}} 118 return res_dict
119
[docs] 120 def run(self, problem: Problem, backend: IBMBackend, formatter=Callable) -> Result: 121 """ Runs the QAOA algorithm """ 122 hamiltonian: SparsePauliOp = formatter(problem) 123 energies = [] 124 125 def qaoa_callback(evaluation_count, params, mean, std): 126 energies.append(mean) 127 128 tag = self.make_tag(problem, backend) 129 sampler = backend.samplerV1 130 # sampler.set_options(job_tags=[tag]) 131 optimizer = backend.optimizer 132 133 if self.alternating_ansatz: 134 if self.mixer_h is None: 135 self.mixer_h = formatter.get_mixer_hamiltonian(problem) 136 if self.initial_state is None: 137 self.initial_state = formatter.get_QAOAAnsatz_initial_state( 138 problem) 139 140 qaoa = QiskitQAOA(sampler, optimizer, reps=self.p, callback=qaoa_callback, 141 mixer=self.mixer_h, initial_state=self.initial_state, **self.alg_kwargs) 142 qaoa_result = qaoa.compute_minimum_eigenvalue(hamiltonian, self.aux) 143 depth = qaoa.ansatz.decompose(reps=10).depth() 144 if 'cx' in qaoa.ansatz.decompose(reps=10).count_ops(): 145 cx_count = qaoa.ansatz.decompose(reps=10).count_ops()['cx'] 146 else: 147 cx_count = 0 148 timestamps, usages, qpu_time = self.get_processing_times(tag, sampler) 149 return self.construct_result({'energy': qaoa_result.eigenvalue, 150 'depth': depth, 151 'cx_count': cx_count, 152 'qpu_time': qpu_time, 153 'energies': energies, 154 'SamplingVQEResult': qaoa_result, 155 'usages': usages, 156 'timestamps': timestamps})
157
[docs] 158 def construct_result(self, result: dict) -> Result: 159 160 best_bitstring = self.get_bitstring(result) 161 best_energy = result['energy'] 162 163 distribution = dict(result['SamplingVQEResult'].eigenstate.items()) 164 most_common_value = max( 165 distribution, key=distribution.get) 166 most_common_bitstring = bin(most_common_value)[2:].zfill( 167 len(best_bitstring)) 168 most_common_bitstring_energy = distribution[most_common_value] 169 num_of_samples = 0 # TODO: implement 170 average_energy = np.mean(result['energies']) 171 energy_std = np.std(result['energies']) 172 return Result(best_bitstring, best_energy, most_common_bitstring, most_common_bitstring_energy, distribution, result['energies'], num_of_samples, average_energy, energy_std, result)
173
[docs] 174 def get_bitstring(self, result) -> str: 175 return result['SamplingVQEResult'].best_measurement['bitstring']
176 177
[docs] 178class FALQON(QiskitOptimizationAlgorithm): 179 """ 180 Algorithm class with FALQON. 181 182 Args: 183 driver_h (Optional[Operator]): The driver Hamiltonian for the problem. 184 delta_t (float): The time step for the evolution operators. 185 beta_0 (float): The initial value of beta. 186 n (int): The number of iterations to run the algorithm. 187 **alg_kwargs: Additional keyword arguments for the base class. 188 189 Attributes: 190 driver_h (Optional[Operator]): The driver Hamiltonian for the problem. 191 delta_t (float): The time step for the evolution operators. 192 beta_0 (float): The initial value of beta. 193 n (int): The number of iterations to run the algorithm. 194 cost_h (Optional[Operator]): The cost Hamiltonian for the problem. 195 n_qubits (int): The number of qubits in the problem. 196 parameters (List[str]): The list of algorithm parameters. 197 198 """ 199 200 def __init__(self, driver_h=None, delta_t=0, beta_0=0, n=1): 201 super().__init__() 202 self.driver_h = driver_h 203 self.delta_t = delta_t 204 self.beta_0 = beta_0 205 self.n = n 206 self.cost_h = None 207 self.n_qubits: int = 0 208 self.parameters = ['n', 'delta_t', 'beta_0'] 209 raise NotImplementedError('FALQON is not implemented yet') 210 211 @property 212 def setup(self) -> dict: 213 return { 214 'driver_h': self.driver_h, 215 'delta_t': self.delta_t, 216 'beta_0': self.beta_0, 217 'n': self.n, 218 'cost_h': self.cost_h, 219 'n_qubits': self.n_qubits, 220 'parameters': self.parameters, 221 'arg_kwargs': self.alg_kwargs 222 } 223 224 def _get_path(self) -> str: 225 return f'{self.name}@{self.n}@{self.delta_t}@{self.beta_0}' 226
[docs] 227 def run(self, problem: Problem, backend: IBMBackend): 228 """ Runs the FALQON algorithm """ 229 # TODO implement aux operator 230 hamiltonian = problem.get_qiskit_hamiltonian() 231 self.cost_h = hamiltonian 232 self.n_qubits = hamiltonian.num_qubits 233 if self.driver_h is None: 234 self.driver_h = SparsePauliOp.from_sparse_list( 235 [("X", [i], 1) for i in range(self.n_qubits)], num_qubits=self.n_qubits) 236 237 betas = [self.beta_0] 238 energies = [] 239 circuit_depths = [] 240 cxs = [] 241 242 tag = self.make_tag(problem, backend) 243 estimator = backend.estimator 244 sampler = backend.sampler 245 sampler.set_options(job_tags=[tag]) 246 estimator.set_options(job_tags=[tag]) 247 248 best_sample, last_sample = self._falqon_subroutine(estimator, 249 sampler, energies, betas, circuit_depths, cxs) 250 251 timestamps, usages, qpu_time = self.get_processing_times(tag, sampler) 252 result = {'betas': betas, 253 'energies': energies, 254 'depths': circuit_depths, 255 'cxs': cxs, 256 'n': self.n, 257 'delta_t': self.delta_t, 258 'beta_0': self.beta_0, 259 'energy': min(energies), 260 'qpu_time': qpu_time, 261 'best_sample': best_sample, 262 'last_sample': last_sample, 263 'usages': usages, 264 'timestamps': timestamps} 265 266 return result
267 268 def _build_ansatz(self, betas): 269 """ building ansatz circuit """ 270 H = None # TODO: implement H 271 circ = (H ^ self.cost_h.num_qubits).to_circuit() 272 params = ParameterVector("beta", length=len(betas)) 273 for param in params: 274 circ.append(PauliEvolutionGate( 275 self.cost_h, time=self.delta_t), circ.qubits) 276 circ.append(PauliEvolutionGate(self.driver_h, 277 time=self.delta_t * param), circ.qubits) 278 return circ 279 280 def _falqon_subroutine(self, estimator, 281 sampler, energies, betas, circuit_depths, cxs): 282 """ subroutine for falqon """ 283 for i in range(self.n): 284 betas, energy, depth, cx_count = self._run_falqon(betas, estimator) 285 print(i, energy) 286 energies.append(energy) 287 circuit_depths.append(depth) 288 cxs.append(cx_count) 289 argmin = np.argmin(np.asarray(energies)) 290 best_sample = self._sample_at(betas[:argmin], sampler) 291 last_sample = self._sample_at(betas, sampler) 292 return best_sample, last_sample 293 294 def _run_falqon(self, betas, estimator): 295 """ Method to run FALQON algorithm """ 296 ansatz = self._build_ansatz(betas) 297 comm_h = complex(0, 1) * commutator(self.driver_h, self.cost_h) 298 beta = -1 * estimator.run(ansatz, comm_h, betas).result().values[0] 299 betas.append(beta) 300 301 ansatz = self._build_ansatz(betas) 302 energy = estimator.run(ansatz, self.cost_h, betas).result().values[0] 303 304 depth = ansatz.decompose(reps=10).depth() 305 if 'cx' in ansatz.decompose(reps=10).count_ops(): 306 cx_count = ansatz.decompose(reps=10).count_ops()['cx'] 307 else: 308 cx_count = 0 309 310 return betas, energy, depth, cx_count 311 312 def _sample_at(self, betas, sampler): 313 """ Not sure yet """ 314 ansatz = self._build_ansatz(betas) 315 ansatz.measure_all() 316 res = sampler.run(ansatz, betas).result() 317 return res