Source code for qlauncher.routines.qiskit.adapters

  1import math
  2from collections.abc import Iterable, Sequence
  3from itertools import chain
  4from typing import Any
  5
  6import numpy as np
  7from qiskit import transpile
  8from qiskit.circuit import QuantumCircuit
  9from qiskit.primitives import BasePrimitiveJob, BitArray, DataBin, SamplerResult
 10from qiskit.primitives.base import BaseEstimatorV1, BaseEstimatorV2, BaseSamplerV1, BaseSamplerV2, EstimatorResult
 11from qiskit.primitives.containers import PubResult
 12from qiskit.primitives.containers.estimator_pub import EstimatorPub, EstimatorPubLike
 13from qiskit.primitives.containers.primitive_result import PrimitiveResult
 14from qiskit.primitives.containers.sampler_pub import SamplerPub, SamplerPubLike
 15from qiskit.primitives.containers.sampler_pub_result import SamplerPubResult
 16from qiskit.primitives.primitive_job import PrimitiveJob
 17from qiskit.quantum_info import SparsePauliOp
 18from qiskit.result import QuasiDistribution
 19
 20from qlauncher.routines.circuits import CIRCUIT_FORMATS
 21from qlauncher.routines.qiskit.backends.gate_circuit_backend import GateCircuitBackend
 22from qlauncher.routines.qiskit.utils import coerce_to_circuit_list
 23
 24
 25def _transpile_circuits(circuits, backend):
 26    # Transpile qaoa circuit to backend instruction set, if backend is provided
 27    # ? I pass a backend into SamplerV2 as *mode* but here sampler_v2.mode returns None, why?
 28    if backend is not None:
 29        circuits = [transpile(circuit) for circuit in circuits] if isinstance(circuits, Sequence) else transpile(circuits)
 30
 31    return circuits
 32
 33
class SamplerV2ToSamplerV1Adapter(BaseSamplerV1):
    """
    Adapts a v2 sampler to a v1 interface.
    """

    def __init__(self, sampler_v2: BaseSamplerV2, backend=None):
        """
        Args:
            sampler_v2 (BaseSamplerV2): V2 sampler to be adapted.
            backend (Backend | None): Backend to transpile circuits to.
        """
        self.sampler_v2 = sampler_v2
        self.backend = backend
        super().__init__()

    def _get_quasi_meta(self, res):
        """
        Convert a single v2 pub result into a ``(QuasiDistribution, metadata)`` pair.

        Args:
            res: One element of the result returned by the v2 sampler.

        Returns:
            tuple: The quasi-distribution built from the measured counts and a copy of the
            result metadata extended with ``'sampler_version': 2``.
        """
        # All registers of the pub are merged into one bit array before counting.
        data = BitArray.concatenate_bits(list(res.data.values()))
        num_shots = data.num_shots
        probs = {measurement: count / num_shots for measurement, count in data.get_int_counts().items()}
        quasi_dists = QuasiDistribution(probs, shots=num_shots)

        # Work on a copy so the metadata owned by the v2 result is left unmodified.
        metadata = dict(res.metadata or {})
        metadata['sampler_version'] = 2  # might be useful for debugging

        return quasi_dists, metadata

    def _run_v2(self, pubs, **run_options):
        """
        Run ``pubs`` on the wrapped v2 sampler and repackage the outcome as a v1 ``SamplerResult``.

        Args:
            pubs: Pubs forwarded to ``sampler_v2.run``.
            **run_options: Extra keyword arguments forwarded to ``sampler_v2.run``.

        Returns:
            SamplerResult: One quasi-distribution and one metadata dict per pub result.
        """
        result = self.sampler_v2.run(pubs=pubs, **run_options).result()

        quasi_dists, metas = [], []
        for result_single in result:
            quasi_dist, metadata = self._get_quasi_meta(result_single)
            quasi_dists.append(quasi_dist)
            metas.append(metadata)

        return SamplerResult(quasi_dists=quasi_dists, metadata=metas)

    def _run(self, circuits, parameter_values=None, **run_options) -> PrimitiveJob:
        """
        Build v2 pubs from v1-style arguments and submit them as a job.

        Args:
            circuits: Circuits to sample.
            parameter_values: Parameter values per circuit, or None when nothing has to be bound.
            **run_options: Options passed through to the wrapped v2 sampler.

        Returns:
            PrimitiveJob: The submitted job; its result is a ``SamplerResult``.
        """
        circuits = _transpile_circuits(circuits, self.backend)
        if parameter_values is None:
            # The signature allows None, which cannot be zipped; emit circuit-only pubs instead.
            v2_list = [(circuit,) for circuit in circuits]
        else:
            v2_list = list(zip(circuits, parameter_values))

        job = PrimitiveJob(self._run_v2, v2_list, **run_options)
        job._submit()
        return job
