1import math
2from collections.abc import Iterable, Sequence
3from itertools import chain
4from typing import Any
5
6import numpy as np
7from qiskit import transpile
8from qiskit.circuit import QuantumCircuit
9from qiskit.primitives import BasePrimitiveJob, BitArray, DataBin, SamplerResult
10from qiskit.primitives.base import BaseEstimatorV1, BaseEstimatorV2, BaseSamplerV1, BaseSamplerV2, EstimatorResult
11from qiskit.primitives.containers import PubResult
12from qiskit.primitives.containers.estimator_pub import EstimatorPub, EstimatorPubLike
13from qiskit.primitives.containers.primitive_result import PrimitiveResult
14from qiskit.primitives.containers.sampler_pub import SamplerPub, SamplerPubLike
15from qiskit.primitives.containers.sampler_pub_result import SamplerPubResult
16from qiskit.primitives.primitive_job import PrimitiveJob
17from qiskit.quantum_info import SparsePauliOp
18from qiskit.result import QuasiDistribution
19
20from qlauncher.routines.circuits import CIRCUIT_FORMATS
21from qlauncher.routines.qiskit.backends.gate_circuit_backend import GateCircuitBackend
22from qlauncher.routines.qiskit.utils import coerce_to_circuit_list
23
24
25def _transpile_circuits(circuits, backend):
26 # Transpile qaoa circuit to backend instruction set, if backend is provided
27 # ? I pass a backend into SamplerV2 as *mode* but here sampler_v2.mode returns None, why?
28 if backend is not None:
29 circuits = [transpile(circuit) for circuit in circuits] if isinstance(circuits, Sequence) else transpile(circuits)
30
31 return circuits
32
33
[docs]
class SamplerV2ToSamplerV1Adapter(BaseSamplerV1):
    """
    Adapts a v2 sampler to a v1 interface.
    """

    def __init__(self, sampler_v2: BaseSamplerV2, backend=None):
        """
        Args:
            sampler_v2 (BaseSamplerV2): V2 sampler to be adapted.
            backend (Backend | None): Backend to transpile circuits to.
        """
        self.sampler_v2 = sampler_v2
        self.backend = backend
        super().__init__()

    def _get_quasi_meta(self, res):
        """
        Convert a single v2 pub result into v1-style output.

        Args:
            res: One entry of the v2 sampler result; all arrays in ``res.data`` are
                concatenated bit-wise before counting.

        Returns:
            Tuple ``(quasi_dist, metadata)``: the relative frequency of every measured integer
            outcome, and a copy of the pub metadata with ``'sampler_version'`` added.
        """
        data = BitArray.concatenate_bits(list(res.data.values()))
        int_counts = data.get_int_counts()
        probs = {measurement: count / data.num_shots for measurement, count in int_counts.items()}
        quasi_dists = QuasiDistribution(probs, shots=data.num_shots)

        # Work on a copy so the v2 result object handed back by the wrapped sampler is not modified.
        metadata = dict(res.metadata)
        metadata['sampler_version'] = 2  # might be useful for debugging

        return quasi_dists, metadata

    def _run_v2(self, pubs, **run_options):
        """
        Run ``pubs`` on the wrapped v2 sampler and repackage the outcome as a ``SamplerResult``.

        Args:
            pubs: Pubs accepted by the wrapped v2 sampler.
            **run_options: Forwarded unchanged to ``sampler_v2.run``.

        Returns:
            SamplerResult holding one quasi-distribution and one metadata dict per pub result.
        """
        job = self.sampler_v2.run(pubs=pubs, **run_options)
        result = job.result()
        quasi_dists, metas = [], []
        for result_single in result:
            quasi_dist, metadata = self._get_quasi_meta(result_single)
            quasi_dists.append(quasi_dist)
            metas.append(metadata)

        return SamplerResult(quasi_dists=quasi_dists, metadata=metas)

    def _run(self, circuits, parameter_values=None, **run_options) -> PrimitiveJob:
        """
        Build v2 pubs from v1-style arguments and submit them as a job.

        Args:
            circuits: Circuits to sample; transpiled first when a backend was configured.
            parameter_values: One set of parameter values per circuit, or ``None`` when the
                circuits are to be run without parameter values.
            **run_options: Forwarded to the wrapped v2 sampler's ``run``.

        Returns:
            The submitted PrimitiveJob; its result is a ``SamplerResult``.
        """
        circuits = _transpile_circuits(circuits, self.backend)
        if parameter_values is None:
            # `zip(circuits, None)` would raise; submit circuit-only pubs instead.
            v2_list = [(circuit,) for circuit in circuits]
        else:
            v2_list = list(zip(circuits, parameter_values))

        job = PrimitiveJob(self._run_v2, v2_list, **run_options)
        job._submit()
        return job
