Source code for qlauncher.workflow.base_job_manager

  1from abc import ABC, abstractmethod
  2from collections.abc import Callable
  3from typing import Any
  4
  5from qlauncher.base import Result
  6
  7
[docs] 8class BaseJobManager(ABC): 9 """ 10 Abstract base class for job managers that execute functions. 11 on different compute backends. 12 """ 13 14 def __init__(self): 15 self.jobs: dict[str, dict[str, Any]] = {} 16
[docs] 17 @abstractmethod 18 def submit( 19 self, 20 function: Callable, 21 **kwargs, 22 ) -> str: 23 """ 24 Submit a function job to the scheduler. 25 26 Args: 27 function: Function to be executed. 28 cores: Number of CPU cores per task. 29 **kwargs: Manager-specific additional arguments. 30 31 Returns: 32 Job ID as a string. 33 """ 34 pass
35
[docs] 36 @abstractmethod 37 def wait_for_a_job( 38 self, 39 job_id: str | None = None, 40 timeout: float | None = None, 41 ) -> str | None: 42 """ 43 Wait for a job to finish and return its ID. 44 45 Args: 46 job_id: ID of the job to wait for. If None, wait for any job. 47 timeout: Maximum time to wait in seconds. If None, wait indefinitely. 48 49 Returns: 50 Job ID of the finished job. 51 52 Raises: 53 ValueError: If no jobs are available to wait for. 54 TimeoutError: If timeout is exceeded. 55 """ 56 pass
57
[docs] 58 @abstractmethod 59 def read_results(self, job_id: str) -> Any: 60 """ 61 Read the result of a finished job. 62 63 Args: 64 job_id: Job ID returned by submit(). 65 66 Returns: 67 Result object produced by the job. 68 69 Raises: 70 KeyError: If job_id is not known to this manager. 71 FileNotFoundError: If the result file does not exist. 72 """ 73 pass
74
[docs] 75 @abstractmethod 76 def cancel(self, job_id: str): 77 """ 78 Cancel a given job 79 80 Args: 81 job_id (str): id of the job to cancel. 82 83 Raises: 84 KeyError: If job with a given id was not submitted by this manager. 85 86 Returns: 87 None 88 """ 89 pass
90
[docs] 91 @abstractmethod 92 def clean_up(self) -> None: 93 """ 94 Clean up temporary files and resources created by the manager. 95 """ 96 pass
97
[docs] 98 def run( 99 self, 100 function: Callable, 101 **kwargs, 102 ) -> Any: 103 """ 104 Convenience method: submit job, wait for completion, read results, and cleanup. 105 106 This method handles the complete lifecycle of a job execution. 107 108 Args: 109 function: Function to be executed. 110 cores: Number of CPU cores per task. 111 **kwargs: Manager-specific additional arguments. 112 113 Returns: 114 Result object produced by the job. 115 """ 116 try: 117 job_id = self.submit(function, **kwargs) 118 self.wait_for_a_job(job_id) 119 return self.read_results(job_id) 120 finally: 121 self.clean_up()
122 123 def _count_not_finished(self) -> int: 124 """Count how many jobs are not yet marked as finished.""" 125 return len([job for job in self.jobs.values() if not job.get('finished', False)]) 126 127 def _make_job_uid(self) -> str: 128 """Generate a unique job identifier.""" 129 return f'{len(self.jobs):05d}'
if __name__ == '__main__':
    # Manual example: run a QLauncher workflow through a SlurmJobManager
    # and print whatever the manager returns.
    from qlauncher import QLauncher
    from qlauncher.problems import MaxCut
    from qlauncher.routines.qiskit import QAOA, QiskitBackend

    maxcut_problem = MaxCut.from_preset('default')
    qaoa = QAOA(p=3)
    simulator = QiskitBackend('local_simulator')

    # SlurmJobManager
    from qlauncher.workflow.slurm_job_manager import SlurmJobManager

    # Further options can be added here, e.g. "licenses": "orca1:1".
    options = {'time': '00:30:00'}
    # Environment setup lines can be listed here, e.g.
    # "module load Python/python-3.11.0" or "source ~/venv/bin/activate".
    setup_lines = []
    manager = SlurmJobManager(slurm_options=options, env_setup=setup_lines)

    launcher = QLauncher(maxcut_problem, qaoa, simulator)
    outcome = manager.run(launcher.run, cores=1)
    print(outcome)

    # Disabled PilotJobManager example, kept for reference:
    #
    # from qlauncher.workflow.pilotjob_scheduler import PilotJobManager
    #
    # out_dir = Path("./pilotjob_out")
    #
    # pilot_mgr = PilotJobManager()
    # pilot_result = pilot_mgr.run(
    #     problem,
    #     algorithm,
    #     backend,
    #     cores=1,
    #     output_path=out_dir,
    # )
    # print(pilot_result)