78 79
class SamplerV1ToSamplerV2Adapter(BaseSamplerV2):
    """
    Adapts a v1 sampler to a v2 interface.
    """

    def __init__(self, sampler_v1: BaseSamplerV1) -> None:
        """
        Args:
            sampler_v1 (BaseSamplerV1): V1 sampler to be adapted.
        """
        super().__init__()
        self.samplerv1 = sampler_v1

    def _run(self, pubs: Iterable[SamplerPubLike], shots: int = 1024):
        """
        Run ``pubs`` on the wrapped v1 sampler and rebuild v2-style results from its quasi-distributions.

        Args:
            pubs: Pubs to sample; parameters are bound into each circuit before it is sent to the v1 sampler.
            shots: Number of shots requested from the v1 sampler, also used to turn
                relative frequencies back into individual samples.

        Returns:
            PrimitiveResult: One ``SamplerPubResult`` per pub with the samples stored under ``meas``.
        """
        circuits, params = [], []
        for pub in pubs:
            coerced = SamplerPub.coerce(pub)
            circuits.append(coerced.parameter_values.bind_all(coerced.circuit).item())
            # Parameters are already bound, so the v1 sampler receives an empty parameter set.
            params.append([])

        out = self.samplerv1.run(circuits, params, shots=shots).result()

        results = []
        for circuit, dist in zip(circuits, out.quasi_dists):
            # Expand each outcome into as many samples as its relative frequency implies.
            values: list[int] = []
            for value, relative_frequency in dist.items():
                values += [value] * int(round(relative_frequency * shots, 0))

            # NOTE(review): the width is taken from the qubit count, not the classical bit count - confirm.
            required_bits = circuit.num_qubits
            required_bytes = math.ceil(required_bits / 8)
            # byteorder is spelled out because int.to_bytes only has a default for it since Python 3.11.
            # dtype and reshape keep the (samples, bytes) uint8 layout even when there are no samples.
            byte_array = np.array(
                [np.frombuffer(int(value).to_bytes(required_bytes, byteorder='big'), dtype=np.uint8) for value in values],
                dtype=np.uint8,
            ).reshape(len(values), required_bytes)

            bit_array = BitArray(byte_array, num_bits=required_bits)

            results.append(SamplerPubResult(data=DataBin(meas=bit_array), metadata={'shots': shots}))

        return PrimitiveResult(results, metadata={'version': 2})

    def run(self, pubs: Iterable[SamplerPubLike], *, shots: int | None = None) -> BasePrimitiveJob[PrimitiveResult[SamplerPubResult], Any]:
        """
        Submit ``pubs`` for sampling.

        Args:
            pubs: Pubs to sample.
            shots: Number of shots; 1024 is used when None.

        Returns:
            BasePrimitiveJob: The submitted job.
        """
        job = PrimitiveJob(self._run, pubs, shots if shots is not None else 1024)
        job._submit()
        return job
118 119
class EstimatorV1ToEstimatorV2Adapter(BaseEstimatorV2):
    """
    Adapts a v1 estimator to a v2 interface.
    """

    def __init__(self, estimator: BaseEstimatorV1) -> None:
        """
        Args:
            estimator (BaseEstimatorV1): V1 estimator to be adapted.
        """
        super().__init__()
        self.estimator = estimator

    def _construct_v2_result(self, estimator_result: EstimatorResult) -> PubResult:
        """
        Repackage a v1 ``EstimatorResult`` as a v2 ``PubResult``.

        Args:
            estimator_result: Result of one v1 estimator run.

        Returns:
            PubResult: Expectation values under ``evs`` and their standard errors under ``stds``.
            Metadata entries lacking ``'variance'`` or ``'shots'`` fall back to 0 and 1.
        """
        var = np.array([meta.get('variance', 0) for meta in estimator_result.metadata])
        shots = np.array([meta.get('shots', 1) for meta in estimator_result.metadata])

        values = estimator_result.values
        if len(values) == 1:
            # A single value is reported as a scalar rather than a length-1 array.
            values = values.squeeze()
            var = var.squeeze()
            shots = shots.squeeze()

        # The standard error follows from the variance as sqrt(variance / shots).
        stds = np.sqrt(var / shots)
        shape = values.shape if isinstance(values, np.ndarray) else ()
        data_bin = DataBin(evs=values, stds=stds, shape=shape)
        return PubResult(
            data_bin,
            metadata={
                'shots': shots,
            },
        )

    def _run(self, pubs: Iterable[EstimatorPub]) -> PrimitiveResult[PubResult]:
        """
        Evaluate every pub on the wrapped v1 estimator.

        Args:
            pubs: Coerced estimator pubs.

        Returns:
            PrimitiveResult: One ``PubResult`` per pub.
        """
        results = []
        for pub in pubs:
            observables = pub.observables
            parameter_values = pub.parameter_values

            # Object array holding, at every position, the index tuple of that parameter set,
            # so parameter sets can be broadcast against the observables by index.
            param_shape = parameter_values.shape
            param_indices = np.fromiter(np.ndindex(param_shape), dtype=object).reshape(param_shape)
            broadcast_param_indices, broadcast_observables = np.broadcast_arrays(param_indices, observables)

            # Flatten the broadcast grid into parallel lists, the form the v1 estimator takes.
            # NOTE(review): the v1 values come back in this flattened order and are not
            # reshaped to the broadcast shape afterwards - confirm this is intended.
            params_final, final_observables = [], []
            for index in np.ndindex(*broadcast_param_indices.shape):
                param_index = broadcast_param_indices[index]
                params_final.append(parameter_values[param_index].as_array())
                final_observables.append(broadcast_observables[index])

            res = self.estimator.run(
                [pub.circuit] * len(params_final),
                [SparsePauliOp.from_list(observable.items()) for observable in final_observables],
                params_final,
            ).result()
            results.append(res)

        return PrimitiveResult([self._construct_v2_result(result) for result in results], metadata={'version': 2})

    def run(self, pubs: Iterable[EstimatorPubLike], *, precision: float | None = None) -> BasePrimitiveJob[PrimitiveResult[PubResult], Any]:
        """
        Coerce ``pubs`` and submit them for estimation.

        Args:
            pubs: Pubs to estimate.
            precision: Precision handed to ``EstimatorPub.coerce`` for every pub.

        Returns:
            BasePrimitiveJob: The submitted job.
        """
        coerced_pubs = [EstimatorPub.coerce(pub, precision) for pub in pubs]
        job = PrimitiveJob(self._run, coerced_pubs)
        job._submit()
        return job
