1""" Algorithms for Qiskit routines """
2import math
3import os
4
5import numpy as np
6import scipy
7
8from quantum_launcher.base import Problem, Algorithm, Result
9from quantum_launcher.routines.qiskit_routines.backends.ibm_backend import IBMBackend
10from quantum_launcher.workflow.pilotjob_scheduler import JobManager
11from quantum_launcher.routines.qiskit_routines.algorithms.qiskit_native import QAOA
12
13
[docs]
class EducatedGuess(Algorithm):
    """Search for QAOA parameters by growing ``p`` from promising low-``p`` runs.

    Batches of QAOA jobs with ``p = starting_p`` are submitted through a
    ``JobManager``. When a finished job passes the checks in ``_process_job``,
    its optimal point is interpolated into an initial point for ``p + 1`` and a
    follow-up job is submitted, repeating until ``max_p`` is reached or a
    follow-up job stops improving.
    """
    _algorithm_format = 'hamiltonian'

    # Minimum |Pearson r| between layer index and parameter value for a
    # parameter schedule to be treated as linear.
    _LINEARITY_THRESHOLD = 0.85

    def __init__(self, starting_p: int = 3, max_p: int = 8, verbose: bool = False):
        """
        Algorithm utilizing all available cores to run multiple QAOA's in parallel to find optimal parameters.

        Args:
            starting_p (int, optional): Initial value of QAOA's p parameter. Defaults to 3.
            max_p (int, optional): Maximum value for QAOA's p parameter. Defaults to 8.
            verbose (bool, optional): Print the energy of every processed job. Defaults to False.
        """
        # Output paths passed to the job manager / used for directory creation.
        self.output_initial = 'initial/'
        self.output_interpolated = 'interpolated/'
        self.output = 'output/'
        self.p_init = starting_p
        self.p_max = max_p
        self.verbose = verbose
        self.failed_jobs = 0  # jobs that finished in a state other than 'SUCCEED'
        self.min_energy = math.inf  # lowest energy seen so far among p == p_init jobs
        self.manager = JobManager()
        self.best_job_id = ''  # id of the job whose result run() returns

    def run(self, problem: Problem, backend: IBMBackend, formatter) -> Result:
        """Run the search and return the result of the best job found.

        Args:
            problem: Problem forwarded to every submitted QAOA job.
            backend: Backend forwarded to every submitted QAOA job.
            formatter: Accepted for interface compatibility; not used here.

        Returns:
            The result read from the job manager for ``self.best_job_id``.
        """
        self.manager.submit_many(problem, QAOA(p=self.p_init), backend, output_path=self.output_initial)
        print(f'{len(self.manager.jobs)} jobs submitted to qcg')

        found_optimal_params = False

        while not found_optimal_params:
            jobid, state = self.manager.wait_for_a_job()

            if state != 'SUCCEED':
                # NOTE(review): a failed job skips the resubmission below, so
                # the pool is not topped up on this path - confirm intended.
                self.failed_jobs += 1
                continue
            has_potential, energy = self._process_job(jobid, self.p_init, self.min_energy, compare_factor=0.99)
            if has_potential:
                found_optimal_params = self._search_for_job_with_optimal_params(jobid, energy, problem, backend)

            # Submit another batch of p_init jobs. NOTE(review): this also runs
            # on the final iteration, right before the manager is stopped.
            self.manager.submit_many(problem, QAOA(p=self.p_init), backend, output_path=self.output_initial)

        result = self.manager.read_results(self.best_job_id)
        self.manager.stop()
        return result

    def _search_for_job_with_optimal_params(self, previous_job_id, previous_energy, problem, backend) -> bool:
        """Extend a promising job layer by layer up to ``p_max``.

        For each ``p`` in ``p_init + 1 .. p_max`` the previous job's optimal
        point is interpolated into an initial point, a new job is submitted and
        awaited, and its result is checked against the previous energy.

        Args:
            previous_job_id: Id of the job to start from.
            previous_energy: Energy of that job.
            problem: Problem forwarded to the submitted jobs.
            backend: Backend forwarded to the submitted jobs.

        Returns:
            True if every step up to ``p_max`` succeeded and kept its potential
            (``self.best_job_id`` is then set); False as soon as a job fails or
            loses potential.
        """
        for p in range(self.p_init + 1, self.p_max + 1):
            previous_job_results = self.manager.read_results(previous_job_id).result
            # The previous job ran with p - 1 layers; stretch its point to p layers.
            initial_point = self._interpolate_f(
                list(previous_job_results['SamplingVQEResult'].optimal_point), p - 1)

            new_job_id = self.manager.submit(problem, QAOA(p=p, initial_point=initial_point),
                                             backend, output_path=self.output_interpolated)
            _, state = self.manager.wait_for_a_job(new_job_id)
            if state != 'SUCCEED':
                self.failed_jobs += 1
                return False
            has_potential, new_energy = self._process_job(new_job_id, p, previous_energy)
            if not has_potential:
                return False
            previous_energy = new_energy
            previous_job_id = new_job_id

        # previous_job_id is the last accepted job. Unlike the loop-local
        # new_job_id it is also defined when the range above is empty
        # (p_init >= p_max), in which case the starting job itself is the best.
        self.best_job_id = previous_job_id
        return True

    def _process_job(self, jobid: str, p: int, energy_to_compare: float,
                     compare_factor: float = 1.0) -> tuple[bool, float]:
        """Read a finished job and decide whether it is worth extending.

        Args:
            jobid: Id of the finished job.
            p: The QAOA ``p`` the job was run with.
            energy_to_compare: Reference energy the job has to beat.
            compare_factor: Factor applied to the job's energy before the
                comparison. Defaults to 1.0.

        Returns:
            ``(has_potential, energy)``. ``has_potential`` is True when the
            optimal parameters pass ``_check_linearity`` and
            ``energy * compare_factor < energy_to_compare``.
        """
        result = self.manager.read_results(jobid).result
        optimal_point = result['SamplingVQEResult'].optimal_point
        linear = self._check_linearity(optimal_point, p)
        energy = result['energy']
        if self.verbose:
            print(f'job {jobid}, p={p}, energy: {energy}')

        # Only first-stage (p == p_init) jobs update the running minimum.
        if p == self.p_init and energy < energy_to_compare:
            print(f'new min energy: {energy}')
            self.min_energy = energy

        has_potential = False
        if linear and energy * compare_factor < energy_to_compare:
            has_potential = True
        return has_potential, energy

    def _create_directories_if_not_existing(self):
        """Create the three output directories if they do not exist yet."""
        for path in (self.output_initial, self.output_interpolated, self.output):
            # exist_ok avoids the check-then-create race of os.path.exists().
            os.makedirs(path, exist_ok=True)

    def _interp(self, params: np.ndarray) -> np.ndarray:
        """Stretch ``n`` parameters to ``n + 1`` by weighted neighbour averaging.

        With ``n = len(params)``, element ``i`` of the result (``0 <= i <= n``)
        is ``(i / n) * params[i - 1] + ((n - i) / n) * params[i]``, where
        out-of-range entries of ``params`` count as 0.

        Args:
            params: Sequence of ``n`` parameter values.

        Returns:
            Array of ``n + 1`` interpolated values.
        """
        shifted_right = np.append([0], params)  # params[i - 1], zero-padded in front
        shifted_left = np.append(params, [0])   # params[i], zero-padded at the end
        weights = np.arange(len(shifted_right)) / len(params)
        return shifted_right * weights + shifted_left * weights[::-1]

    def _interpolate_f(self, params: np.ndarray, p: int) -> np.ndarray:
        """Interpolate a ``2 * p`` parameter vector into one for ``p + 1`` layers.

        The first ``p`` entries and the remaining entries are interpolated
        independently with ``_interp`` and concatenated again.

        Args:
            params: Parameter vector; the first ``p`` entries are treated as
                one group and the rest as the other.
            p: Number of entries in the first group.

        Returns:
            Concatenation of both interpolated groups.
        """
        betas = params[:p]
        gammas = params[p:]
        return np.hstack([self._interp(betas), self._interp(gammas)])

    def _check_linearity(self, optimal_params: np.ndarray, p: int) -> bool:
        """Tell whether both parameter groups vary roughly linearly with layer index.

        Args:
            optimal_params: Parameter vector; entries ``[:p]`` and ``[p:]`` are
                each correlated against the indices ``1 .. p``.
            p: Number of entries per group.

        Returns:
            True if the absolute Pearson correlation of both groups exceeds
            the linearity threshold (0.85).
        """
        # Import the submodule explicitly: a bare `import scipy` is not
        # guaranteed to expose `scipy.stats` on older SciPy releases.
        from scipy import stats

        layers = np.arange(1, p + 1)
        first_r = stats.pearsonr(layers, optimal_params[:p])[0]
        second_r = stats.pearsonr(layers, optimal_params[p:])[0]
        return bool(abs(first_r) > self._LINEARITY_THRESHOLD
                    and abs(second_r) > self._LINEARITY_THRESHOLD)