Source code for qlauncher.routines.qiskit.algorithms.educated_guess

  1"""Algorithms for Qiskit routines"""
  2
  3import math
  4import os
  5import weakref
  6
  7import numpy as np
  8import scipy
  9
 10from qlauncher.base import Algorithm, Result
 11from qlauncher.base.models import Hamiltonian
 12from qlauncher.routines.qiskit.algorithms.qiskit_native import QAOA
 13from qlauncher.routines.qiskit.backends.qiskit_backend import QiskitBackend
 14from qlauncher.workflow.pilotjob_scheduler import PilotJobManager
 15
 16
[docs] 17class EducatedGuess(Algorithm[Hamiltonian, QiskitBackend]): 18 def __init__(self, starting_p: int = 3, max_p: int = 8, max_job_batch_size: int | None = None, verbose: bool = False): 19 """ 20 Algorithm utilizing all available cores to run multiple QAOA's in parallel to find optimal parameters. 21 22 Args: 23 starting_p (int, optional): Initial value of QAOA's p parameter. Defaults to 3. 24 max_p (int, optional): Maximum value for QAOA's p parameter. Defaults to 8. 25 max_job_batch_size: Maximum number of jobs to run for a given p value. If None, run as many as possible. Defaults to None. 26 verbose (bool, optional): Verbose. Defaults to False. 27 """ 28 self.output_initial = 'initial/' 29 self.output_interpolated = 'interpolated/' 30 self.output = 'output/' 31 self.p_init = starting_p 32 self.p_max = max_p 33 self.verbose = verbose 34 self.failed_jobs = 0 35 self.min_energy = math.inf 36 self.manager = PilotJobManager(self.output) 37 self.best_job_id = '' 38 self.max_jobs = max_job_batch_size 39 40 weakref.finalize(self, self.manager.stop) # Kill the running jobs in case of a crash or otherwise 41
[docs] 42 def run(self, problem: Hamiltonian, backend: QiskitBackend) -> Result: 43 from qlauncher.launcher.qlauncher import QLauncher 44 45 self.manager.submit_many(QLauncher(problem, QAOA(p=self.p_init), backend).run, n_jobs=self.max_jobs) 46 print(f'{len(self.manager.jobs)} jobs submitted to qcg') 47 48 found_optimal_params = False 49 50 while not found_optimal_params: 51 jobid, state = self.manager.wait_for_a_job() 52 53 if state != 'SUCCEED': 54 self.failed_jobs += 1 55 continue 56 has_potential, energy = self._process_job(jobid, self.p_init, self.min_energy, compare_factor=0.99) 57 if has_potential: 58 found_optimal_params = self._search_for_job_with_optimal_params(jobid, energy, problem, backend) 59 60 self.manager.submit_many(QLauncher(problem, QAOA(p=self.p_init), backend).run, n_jobs=self.max_jobs) 61 62 result = self.manager.read_results(self.best_job_id) 63 self.manager.stop() 64 return result
65 66 def _search_for_job_with_optimal_params(self, previous_job_id, previous_energy, problem, backend) -> bool: 67 from qlauncher.launcher.qlauncher import QLauncher 68 69 new_job_id = None 70 for p in range(self.p_init + 1, self.p_max + 1): 71 previous_job_results = self.manager.read_results(previous_job_id).result 72 initial_point = self._interpolate_f(list(previous_job_results['result']['optimal_point']), p - 1) 73 74 new_job_id = self.manager.submit( 75 QLauncher(problem, QAOA(p=p, initial_point=initial_point), backend).run, output_path=self.output_interpolated 76 ) 77 _, state = self.manager.wait_for_a_job(new_job_id) 78 if state != 'SUCCEED': 79 self.failed_jobs += 1 80 return False 81 has_potential, new_energy = self._process_job(new_job_id, p, previous_energy) 82 if has_potential: 83 previous_energy = new_energy 84 previous_job_id = new_job_id 85 else: 86 return False 87 self.best_job_id = new_job_id if new_job_id is not None else previous_job_id 88 return True 89 90 def _process_job(self, jobid: str, p: int, energy_to_compare: float, compare_factor: float = 1.0) -> tuple[float, bool]: 91 result = self.manager.read_results(jobid).result 92 optimal_point = result['optimal_point'] 93 has_potential = False 94 linear = self._check_linearity(optimal_point, p) 95 energy = result['energy'] 96 if self.verbose: 97 print(f'job {jobid}, p={p}, energy: {energy}') 98 99 if p == self.p_init and energy < energy_to_compare: 100 print(f'new min energy: {energy}') 101 self.min_energy = energy 102 if linear and energy * compare_factor < energy_to_compare: 103 has_potential = True 104 return has_potential, energy 105 106 def _create_directories_if_not_existing(self) -> None: 107 if not os.path.exists(self.output_initial): 108 os.makedirs(self.output_initial) 109 if not os.path.exists(self.output_interpolated): 110 os.makedirs(self.output_interpolated) 111 if not os.path.exists(self.output): 112 os.makedirs(self.output) 113 114 def _interp(self, params: np.ndarray) -> np.ndarray: 115 arr1 = np.append([0], params) 116 arr2 = np.append(params, [0]) 117 weights = np.arange(len(arr1)) / len(params) 118 return arr1 * weights + arr2 * weights[::-1] 119 120 def _interpolate_f(self, params: np.ndarray, p: int) -> np.ndarray: 121 betas = params[:p] 122 gammas = params[p:] 123 new_betas = self._interp(betas) 124 new_gammas = self._interp(gammas) 125 return np.hstack([new_betas, new_gammas]) 126 127 def _check_linearity(self, optimal_params: np.ndarray, p: int) -> bool: 128 linear = False 129 correlations = ( 130 scipy.stats.pearsonr(np.arange(1, p + 1), optimal_params[:p])[0], 131 scipy.stats.pearsonr(np.arange(1, p + 1), optimal_params[p:])[0], 132 ) 133 134 if abs(correlations[0]) > 0.85 and abs(correlations[1]) > 0.85: 135 linear = True 136 return linear