Source code for qlauncher.workflow.pilotjob_scheduler

import os
import pickle
import sys

from qlauncher.base.base import Algorithm, Backend, Problem, Result
from qlauncher.launcher.qlauncher import QLauncher
from qlauncher.exceptions import DependencyError
try:
    import dill
    from qcg.pilotjob.api.job import Jobs
    from qcg.pilotjob.api.manager import LocalManager, Manager
except ImportError as e:
    raise DependencyError(e, install_hint='pilotjob') from e


class JobManager:
    def __init__(self, manager: Manager | None = None):
        """
        JobManager is QLauncher's wrapper around a process-management system; the current version works on top of QCG-PilotJob.

        Args:
            manager (Manager | None, optional): Manager used to schedule jobs. If None, QCG-PilotJob's LocalManager is used.
                Defaults to None.
        """
        self.jobs = {}
        self.code_path = os.path.join(os.path.dirname(__file__), 'pilotjob_task.py')
        if manager is None:
            manager = LocalManager()
        self.manager = manager

    def _count_not_finished(self) -> int:
        return len([job for job in self.jobs.values() if job.get('finished') is False])

    def submit(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores: int | None = None) -> str:
        """
        Submits a QLauncher job to the scheduler.

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path prefix for the job's output files.
            cores (int | None, optional): Number of cores per task; if None, a single core is used. Defaults to None.

        Returns:
            str: Job id.
        """
        if cores is None:
            cores = 1
        job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend,
                                        output=output_path, cores=cores)
        job_id = self.manager.submit(Jobs().add(**job.get('qcg_args')))[0]
        return job_id

    def submit_many(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores_per_job: int = 1, n_jobs: int | None = None) -> list[str]:
        """
        Submits multiple jobs at once; by default, as many as the currently free cores allow.

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path prefix for the jobs' output files.
            cores_per_job (int, optional): Number of cores per job. Defaults to 1.
            n_jobs (int | None, optional): Number of jobs to submit. If None, submits as many as possible
                (free_cores // cores_per_job). Defaults to None.

        Returns:
            list[str]: List of job ids.
        """
        free_cores = self.manager.resources()['free_cores']
        if free_cores == 0:
            return []
        qcg_jobs = Jobs()
        for _ in range(n_jobs if n_jobs is not None else free_cores // cores_per_job):
            job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend,
                                            output=output_path, cores=cores_per_job)
            qcg_jobs.add(**job.get('qcg_args'))
        return self.manager.submit(qcg_jobs)

    def wait_for_a_job(self, job_id: str | None = None, timeout: int | float | None = None) -> tuple[str, str]:
        """
        Waits for a job to finish and returns its id and status.

        Args:
            job_id (str | None, optional): Id of the selected job; if None, waits for any job. Defaults to None.
            timeout (int | float | None, optional): Timeout in seconds. Defaults to None.

        Raises:
            ValueError: Raised if job_id is not found or there are no jobs left.

        Returns:
            tuple[str, str]: Job id and the job's status.
        """
        if job_id is None:
            if self._count_not_finished() <= 0:
                raise ValueError("There are no jobs left")
            job_id, state = self.manager.wait4_any_job_finish(timeout)
        elif job_id in self.jobs:
            state = self.manager.wait4(job_id, timeout=timeout)[job_id]
        else:
            raise ValueError(f"Job {job_id} not found in {self.__class__.__name__}'s jobs")

        self.jobs[job_id]['finished'] = True
        return job_id, state

    def _prepare_ql_dill_job(self, problem: Problem, algorithm: Algorithm, backend: Backend,
                             output: str, cores: int = 1) -> dict:
        # Serialize a QLauncher with dill and build the QCG-PilotJob description
        # of a task that runs pilotjob_task.py on it.
        job_uid = f'{len(self.jobs):05d}'
        output_file = os.path.abspath(f'{output}output.{job_uid}')
        launcher = QLauncher(problem, algorithm, backend)
        # Note: the serialized launcher is written to the same path that is later read back as the job's output file.
        input_file = os.path.abspath(f'{output}output.{job_uid}')
        with open(input_file, 'wb') as f:
            dill.dump(launcher, f)

        in_args = [self.code_path, input_file, output_file]
        qcg_args = {
            'name': job_uid,
            'exec': sys.executable,
            'args': in_args,
            'model': 'openmpi',
            'stdout': f'{output}stdout.{job_uid}',
            'stderr': f'{output}stderr.{job_uid}',
            'numCores': cores
        }
        job = {'name': job_uid, 'qcg_args': qcg_args, 'output_file': output_file, 'finished': False}
        self.jobs[job_uid] = job
        return job

    def read_results(self, job_id: str) -> Result:
        """
        Reads the result of the given job.

        Args:
            job_id (str): Job id.

        Returns:
            Result: Result of the selected job.
        """
        output_path = self.jobs[job_id]['output_file']
        with open(output_path, 'rb') as rt:
            results = pickle.load(rt)
        return results

    def clean_up(self):
        """
        Removes all output files generated in the process and, for a LocalManager, calls self.manager.cleanup().
        """
        for job in self.jobs.values():
            os.remove(job['output_file'])
        if isinstance(self.manager, LocalManager):
            self.manager.cleanup()

    def stop(self):
        """
        Cancels all known jobs and, for a LocalManager, stops the manager process.
        """
        self.manager.cancel(self.jobs)
        if isinstance(self.manager, LocalManager):
            self.manager.finish()

    def __del__(self):
        self.stop()
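
A minimal usage sketch of JobManager (not part of the module source above): it assumes the pilotjob extra is installed and uses hypothetical MyProblem, MyAlgorithm and MyBackend classes as stand-ins for concrete qlauncher implementations; the call sequence follows the methods documented in the class.

# Hypothetical example; MyProblem, MyAlgorithm and MyBackend are placeholders,
# not classes defined by this module. The 'results/' directory is assumed to exist.
from qlauncher.workflow.pilotjob_scheduler import JobManager

manager = JobManager()                      # spins up a LocalManager by default
job_id = manager.submit(
    problem=MyProblem(),
    algorithm=MyAlgorithm(),
    backend=MyBackend(),
    output_path='results/run_',             # files are created as results/run_output.00000 etc.
)

finished_id, state = manager.wait_for_a_job(job_id)   # blocks until the job finishes
result = manager.read_results(finished_id)            # unpickle the Result written by the task

manager.clean_up()                          # delete generated output files
manager.stop()                              # cancel jobs and shut the manager down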