78
79
[docs]
class SamplerV1ToSamplerV2Adapter(BaseSamplerV2):
    """
    Adapts a v1 sampler to a v2 interface.
    """

    def __init__(self, sampler_v1: BaseSamplerV1) -> None:
        """
        Args:
            sampler_v1 (BaseSamplerV1): V1 sampler to be adapted.
        """
        super().__init__()
        self.samplerv1 = sampler_v1

    def _run(self, pubs: Iterable[SamplerPubLike], shots: int = 1024):
        """
        Run ``pubs`` on the wrapped v1 sampler and rebuild per-shot data from its distributions.

        Args:
            pubs: Sampler pubs. Each one is coerced and its parameter values are bound into
                the circuit; ``.item()`` is used, so a pub must resolve to a single bound circuit.
            shots: Number of shots requested from the v1 sampler, also used to turn relative
                frequencies back into sample counts.

        Returns:
            PrimitiveResult with one SamplerPubResult per pub; samples are stored under ``meas``.
        """
        circuits, params = [], []
        for pub in pubs:
            coerced = SamplerPub.coerce(pub)
            circuits.append(coerced.parameter_values.bind_all(coerced.circuit).item())
            # Circuits are already bound, so the v1 sampler gets no parameter values.
            params.append([])

        out = self.samplerv1.run(circuits, params, shots=shots).result()

        results = []
        for circuit, dist in zip(circuits, out.quasi_dists):
            # Expand the distribution into individual samples; rounding means the total
            # number of samples may differ slightly from `shots`.
            values: list[int] = []
            for value, relative_frequency in dist.items():
                values += [value] * int(round(relative_frequency * shots, 0))

            # NOTE(review): the bit width is taken from the qubit count, not the classical bit count.
            required_bits = circuit.num_qubits
            required_bytes = math.ceil(required_bits / 8)
            # The byte order is spelled out because `int.to_bytes` only has a default for it from
            # Python 3.11 on. dtype/reshape keep the array well-formed when no samples were produced.
            byte_array = np.array(
                [np.frombuffer(value.to_bytes(required_bytes, 'big'), dtype=np.uint8) for value in values],
                dtype=np.uint8,
            ).reshape(len(values), required_bytes)

            bit_array = BitArray(byte_array, num_bits=required_bits)

            results.append(SamplerPubResult(data=DataBin(meas=bit_array), metadata={'shots': shots}))

        return PrimitiveResult(results, metadata={'version': 2})

    def run(self, pubs: Iterable[SamplerPubLike], *, shots: int | None = None) -> BasePrimitiveJob[PrimitiveResult[SamplerPubResult], Any]:
        """
        Submit ``pubs`` for sampling.

        Args:
            pubs: Sampler pubs to run.
            shots: Number of shots; 1024 is used when ``None``.

        Returns:
            The submitted job.
        """
        job = PrimitiveJob(self._run, pubs, shots if shots is not None else 1024)
        job._submit()
        return job
118
119
[docs]
class EstimatorV1ToEstimatorV2Adapter(BaseEstimatorV2):
    """
    Adapts a v1 estimator to a v2 interface.
    """

    def __init__(self, estimator: BaseEstimatorV1) -> None:
        """
        Args:
            estimator (BaseEstimatorV1): V1 estimator to be adapted.
        """
        super().__init__()
        self.estimator = estimator

    def _construct_v2_result(self, estimator_result: EstimatorResult, shape: tuple[int, ...] | None = None) -> PubResult:
        """
        Convert a v1 estimator result into a v2 pub result.

        Args:
            estimator_result: V1 result. ``'variance'`` and ``'shots'`` are read from each
                metadata entry, defaulting to 0 and 1 when absent.
            shape: Shape the flat v1 values are reshaped to. When ``None``, values are kept
                flat and a single value is squeezed to a scalar.

        Returns:
            PubResult with ``evs`` (expectation values) and ``stds`` (standard errors);
            the shots are stored in the metadata.
        """
        var = np.array([meta.get('variance', 0) for meta in estimator_result.metadata])
        shots = np.array([meta.get('shots', 1) for meta in estimator_result.metadata])

        values = estimator_result.values
        if shape is not None:
            # Entries were produced in row-major order over `shape`, so a plain reshape restores it.
            values = np.asarray(values).reshape(shape)
            var = var.reshape(shape)
            shots = shots.reshape(shape)
        elif len(values) == 1:
            values = values.squeeze()
            var = var.squeeze()
            shots = shots.squeeze()

        # Standard error of the mean is sqrt(variance / shots); negative variances are clamped to 0
        # so the square root cannot produce NaN.
        stds = np.sqrt(np.maximum(var, 0) / shots)
        data_bin = DataBin(evs=values, stds=stds, shape=values.shape if isinstance(values, np.ndarray) else tuple())
        return PubResult(
            data_bin,
            metadata={
                'shots': shots,
            },
        )

    def _run(self, pubs: Iterable[EstimatorPub]) -> PrimitiveResult[PubResult]:
        """
        Evaluate every pub on the wrapped v1 estimator.

        Parameter sets and observables of a pub are broadcast against each other, and each
        resulting pair is sent to the v1 estimator as a separate (circuit, observable, parameters) entry.

        Args:
            pubs: Coerced estimator pubs.

        Returns:
            PrimitiveResult with one PubResult per pub, shaped like the pub's broadcast shape.
        """
        results = []
        for pub in pubs:
            observables = pub.observables
            parameter_values = pub.parameter_values

            # An object array of index tuples lets numpy broadcast parameter sets against observables.
            param_shape = parameter_values.shape
            param_indices = np.fromiter(np.ndindex(param_shape), dtype=object).reshape(param_shape)
            broadcast_param_indices, broadcast_observables = np.broadcast_arrays(param_indices, observables)

            params_final, final_observables = [], []
            for index in np.ndindex(*broadcast_param_indices.shape):
                param_index = broadcast_param_indices[index]
                params_final.append(parameter_values[param_index].as_array())
                final_observables.append(broadcast_observables[index])

            res = self.estimator.run(
                [pub.circuit] * len(params_final),
                [SparsePauliOp.from_list(observable.items()) for observable in final_observables],
                params_final,
            ).result()
            # Keep the broadcast shape so the flat v1 values can be restored to it afterwards.
            results.append((res, broadcast_param_indices.shape))

        return PrimitiveResult([self._construct_v2_result(result, shape) for result, shape in results], metadata={'version': 2})

    def run(self, pubs: Iterable[EstimatorPubLike], *, precision: float | None = None) -> BasePrimitiveJob[PrimitiveResult[PubResult], Any]:
        """
        Submit ``pubs`` for estimation.

        Args:
            pubs: Estimator pubs to run.
            precision: Passed to ``EstimatorPub.coerce``; it is not forwarded to the v1 estimator.

        Returns:
            The submitted job.
        """
        coerced_pubs = [EstimatorPub.coerce(pub, precision) for pub in pubs]
        job = PrimitiveJob(self._run, coerced_pubs)
        job._submit()
        return job
