Source code for quantum_launcher.workflow.pilotjob_scheduler

import os
import pickle
import sys
from typing import List, Optional

from quantum_launcher.base.base import Algorithm, Backend, Problem, Result
from quantum_launcher.launcher.qlauncher import QuantumLauncher
from quantum_launcher.import_management import DependencyError
try:
    import dill
    from qcg.pilotjob.api.job import Jobs
    from qcg.pilotjob.api.manager import LocalManager, Manager
except ImportError as e:
    raise DependencyError(e, install_hint='pilotjob') from e


class JobManager:
    def __init__(self, manager: Optional[Manager] = None):
        """
        JobManager is Quantum Launcher's wrapper for a process management system; the current version works on top of qcg-pilotjob.

        Args:
            manager (Optional[Manager], optional): Manager system used to schedule jobs; if None, pilotjob's LocalManager is used.
                Defaults to None.
        """
        self.jobs = {}
        self.code_path = os.path.join(os.path.dirname(__file__), 'pilotjob_task.py')
        if manager is None:
            manager = LocalManager()
        self.manager = manager

    def _count_not_finished(self) -> int:
        return len([job for job in self.jobs.values() if job.get('finished') is False])

    def submit(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores: Optional[int] = None) -> str:
        """
        Submits a Quantum Launcher job to the scheduler.

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path prefix for the job's output files.
            cores (Optional[int], optional): Number of cores per task; if None, set to the number of free cores (at least 1). Defaults to None.

        Returns:
            str: Job Id.
        """
        if cores is None:
            cores = max(1, self.manager.resources()['free_cores'])
        job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend,
                                        output=output_path, cores=cores)
        job_id = self.manager.submit(Jobs().add(**job.get('qcg_args')))[0]
        return job_id

    def submit_many(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores_per_job: int = 1) -> List[str]:
        """
        Submits as many jobs as there are currently available cores.

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path prefix for the jobs' output files.
            cores_per_job (int, optional): Number of cores per job. Defaults to 1.

        Returns:
            List[str]: List of Job Ids.
        """
        free_cores = self.manager.resources()['free_cores']
        if free_cores == 0:
            return []
        qcg_jobs = Jobs()
        for _ in range(free_cores // cores_per_job):
            job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend, output=output_path, cores=cores_per_job)
            qcg_jobs.add(**job.get('qcg_args'))
        return self.manager.submit(qcg_jobs)

    def wait_for_a_job(self, job_id: Optional[str] = None, timeout: Optional[int | float] = None) -> tuple[str, str]:
        """
        Waits for a job to finish and returns its id and status.

        Args:
            job_id (Optional[str], optional): Id of the selected job; if None, waits for any job. Defaults to None.
            timeout (Optional[int | float], optional): Timeout in seconds. Defaults to None.

        Raises:
            ValueError: Raised if job_id is not found or there are no jobs left.

        Returns:
            tuple[str, str]: job_id, job's status
        """
        if job_id is None:
            if self._count_not_finished() <= 0:
                raise ValueError("There are no jobs left")
            job_id, state = self.manager.wait4_any_job_finish(timeout)
        elif job_id in self.jobs:
            state = self.manager.wait4(job_id, timeout=timeout)[job_id]
        else:
            raise ValueError(f"Job {job_id} not found in {self.__class__.__name__}'s jobs")

        self.jobs[job_id]['finished'] = True
        return job_id, state

    def _prepare_ql_dill_job(self, problem: Problem, algorithm: Algorithm, backend: Backend,
                             output: str, cores: int = 1) -> dict:
        job_uid = f'{len(self.jobs):05d}'
        output_file = os.path.abspath(f'{output}output.{job_uid}')
        launcher = QuantumLauncher(problem, algorithm, backend)
        # Serialize the launcher with dill so the worker task can load and run it.
        input_file = os.path.abspath(f'{output}output.{job_uid}')
        with open(input_file, 'wb') as f:
            dill.dump(launcher, f)

        # qcg-pilotjob job description: run pilotjob_task.py with the serialized
        # launcher as input and the result file as output.
        in_args = [self.code_path, input_file, output_file]
        qcg_args = {
            'name': job_uid,
            'exec': sys.executable,
            'args': in_args,
            'model': 'openmpi',
            'stdout': f'{output}stdout.{job_uid}',
            'stderr': f'{output}stderr.{job_uid}',
            'numCores': cores
        }
        job = {'name': job_uid, 'qcg_args': qcg_args, 'output_file': output_file, 'finished': False}
        self.jobs[job_uid] = job
        return job

    def read_results(self, job_id: str) -> Result:
        """
        Reads the result of the given job_id.

        Args:
            job_id (str): Job Id.

        Returns:
            Result: Result of the selected Job.
        """
        output_path = self.jobs[job_id]['output_file']
        with open(output_path, 'rb') as rt:
            results = pickle.load(rt)
        return results

    def clean_up(self):
        """
        Removes all output files generated in the process and, for a LocalManager, calls self.manager.cleanup().
        """
        for job in self.jobs.values():
            os.remove(job['output_file'])
        if isinstance(self.manager, LocalManager):
            self.manager.cleanup()

    def stop(self):
        """
        Cancels the tracked jobs and stops the manager process.
        """
        self.manager.cancel(self.jobs)
        self.manager.finish()

    def __del__(self):
        self.stop()
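
Below is a minimal usage sketch, not part of the module itself. It assumes you already have concrete Problem, Algorithm and Backend instances from the rest of Quantum Launcher; the run_example helper, the './' output prefix and the 'SUCCEED' state string are illustrative assumptions and may need adjusting to your installation and qcg-pilotjob version.

# Hypothetical usage sketch: run_example, output_prefix and 'SUCCEED' are
# placeholders for illustration, not part of the module above.
from quantum_launcher.workflow.pilotjob_scheduler import JobManager


def run_example(problem, algorithm, backend, output_prefix='./'):
    """Submit one job per free core, collect the results, and shut down."""
    manager = JobManager()  # with no argument, a qcg-pilotjob LocalManager is started
    try:
        # output_prefix is used as a path prefix, e.g. './output.00000', './stdout.00000'
        job_ids = manager.submit_many(problem, algorithm, backend, output_path=output_prefix)
        results = {}
        for job_id in job_ids:
            _, state = manager.wait_for_a_job(job_id)  # blocks until this job finishes
            if state == 'SUCCEED':  # success state name may differ between qcg-pilotjob versions
                results[job_id] = manager.read_results(job_id)
        return results
    finally:
        manager.clean_up()  # remove the pickled launcher/result files
        manager.stop()      # cancel tracked jobs and finish the manager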