1""" Algorithms for Qiskit routines """
2import math
3import os
4from collections.abc import Callable
5
6import weakref
7
8import numpy as np
9import scipy
10
11from qlauncher.base import Problem, Algorithm, Result
12from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
13from qlauncher.workflow.pilotjob_scheduler import JobManager
14from qlauncher.routines.qiskit.algorithms.qiskit_native import QAOA
15
16
[docs]
17class EducatedGuess(Algorithm):
18 _algorithm_format = 'hamiltonian'
19
20 def __init__(self, starting_p: int = 3, max_p: int = 8, max_job_batch_size: int | None = None, verbose: bool = False):
21 """
22 Algorithm utilizing all available cores to run multiple QAOA's in parallel to find optimal parameters.
23
24 Args:
25 starting_p (int, optional): Initial value of QAOA's p parameter. Defaults to 3.
26 max_p (int, optional): Maximum value for QAOA's p parameter. Defaults to 8.
27 max_job_batch_size: Maximum number of jobs to run for a given p value. If None, run as many as possible. Defaults to None.
28 verbose (bool, optional): Verbose. Defaults to False.
29 """
30 self.output_initial = 'initial/'
31 self.output_interpolated = 'interpolated/'
32 self.output = 'output/'
33 self.p_init = starting_p
34 self.p_max = max_p
35 self.verbose = verbose
36 self.failed_jobs = 0
37 self.min_energy = math.inf
38 self.manager = JobManager()
39 self.best_job_id = ''
40 self.max_jobs = max_job_batch_size
41
42 weakref.finalize(self, self.manager.stop) # Kill the running jobs in case of a crash or otherwise
43
[docs]
44 def run(self, problem: Problem, backend: QiskitBackend, formatter: Callable) -> Result:
45 self.manager.submit_many(problem, QAOA(p=self.p_init), backend, output_path=self.output_initial, n_jobs=self.max_jobs)
46 print(f'{len(self.manager.jobs)} jobs submitted to qcg')
47
48 found_optimal_params = False
49
50 while not found_optimal_params:
51 jobid, state = self.manager.wait_for_a_job()
52
53 if state != 'SUCCEED':
54 self.failed_jobs += 1
55 continue
56 has_potential, energy = self._process_job(jobid, self.p_init, self.min_energy, compare_factor=0.99)
57 if has_potential:
58 found_optimal_params = self._search_for_job_with_optimal_params(jobid, energy, problem, backend)
59
60 self.manager.submit_many(problem, QAOA(p=self.p_init), backend, output_path=self.output_initial, n_jobs=self.max_jobs)
61
62 result = self.manager.read_results(self.best_job_id)
63 self.manager.stop()
64 return result
65
66 def _search_for_job_with_optimal_params(self, previous_job_id, previous_energy, problem, backend) -> bool:
67 new_job_id = None
68 for p in range(self.p_init + 1, self.p_max + 1):
69 previous_job_results = self.manager.read_results(previous_job_id).result
70 initial_point = self._interpolate_f(list(previous_job_results['SamplingVQEResult'].optimal_point), p-1)
71
72 new_job_id = self.manager.submit(problem, QAOA(p=p, initial_point=initial_point),
73 backend, output_path=self.output_interpolated)
74 _, state = self.manager.wait_for_a_job(new_job_id)
75 if state != 'SUCCEED':
76 self.failed_jobs += 1
77 return False
78 has_potential, new_energy = self._process_job(new_job_id, p, previous_energy)
79 if has_potential:
80 previous_energy = new_energy
81 previous_job_id = new_job_id
82 else:
83 return False
84 self.best_job_id = new_job_id if new_job_id is not None else previous_job_id
85 return True
86
87 def _process_job(self, jobid: str, p: int, energy_to_compare: float, compare_factor: float = 1.0) -> tuple[
88 float, bool]:
89 result = self.manager.read_results(jobid).result
90 optimal_point = result['SamplingVQEResult'].optimal_point
91 has_potential = False
92 linear = self._check_linearity(optimal_point, p)
93 energy = result['energy']
94 if self.verbose:
95 print(f'job {jobid}, p={p}, energy: {energy}')
96
97 if p == self.p_init and energy < energy_to_compare:
98 print(f'new min energy: {energy}')
99 self.min_energy = energy
100 if linear and energy * compare_factor < energy_to_compare:
101 has_potential = True
102 return has_potential, energy
103
104 def _create_directories_if_not_existing(self):
105 if not os.path.exists(self.output_initial):
106 os.makedirs(self.output_initial)
107 if not os.path.exists(self.output_interpolated):
108 os.makedirs(self.output_interpolated)
109 if not os.path.exists(self.output):
110 os.makedirs(self.output)
111
112 def _interp(self, params: np.ndarray) -> np.ndarray:
113 arr1 = np.append([0], params)
114 arr2 = np.append(params, [0])
115 weights = np.arange(len(arr1)) / len(params)
116 res = arr1 * weights + arr2 * weights[::-1]
117 return res
118
119 def _interpolate_f(self, params: np.ndarray, p: int) -> np.ndarray:
120 betas = params[:p]
121 gammas = params[p:]
122 new_betas = self._interp(betas)
123 new_gammas = self._interp(gammas)
124 return np.hstack([new_betas, new_gammas])
125
126 def _check_linearity(self, optimal_params: np.ndarray, p: int) -> bool:
127 linear = False
128 correlations = (scipy.stats.pearsonr(np.arange(1, p + 1), optimal_params[:p])[0],
129 scipy.stats.pearsonr(np.arange(1, p + 1), optimal_params[p:])[0])
130
131 if abs(correlations[0]) > 0.85 and abs(correlations[1]) > 0.85:
132 linear = True
133 return linear