172
173
[docs]
class TranslatingSamplerV1(BaseSamplerV1):
    """
    V1 sampler wrapper that translates circuits to the wrapped sampler's circuit format.
    """

    def __init__(self, sampler_v1: BaseSamplerV1, compatible_circuit: CIRCUIT_FORMATS, options: dict | None = None):
        """
        Args:
            sampler_v1 (BaseSamplerV1): Sampler that receives the translated circuits.
            compatible_circuit (CIRCUIT_FORMATS): Circuit format to translate circuits into.
            options (dict | None): Options passed to the base sampler constructor.
        """
        super().__init__(options=options)
        self.sampler = sampler_v1
        self.compatible_circuit = compatible_circuit

    def _run(self, circuits: tuple[QuantumCircuit, ...], parameter_values: tuple[tuple[float, ...], ...], **run_options) -> PrimitiveJob:
        """
        Bind parameters, translate the circuits and run them on the wrapped sampler.

        Args:
            circuits: Circuits to sample.
            parameter_values: One set of parameter values per circuit (lengths must match).
            **run_options: Forwarded to the wrapped sampler's ``run``.

        Returns:
            The job returned by the wrapped sampler.
        """
        bound_circuits = [
            circuit.assign_parameters(params, strict=True) for circuit, params in zip(circuits, parameter_values, strict=True)
        ]
        pubs = [
            GateCircuitBackend.get_translation(circuit, self.compatible_circuit)
            if not isinstance(circuit, self.compatible_circuit)
            else circuit
            for circuit in bound_circuits
        ]
        # The values were already bound above, so the circuits have no free parameters left.
        # Forwarding the original values again would not match the parameter-free circuits.
        unbound_values = tuple(() for _ in pubs)
        return self.sampler.run(pubs, parameter_values=unbound_values, **run_options)
191
192
[docs]
class TranslatingSampler(BaseSamplerV2):
    """
    V2 sampler wrapper that translates circuits to the wrapped sampler's circuit format.
    """

    def __init__(self, sampler_v2: BaseSamplerV2, compatible_circuit: CIRCUIT_FORMATS):
        """
        Args:
            sampler_v2 (BaseSamplerV2): Sampler that receives the translated circuits.
            compatible_circuit (CIRCUIT_FORMATS): Circuit format to translate circuits into.
        """
        super().__init__()
        self.sampler = sampler_v2
        self.compatible_circuit = compatible_circuit

    def run(
        self,
        pubs: Iterable[SamplerPubLike | CIRCUIT_FORMATS],
        *,
        shots: int | None = None,
    ) -> BasePrimitiveJob[PrimitiveResult[SamplerPubResult], Any]:
        """
        Translate the circuits obtained from ``pubs`` and run them on the wrapped sampler.

        Args:
            pubs: Pubs or bare circuits; each is expanded with ``coerce_to_circuit_list``.
            shots: Passed to ``coerce_to_circuit_list`` and to the wrapped sampler's ``run``.

        Returns:
            The job returned by the wrapped sampler.
        """
        # Expand every pub first, then translate, keeping the original ordering.
        flattened = [circuit for pub in pubs for circuit in coerce_to_circuit_list(pub, shots)]

        translated = []
        for circuit in flattened:
            if isinstance(circuit, self.compatible_circuit):
                # Already in the target format, pass through untouched.
                translated.append(circuit)
            else:
                translated.append(GateCircuitBackend.get_translation(circuit, self.compatible_circuit))

        return self.sampler.run(translated, shots=shots)
212 return self.sampler.run(pubs, shots=shots)