Source code for qlauncher.workflow.pilotjob_scheduler

  1import contextlib
  2import os
  3import pickle
  4import sys
  5from collections.abc import Callable
  6from pathlib import Path
  7from typing import Any
  8
  9from qlauncher.exceptions import DependencyError
 10from qlauncher.workflow.base_job_manager import BaseJobManager
 11
 12try:
 13    import dill
 14    from qcg.pilotjob.api.job import Jobs
 15    from qcg.pilotjob.api.manager import LocalManager, Manager
 16    from qcg.pilotjob.api.errors import TimeoutElapsed
 17except ImportError as e:
 18    raise DependencyError(e, install_hint='pilotjob') from e
 19
 20
class PilotJobManager(BaseJobManager):
    """QLauncher job manager backed by the qcg-pilotjob scheduling system."""

    def __init__(
        self,
        output_path: str,
        manager: Manager | None = None,
    ):
        """
        PilotJob manager is QLauncher's wrapper for a process management system;
        the current version works on top of qcg-pilotjob.

        Args:
            output_path (str): Directory where per-job artifacts are written
                (pickled input function, pickled result, stdout/stderr files).
            manager (Manager | None, optional): Manager system to schedule jobs;
                if set to None, pilotjob's LocalManager is used. Defaults to None.
        """
        super().__init__()
        # Worker script launched for every job; it deserializes and runs the function.
        self.code_path = os.path.join(os.path.dirname(__file__), 'subprocess_fn.py')
        self.manager = manager if manager is not None else LocalManager()
        self.output_path = output_path
[docs] 39 def submit( 40 self, 41 function: Callable, 42 cores: int = 1, 43 **kwargs, 44 ) -> str: 45 """ 46 Submit a function job to the scheduler. 47 48 Args: 49 function: Function to be executed. 50 cores: Number of CPU cores per task. 51 **kwargs: Manager-specific additional arguments. 52 53 Returns: 54 Job ID as a string. 55 """ 56 57 job = self._prepare_ql_dill_job( 58 function, 59 output=self.output_path, 60 cores=cores, 61 ) 62 return self.manager.submit(Jobs().add(**job['qcg_args']))[0]
63
[docs] 64 def submit_many( 65 self, 66 function: Callable, 67 cores_per_job: int = 1, 68 n_jobs: int | None = None, 69 ) -> list[str]: 70 """ 71 Submits as many jobs as there are currently available cores. 72 73 Args: 74 function (Callable): Function to execute. 75 output_path (str): Path of output file. 76 cores_per_job (int, optional): Number of cores per job. Defaults to 1. 77 n_jobs: number of jobs to submit. If None, submit as many as possible (free_cores//cores_per_job). Defaults to None. 78 79 Returns: 80 list[str]: List with Job Id's. 81 """ 82 free_cores = self.manager.resources()['free_cores'] 83 if free_cores == 0: 84 return [] 85 86 qcg_jobs = Jobs() 87 num_jobs = n_jobs if n_jobs is not None else free_cores // cores_per_job 88 89 for _ in range(num_jobs): 90 job = self._prepare_ql_dill_job( 91 function, 92 output=self.output_path, 93 cores=cores_per_job, 94 ) 95 qcg_jobs.add(**job['qcg_args']) 96 97 return self.manager.submit(qcg_jobs)
98
[docs] 99 def wait_for_a_job( 100 self, 101 job_id: str | None = None, 102 timeout: float | None = None, 103 ) -> tuple[str, str]: 104 """ 105 Waits for a job to finish and returns it's id and status. 106 107 Args: 108 job_id (str | None, optional): Id of selected job, if None waiting for any job. Defaults to None. 109 timeout (int | float | None, optional): Timeout in seconds. Defaults to None. 110 111 Raises: 112 ValueError: Raises if job_id not found or there are no jobs left. 113 114 Returns: 115 tuple[str, str]: job_id, job's status 116 """ 117 if job_id is None: 118 if self._count_not_finished() <= 0: 119 raise ValueError('There are no jobs left') 120 job_id, state = self.manager.wait4_any_job_finish(timeout) 121 elif job_id in self.jobs: 122 try: 123 state = self.manager.wait4(job_id, timeout=timeout)[job_id] 124 except TimeoutElapsed as e: 125 raise TimeoutError from e 126 else: 127 raise ValueError(f"Job {job_id} not found in {self.__class__.__name__}'s jobs") 128 129 self.jobs[job_id]['finished'] = True 130 return job_id, state
131 132 def _prepare_ql_dill_job( 133 self, 134 function: Callable, 135 output: str, 136 cores: int = 1, 137 ) -> dict: 138 job_uid = self._make_job_uid() 139 140 out_dir = Path(output).expanduser().resolve() 141 out_dir.mkdir(parents=True, exist_ok=True) 142 143 output_file = out_dir / f'output.{job_uid}.pkl' 144 input_file = out_dir / f'input.{job_uid}.pkl' 145 stdout_file = out_dir / f'stdout.{job_uid}' 146 stderr_file = out_dir / f'stderr.{job_uid}' 147 148 with open(input_file, 'wb') as f: 149 dill.dump(function, f) 150 151 in_args = [self.code_path, str(input_file), str(output_file)] 152 qcg_args = { 153 'name': job_uid, 154 'exec': sys.executable, 155 'args': in_args, 156 'model': 'openmpi', 157 'stdout': str(stdout_file), 158 'stderr': str(stderr_file), 159 'numCores': cores, 160 } 161 162 job = { 163 'name': job_uid, 164 'qcg_args': qcg_args, 165 'output_file': str(output_file), 166 'finished': False, 167 } 168 self.jobs[job_uid] = job 169 return job 170
[docs] 171 def read_results(self, job_id) -> Any: 172 """ 173 Reads the result of given job_id. 174 175 Args: 176 job_id (str): Job Id. 177 178 Returns: 179 Result: Result of selected Job. 180 """ 181 if job_id not in self.jobs: 182 raise KeyError(f'Job {job_id} not found') 183 184 output_path = self.jobs[job_id]['output_file'] 185 with open(output_path, 'rb') as rt: 186 return pickle.load(rt)
187
[docs] 188 def cancel(self, job_id: str) -> None: 189 """ 190 Cancel a given job 191 192 Args: 193 job_id (str): id of the job to cancel. 194 195 Raises: 196 KeyError: If job with a given id was not submitted by this manager. 197 198 Returns: 199 None 200 """ 201 if job_id not in self.jobs: 202 raise KeyError(f'Job {job_id} not found') 203 return self.manager.cancel(job_id)
204
[docs] 205 def clean_up(self) -> None: 206 """ 207 Removes all output files generated in the process and calls self.manager.cleanup(). 208 """ 209 for job in self.jobs.values(): 210 if os.path.exists(job['output_file']): 211 os.remove(job['output_file']) 212 213 if isinstance(self.manager, LocalManager): 214 with contextlib.suppress(Exception): 215 self.manager.cleanup()
216
[docs] 217 def stop(self) -> None: 218 """ 219 Stops the manager process. 220 """ 221 mgr = getattr(self, 'manager', None) 222 if mgr is None: 223 return 224 225 if isinstance(mgr, LocalManager): 226 with contextlib.suppress(Exception): 227 mgr.finish()
228
    def run(self, function: Callable[..., Any], cores: int = 1, **kwargs) -> Any:
        """
        Run *function* through this manager by delegating to the base-class
        ``run`` workflow (see ``BaseJobManager.run``).
        """
        return super().run(function, cores=cores, **kwargs)
231 232 def __del__(self): 233 with contextlib.suppress(Exception): 234 self.stop()