172 173
class TranslatingSamplerV1(BaseSamplerV1):
    """
    V1 sampler wrapper that binds parameters and converts circuits to ``compatible_circuit``
    before delegating to the wrapped sampler.
    """

    def __init__(self, sampler_v1: BaseSamplerV1, compatible_circuit: CIRCUIT_FORMATS, options: dict | None = None):
        """
        Args:
            sampler_v1 (BaseSamplerV1): Sampler that receives the translated circuits.
            compatible_circuit (CIRCUIT_FORMATS): Circuit format the wrapped sampler is given.
            options (dict | None): Options forwarded to the base sampler constructor.
        """
        super().__init__(options=options)
        self.sampler = sampler_v1
        self.compatible_circuit = compatible_circuit

    def _run(self, circuits: tuple[QuantumCircuit, ...], parameter_values: tuple[tuple[float, ...], ...], **run_options) -> PrimitiveJob:
        """
        Bind parameters, translate where needed, and run on the wrapped sampler.

        Args:
            circuits: Circuits to sample.
            parameter_values: One set of parameter values per circuit.
            **run_options: Options forwarded to the wrapped sampler.

        Returns:
            PrimitiveJob: Whatever the wrapped sampler's ``run`` returns.
        """
        # First pass: bind every circuit (lengths must match, bindings must be exact).
        bound = []
        for circuit, values in zip(circuits, parameter_values, strict=True):
            bound.append(circuit.assign_parameters(values, strict=True))

        # Second pass: convert only the circuits that are not already in the target format.
        translated = []
        for circuit in bound:
            if isinstance(circuit, self.compatible_circuit):
                translated.append(circuit)
            else:
                translated.append(GateCircuitBackend.get_translation(circuit, self.compatible_circuit))

        # NOTE(review): parameter_values are forwarded although the circuits are already bound - confirm
        # the wrapped sampler expects that.
        return self.sampler.run(translated, parameter_values=parameter_values, **run_options)
191 192
class TranslatingSampler(BaseSamplerV2):
    """
    V2 sampler wrapper that converts incoming circuits to ``compatible_circuit``
    before delegating to the wrapped sampler.
    """

    def __init__(self, sampler_v2: BaseSamplerV2, compatible_circuit: CIRCUIT_FORMATS):
        """
        Args:
            sampler_v2 (BaseSamplerV2): Sampler that receives the translated circuits.
            compatible_circuit (CIRCUIT_FORMATS): Circuit format the wrapped sampler is given.
        """
        super().__init__()
        self.sampler = sampler_v2
        self.compatible_circuit = compatible_circuit

    def run(
        self,
        pubs: Iterable[SamplerPubLike | CIRCUIT_FORMATS],
        *,
        shots: int | None = None,
    ) -> BasePrimitiveJob[PrimitiveResult[SamplerPubResult], Any]:
        """
        Expand ``pubs`` into circuits, translate where needed, and run on the wrapped sampler.

        Args:
            pubs: Pubs or bare circuits; each is expanded with ``coerce_to_circuit_list``.
            shots: Shot count passed to ``coerce_to_circuit_list`` and to the wrapped sampler.

        Returns:
            BasePrimitiveJob: Whatever the wrapped sampler's ``run`` returns.
        """
        # Expand every pub first, keeping the circuits in pub order.
        expanded = []
        for pub in pubs:
            expanded.append(coerce_to_circuit_list(pub, shots))

        # Convert only the circuits that are not already in the target format.
        translated = []
        for circuit_group in expanded:
            for circuit in circuit_group:
                if isinstance(circuit, self.compatible_circuit):
                    translated.append(circuit)
                else:
                    translated.append(GateCircuitBackend.get_translation(circuit, self.compatible_circuit))

        return self.sampler.run(translated, shots=shots)