1import os
2import pickle
3import sys
4
5from qlauncher.base.base import Algorithm, Backend, Problem, Result
6from qlauncher.launcher.qlauncher import QLauncher
7from qlauncher.exceptions import DependencyError
8try:
9 import dill
10 from qcg.pilotjob.api.job import Jobs
11 from qcg.pilotjob.api.manager import LocalManager, Manager
12except ImportError as e:
13 raise DependencyError(e, install_hint='pilotjob') from e
14
15
class JobManager:
    """QLauncher's wrapper around a process-management system.

    The current version runs on top of qcg-pilotjob: jobs are QLauncher
    instances serialized with ``dill``, executed by ``pilotjob_task.py``
    in a subprocess, and their results read back with ``pickle``.
    """

    def __init__(self, manager: Manager | None = None):
        """
        Job manager is QLauncher's wrapper for process management system, current version works on top of qcg-pilotjob

        Args:
            manager (Manager | None, optional): Manager system to schedule jobs, if set to None, the pilotjob's LocalManager is set.
                Defaults to None.
        """
        # Maps job_uid -> {'name', 'qcg_args', 'input_file', 'output_file', 'finished'}.
        self.jobs: dict[str, dict] = {}
        # Worker script executed for every job; lives next to this module.
        self.code_path = os.path.join(os.path.dirname(__file__), 'pilotjob_task.py')
        if manager is None:
            manager = LocalManager()
        self.manager = manager

    def _count_not_finished(self) -> int:
        # Jobs are flipped to finished in wait_for_a_job(); everything else counts.
        return len([job for job in self.jobs.values() if job.get('finished') is False])

    def submit(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores: int | None = None) -> str:
        """
        Submits QLauncher job to the scheduler

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path of output file.
            cores (int | None, optional): Number of cores for the task; if None a single core is used. Defaults to None.

        Returns:
            str: Job Id.
        """
        if cores is None:
            cores = 1
        job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend,
                                        output=output_path, cores=cores)
        # manager.submit returns a list of ids; a single Jobs() entry yields one.
        job_id = self.manager.submit(Jobs().add(**job.get('qcg_args')))[0]
        return job_id

    def submit_many(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores_per_job: int = 1, n_jobs: int | None = None) -> list[str]:
        """
        Submits as many jobs as there are currently available cores.

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path of output file.
            cores_per_job (int, optional): Number of cores per job. Defaults to 1.
            n_jobs: number of jobs to submit. If None, submit as many as possible (free_cores//cores_per_job). Defaults to None.

        Returns:
            list[str]: List with Job Id's.
        """
        free_cores = self.manager.resources()['free_cores']
        if free_cores == 0:
            return []
        qcg_jobs = Jobs()
        for _ in range(n_jobs if n_jobs is not None else free_cores // cores_per_job):
            job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend, output=output_path, cores=cores_per_job)
            qcg_jobs.add(**job.get('qcg_args'))
        return self.manager.submit(qcg_jobs)

    def wait_for_a_job(self, job_id: str | None = None, timeout: int | float | None = None) -> tuple[str, str]:
        """
        Waits for a job to finish and returns it's id and status.

        Args:
            job_id (str | None, optional): Id of selected job, if None waiting for any job. Defaults to None.
            timeout (int | float | None, optional): Timeout in seconds. Defaults to None.

        Raises:
            ValueError: Raises if job_id not found or there are no jobs left.

        Returns:
            tuple[str, str]: job_id, job's status
        """
        if job_id is None:
            if self._count_not_finished() <= 0:
                raise ValueError("There are no jobs left")
            job_id, state = self.manager.wait4_any_job_finish(timeout)
        elif job_id in self.jobs:
            state = self.manager.wait4(job_id, timeout=timeout)[job_id]
        else:
            raise ValueError(f"Job {job_id} not found in {self.__class__.__name__}'s jobs")

        self.jobs[job_id]['finished'] = True
        return job_id, state

    def _prepare_ql_dill_job(self, problem: Problem, algorithm: Algorithm, backend: Backend,
                             output: str, cores: int = 1) -> dict:
        """Serialize a QLauncher with dill and build the qcg-pilotjob job description.

        Returns the job record that is also stored in ``self.jobs``.
        """
        job_uid = f'{len(self.jobs):05d}'
        output_file = os.path.abspath(f'{output}output.{job_uid}')
        # Bug fix: the input previously reused the *output* path
        # (f'{output}output.{job_uid}'), so the serialized launcher and the
        # job's result collided in a single file. Use a distinct input file.
        input_file = os.path.abspath(f'{output}input.{job_uid}')
        launcher = QLauncher(problem, algorithm, backend)
        with open(input_file, 'wb') as f:
            dill.dump(launcher, f)

        # pilotjob_task.py is invoked as: python pilotjob_task.py <input> <output>
        in_args = [self.code_path, input_file, output_file]
        qcg_args = {
            'name': job_uid,
            'exec': sys.executable,
            'args': in_args,
            'model': 'openmpi',
            'stdout': f'{output}stdout.{job_uid}',
            'stderr': f'{output}stderr.{job_uid}',
            'numCores': cores
        }
        job = {'name': job_uid, 'qcg_args': qcg_args, 'input_file': input_file,
               'output_file': output_file, 'finished': False}
        self.jobs[job_uid] = job
        return job

    def read_results(self, job_id: str) -> Result:
        """
        Reads the result of given job_id.

        Args:
            job_id (str): Job Id.

        Returns:
            Result: Result of selected Job.
        """
        output_path = self.jobs[job_id]['output_file']
        # NOTE: pickle.loads on files we produced ourselves; do not point this
        # at untrusted paths.
        with open(output_path, 'rb') as rt:
            results = pickle.load(rt)
        return results

    def clean_up(self):
        """
        Removes all input/output files generated in the process and calls self.manager.cleanup().
        """
        for job in self.jobs.values():
            for path in (job.get('input_file'), job['output_file']):
                if path is None:
                    continue
                try:
                    os.remove(path)
                except FileNotFoundError:
                    # A failed/cancelled job may never have produced its output.
                    pass
        if isinstance(self.manager, LocalManager):
            self.manager.cleanup()

    def stop(self):
        """
        Stops the manager process.
        """
        # Cancel by job name; iterating the dict yields the job uids.
        self.manager.cancel(list(self.jobs))
        if isinstance(self.manager, LocalManager):
            self.manager.finish()

    def __del__(self):
        # Best effort only: __del__ may run during interpreter shutdown or
        # after a partially-failed __init__ (no self.manager), so never let
        # cleanup errors propagate from a finalizer.
        try:
            self.stop()
        except Exception:
            pass