1"""Algorithms for Qiskit routines"""
2
3import math
4import os
5import weakref
6
7import numpy as np
8import scipy
9
10from qlauncher.base import Algorithm, Result
11from qlauncher.base.models import Hamiltonian
12from qlauncher.routines.qiskit.algorithms.qiskit_native import QAOA
13from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
14from qlauncher.workflow.pilotjob_scheduler import PilotJobManager
15
16
[docs]
class EducatedGuess(Algorithm[Hamiltonian, QiskitBackend]):
    """Find good QAOA parameters by running many QAOA jobs through a pilot-job manager.

    A batch of QAOA runs with ``p = starting_p`` is submitted first. Every finished
    run whose optimal parameters look linear in the layer index and whose energy
    passes the comparison against the best energy seen so far is then refined:
    its parameters are interpolated to seed a run at ``p + 1``, and so on up to
    ``max_p``.
    """

    # Minimum absolute Pearson correlation for a parameter schedule to count as linear.
    _LINEARITY_THRESHOLD = 0.85

    def __init__(self, starting_p: int = 3, max_p: int = 8, max_job_batch_size: int | None = None, verbose: bool = False):
        """
        Algorithm utilizing all available cores to run multiple QAOA's in parallel to find optimal parameters.

        Args:
            starting_p (int, optional): Initial value of QAOA's p parameter. Defaults to 3.
            max_p (int, optional): Maximum value for QAOA's p parameter. Defaults to 8.
            max_job_batch_size: Maximum number of jobs to run for a given p value. If None, run as many as possible. Defaults to None.
            verbose (bool, optional): Verbose. Defaults to False.
        """
        self.output_initial = 'initial/'
        self.output_interpolated = 'interpolated/'
        self.output = 'output/'
        self.p_init = starting_p
        self.p_max = max_p
        self.verbose = verbose
        self.failed_jobs = 0
        self.min_energy = math.inf
        self.manager = PilotJobManager(self.output)
        self.best_job_id = ''
        self.max_jobs = max_job_batch_size

        # Kill the running jobs in case of a crash or otherwise. The callback is bound
        # to the manager, not to ``self``, so it does not keep this object alive.
        weakref.finalize(self, self.manager.stop)

    def run(self, problem: Hamiltonian, backend: QiskitBackend) -> Result:
        """Run the search and return the result of the best job found.

        Args:
            problem: Hamiltonian handed to every QAOA run.
            backend: Backend handed to every QAOA run.

        Returns:
            The result read from the manager for ``self.best_job_id``.

        Note:
            The loop only ends once a chain of runs reaches ``max_p`` with every
            step accepted; there is no other exit condition.
        """
        # Function-scope import, presumably to avoid a circular import - confirm.
        from qlauncher.launcher.qlauncher import QLauncher

        self.manager.submit_many(QLauncher(problem, QAOA(p=self.p_init), backend).run, n_jobs=self.max_jobs)
        print(f'{len(self.manager.jobs)} jobs submitted to qcg')

        found_optimal_params = False
        while not found_optimal_params:
            jobid, state = self.manager.wait_for_a_job()
            if state != 'SUCCEED':
                # Failed jobs are only counted; no replacement batch is submitted for them.
                self.failed_jobs += 1
                continue

            has_potential, energy = self._process_job(jobid, self.p_init, self.min_energy, compare_factor=0.99)
            if has_potential:
                found_optimal_params = self._search_for_job_with_optimal_params(jobid, energy, problem, backend)

            # Top up the pool of starting-depth runs after each processed job.
            self.manager.submit_many(QLauncher(problem, QAOA(p=self.p_init), backend).run, n_jobs=self.max_jobs)

        result = self.manager.read_results(self.best_job_id)
        # NOTE(review): stop() is also registered as a finalizer in __init__, so it may
        # be called a second time later - confirm PilotJobManager.stop tolerates that.
        self.manager.stop()
        return result

    def _search_for_job_with_optimal_params(self, previous_job_id, previous_energy, problem, backend) -> bool:
        """Refine a promising job step by step from ``p_init + 1`` up to ``p_max``.

        Each step seeds a new QAOA run with parameters interpolated from the
        previous step and waits for it to finish.

        Args:
            previous_job_id: Id of the job whose parameters seed the first step.
            previous_energy: Energy of that job; each step must pass the check
                against the energy of the step before it.
            problem: Hamiltonian handed to every QAOA run.
            backend: Backend handed to every QAOA run.

        Returns:
            True if every step succeeded and was accepted, in which case
            ``self.best_job_id`` is set to the last job of the chain. False as
            soon as a step fails or is rejected (``best_job_id`` is left untouched).
        """
        from qlauncher.launcher.qlauncher import QLauncher

        for p in range(self.p_init + 1, self.p_max + 1):
            # NOTE(review): this reads ['result']['optimal_point'] while _process_job
            # reads ['optimal_point'] from the same kind of object - confirm which
            # layout read_results(...).result actually has.
            previous_job_results = self.manager.read_results(previous_job_id).result
            # The previous run used p - 1 layers, hence the split point p - 1.
            initial_point = self._interpolate_f(list(previous_job_results['result']['optimal_point']), p - 1)

            new_job_id = self.manager.submit(
                QLauncher(problem, QAOA(p=p, initial_point=initial_point), backend).run, output_path=self.output_interpolated
            )
            _, state = self.manager.wait_for_a_job(new_job_id)
            if state != 'SUCCEED':
                self.failed_jobs += 1
                return False

            has_potential, new_energy = self._process_job(new_job_id, p, previous_energy)
            if not has_potential:
                return False
            previous_energy = new_energy
            previous_job_id = new_job_id

        # Either the last accepted job of the chain, or the seed job itself when the
        # loop body never ran (p_init >= p_max).
        self.best_job_id = previous_job_id
        return True

    def _process_job(self, jobid: str, p: int, energy_to_compare: float, compare_factor: float = 1.0) -> tuple[bool, float]:
        """Decide whether a finished job is worth refining.

        Args:
            jobid: Id of the finished job to read from the manager.
            p: Number of QAOA layers the job was run with.
            energy_to_compare: Reference energy the job has to beat.
            compare_factor: Factor applied to the job's energy before the comparison.

        Returns:
            ``(has_potential, energy)``: ``has_potential`` is True when the optimal
            parameters pass the linearity check and
            ``energy * compare_factor < energy_to_compare``.

        Side effects:
            For jobs at the starting depth, ``self.min_energy`` is lowered when the
            job's energy is below ``energy_to_compare``.

        NOTE(review): with negative energies a ``compare_factor`` below 1 makes the
        acceptance test stricter, with positive energies more lenient - confirm
        that this is intended.
        """
        result = self.manager.read_results(jobid).result
        optimal_point = result['optimal_point']
        linear = self._check_linearity(optimal_point, p)
        energy = result['energy']
        if self.verbose:
            print(f'job {jobid}, p={p}, energy: {energy}')

        if p == self.p_init and energy < energy_to_compare:
            print(f'new min energy: {energy}')
            self.min_energy = energy

        has_potential = bool(linear and energy * compare_factor < energy_to_compare)
        return has_potential, energy

    def _create_directories_if_not_existing(self) -> None:
        """Create the three output directories if they are missing.

        ``exist_ok=True`` avoids the check-then-create race when several processes
        start at the same time.
        """
        for directory in (self.output_initial, self.output_interpolated, self.output):
            os.makedirs(directory, exist_ok=True)

    def _interp(self, params: np.ndarray | list[float]) -> np.ndarray:
        """Linearly interpolate ``n`` parameters into ``n + 1`` parameters.

        Element ``i`` of the result is
        ``i / n * params[i - 1] + (n - i) / n * params[i]``, with out-of-range
        entries of ``params`` treated as 0.

        Args:
            params: Non-empty sequence of parameters.

        Returns:
            Array with one more element than ``params``.
        """
        shifted_right = np.append([0], params)
        shifted_left = np.append(params, [0])
        weights = np.arange(len(shifted_right)) / len(params)
        return shifted_right * weights + shifted_left * weights[::-1]

    def _interpolate_f(self, params: np.ndarray | list[float], p: int) -> np.ndarray:
        """Interpolate a parameter vector of a ``p``-layer run to ``p + 1`` layers.

        Args:
            params: Parameter vector; the first ``p`` entries are treated as betas
                and the remaining entries as gammas.
            p: Split point between the two halves.

        Returns:
            The interpolated betas followed by the interpolated gammas.
        """
        betas = params[:p]
        gammas = params[p:]
        return np.hstack([self._interp(betas), self._interp(gammas)])

    def _check_linearity(self, optimal_params: np.ndarray, p: int) -> bool:
        """Check whether both halves of a parameter vector are roughly linear in the layer index.

        Each half is correlated against ``1..p``; the vector counts as linear when
        the absolute Pearson correlation of both halves exceeds the threshold.

        Args:
            optimal_params: Parameter vector; both ``optimal_params[:p]`` and
                ``optimal_params[p:]`` must have length ``p``.
            p: Number of layers (``pearsonr`` needs at least 2 points).

        Returns:
            True if both halves pass the threshold, False otherwise.
        """
        layers = np.arange(1, p + 1)
        first_half_corr = scipy.stats.pearsonr(layers, optimal_params[:p])[0]
        second_half_corr = scipy.stats.pearsonr(layers, optimal_params[p:])[0]
        threshold = self._LINEARITY_THRESHOLD
        return bool(abs(first_half_corr) > threshold and abs(second_half_corr) > threshold)