Source code for quantum_launcher.routines.qiskit_routines.algorithms.educated_guess

  1""" Algorithms for Qiskit routines """
  2import math
  3import os
  4
  5import numpy as np
  6import scipy
  7
  8from quantum_launcher.base import Problem, Algorithm, Result
  9from quantum_launcher.routines.qiskit_routines.backends.ibm_backend import IBMBackend
 10from quantum_launcher.workflow.pilotjob_scheduler import JobManager
 11from quantum_launcher.routines.qiskit_routines.algorithms.qiskit_native import QAOA
 12
 13
class EducatedGuess(Algorithm):
    """Search for good QAOA parameters by running many QAOA jobs via a JobManager.

    Jobs with ``p = starting_p`` are submitted in bulk. Every successful job
    whose optimal parameters look linear and whose energy is competitive is
    then refined step by step: its parameters are interpolated to ``p + 1``
    and used as the initial point of a new QAOA job, up to ``max_p``.
    """

    _algorithm_format = 'hamiltonian'

    def __init__(self, starting_p: int = 3, max_p: int = 8, verbose: bool = False):
        """
        Algorithm utilizing all available cores to run multiple QAOA's in parallel to find optimal parameters.

        Args:
            starting_p (int, optional): Initial value of QAOA's p parameter. Defaults to 3.
            max_p (int, optional): Maximum value for QAOA's p parameter. Defaults to 8.
            verbose (bool, optional): Verbose. Defaults to False.
        """
        # Output paths handed to the job manager / used for directory creation.
        self.output_initial = 'initial/'
        self.output_interpolated = 'interpolated/'
        self.output = 'output/'
        self.p_init = starting_p
        self.p_max = max_p
        self.verbose = verbose
        # Number of jobs that finished in a state other than 'SUCCEED'.
        self.failed_jobs = 0
        # Lowest energy seen so far among jobs run with p == p_init.
        self.min_energy = math.inf
        self.manager = JobManager()
        # Id of the job whose results are returned by run().
        self.best_job_id = ''

    def run(self, problem: Problem, backend: IBMBackend, formatter) -> Result:
        """Run the search and return the results of the best job found.

        Args:
            problem (Problem): Problem passed to every submitted QAOA job.
            backend (IBMBackend): Backend passed to every submitted QAOA job.
            formatter: Not used by this method.

        Returns:
            Result: Whatever the job manager's ``read_results`` returns for
            ``self.best_job_id``.
        """
        self.manager.submit_many(problem, QAOA(p=self.p_init), backend, output_path=self.output_initial)
        print(f'{len(self.manager.jobs)} jobs submitted to qcg')

        found_optimal_params = False

        while not found_optimal_params:
            jobid, state = self.manager.wait_for_a_job()

            if state != 'SUCCEED':
                # A failed job is only counted; go straight back to waiting.
                self.failed_jobs += 1
                continue
            has_potential, energy = self._process_job(jobid, self.p_init, self.min_energy, compare_factor=0.99)
            if has_potential:
                found_optimal_params = self._search_for_job_with_optimal_params(jobid, energy, problem, backend)

            # Submit a fresh batch of initial jobs after every processed job.
            self.manager.submit_many(problem, QAOA(p=self.p_init), backend, output_path=self.output_initial)

        result = self.manager.read_results(self.best_job_id)
        self.manager.stop()
        return result

    def _search_for_job_with_optimal_params(self, previous_job_id, previous_energy, problem, backend) -> bool:
        """Refine a promising job from ``p_init + 1`` up to ``p_max``.

        For each ``p`` the previous job's optimal point is interpolated to one
        more layer and used as the initial point of a new QAOA job. The chain
        stops as soon as a job fails or is judged to have no potential.

        Args:
            previous_job_id: Id of the job to start the refinement from.
            previous_energy: Energy of that job, used as the comparison value.
            problem: Problem passed to every submitted job.
            backend: Backend passed to every submitted job.

        Returns:
            bool: True if every step up to ``p_max`` succeeded, in which case
            ``self.best_job_id`` is set to the last job of the chain;
            False otherwise.
        """
        for p in range(self.p_init + 1, self.p_max + 1):
            previous_job_results = self.manager.read_results(previous_job_id).result
            # The previous job ran with p - 1 layers.
            initial_point = self._interpolate_f(
                list(previous_job_results['SamplingVQEResult'].optimal_point), p - 1)

            new_job_id = self.manager.submit(problem, QAOA(p=p, initial_point=initial_point),
                                             backend, output_path=self.output_interpolated)
            _, state = self.manager.wait_for_a_job(new_job_id)
            if state != 'SUCCEED':
                self.failed_jobs += 1
                return False
            has_potential, new_energy = self._process_job(new_job_id, p, previous_energy)
            if not has_potential:
                return False
            previous_energy = new_energy
            previous_job_id = new_job_id

        # After a completed loop previous_job_id is the last submitted job.
        # It is also bound when the range is empty (p_init >= p_max), where the
        # loop variable new_job_id would not exist.
        self.best_job_id = previous_job_id
        return True

    def _process_job(self, jobid: str, p: int, energy_to_compare: float,
                     compare_factor: float = 1.0) -> tuple[bool, float]:
        """Read a finished job's results and decide whether it is worth refining.

        Side effects: when ``p == self.p_init`` and the job's energy is below
        ``energy_to_compare``, ``self.min_energy`` is updated and a message is
        printed.

        Args:
            jobid (str): Id of the finished job.
            p (int): QAOA p value the job was run with.
            energy_to_compare (float): Reference energy.
            compare_factor (float, optional): Factor applied to the job's
                energy before the comparison. Defaults to 1.0.

        Returns:
            tuple[bool, float]: ``(has_potential, energy)``. ``has_potential``
            is True when the optimal parameters pass the linearity check and
            ``energy * compare_factor < energy_to_compare``.
        """
        result = self.manager.read_results(jobid).result
        optimal_point = result['SamplingVQEResult'].optimal_point
        linear = self._check_linearity(optimal_point, p)
        energy = result['energy']
        if self.verbose:
            print(f'job {jobid}, p={p}, energy: {energy}')

        if p == self.p_init and energy < energy_to_compare:
            print(f'new min energy: {energy}')
            self.min_energy = energy

        # NOTE(review): for negative energies a compare_factor below 1 makes
        # this test stricter, not looser - confirm that is intended.
        has_potential = bool(linear and energy * compare_factor < energy_to_compare)
        return has_potential, energy

    def _create_directories_if_not_existing(self):
        """Create the three output directories, skipping those that already exist."""
        for path in (self.output_initial, self.output_interpolated, self.output):
            # exist_ok avoids the check-then-create race of os.path.exists().
            os.makedirs(path, exist_ok=True)

    def _interp(self, params: np.ndarray) -> np.ndarray:
        """Interpolate ``len(params)`` values to ``len(params) + 1`` values.

        With ``n = len(params)``, element ``i`` of the result is
        ``(i / n) * params[i - 1] + ((n - i) / n) * params[i]``, where
        out-of-range entries of ``params`` are taken as 0.

        Args:
            params (np.ndarray): Values to interpolate.

        Returns:
            np.ndarray: Array with one more element than ``params``.
        """
        arr1 = np.append([0], params)  # params shifted right, zero-padded
        arr2 = np.append(params, [0])  # params zero-padded at the end
        weights = np.arange(len(arr1)) / len(params)  # 0, 1/n, ..., 1
        res = arr1 * weights + arr2 * weights[::-1]
        return res

    def _interpolate_f(self, params: np.ndarray, p: int) -> np.ndarray:
        """Interpolate a parameter vector for ``p`` layers to ``p + 1`` layers.

        The first ``p`` entries and the remaining entries of ``params`` are
        interpolated independently and concatenated again.

        Args:
            params (np.ndarray): Parameter vector; the first ``p`` entries are
                treated as one group and the rest as the other.
            p (int): Number of entries in the first group.

        Returns:
            np.ndarray: Concatenation of both interpolated groups.
        """
        betas = params[:p]
        gammas = params[p:]
        new_betas = self._interp(betas)
        new_gammas = self._interp(gammas)
        return np.hstack([new_betas, new_gammas])

    def _check_linearity(self, optimal_params: np.ndarray, p: int) -> bool:
        """Check whether both halves of ``optimal_params`` are roughly linear.

        Each half is correlated (Pearson) against ``1..p``.

        Args:
            optimal_params (np.ndarray): Parameter vector; the first ``p``
                entries and the entries from index ``p`` on are checked
                separately.
            p (int): Split index and length of the reference ramp.

        Returns:
            bool: True if the absolute correlation of both halves exceeds 0.85.
        """
        # Import the submodule explicitly: a bare `import scipy` does not
        # guarantee that `scipy.stats` is loaded on every SciPy version.
        from scipy import stats

        steps = np.arange(1, p + 1)
        correlations = (stats.pearsonr(steps, optimal_params[:p])[0],
                        stats.pearsonr(steps, optimal_params[p:])[0])

        return bool(abs(correlations[0]) > 0.85 and abs(correlations[1]) > 0.85)