1import contextlib
2import os
3import pickle
4import sys
5from collections.abc import Callable
6from pathlib import Path
7from typing import Any
8
9from qlauncher.exceptions import DependencyError
10from qlauncher.workflow.base_job_manager import BaseJobManager
11
12try:
13 import dill
14 from qcg.pilotjob.api.job import Jobs
15 from qcg.pilotjob.api.manager import LocalManager, Manager
16 from qcg.pilotjob.api.errors import TimeoutElapsed
17except ImportError as e:
18 raise DependencyError(e, install_hint='pilotjob') from e
19
20
class PilotJobManager(BaseJobManager):
    """QLauncher job manager that schedules function jobs through qcg-pilotjob."""

    def __init__(
        self,
        output_path: str,
        manager: Manager | None = None,
    ):
        """
        PilotJob manager is QLauncher's wrapper for a process management system;
        the current version works on top of qcg-pilotjob.

        Args:
            output_path (str): Directory where per-job input/output pickle files
                and stdout/stderr files are written.
            manager (Manager | None, optional): Manager system to schedule jobs,
                if set to None, the pilotjob's LocalManager is set.
                Defaults to None.
        """
        super().__init__()
        # Helper script executed by every job: it loads the dilled function
        # from the input file, runs it, and pickles the result to the output file.
        self.code_path = os.path.join(os.path.dirname(__file__), 'subprocess_fn.py')
        self.manager = manager if manager is not None else LocalManager()
        self.output_path = output_path
38
[docs]
39 def submit(
40 self,
41 function: Callable,
42 cores: int = 1,
43 **kwargs,
44 ) -> str:
45 """
46 Submit a function job to the scheduler.
47
48 Args:
49 function: Function to be executed.
50 cores: Number of CPU cores per task.
51 **kwargs: Manager-specific additional arguments.
52
53 Returns:
54 Job ID as a string.
55 """
56
57 job = self._prepare_ql_dill_job(
58 function,
59 output=self.output_path,
60 cores=cores,
61 )
62 return self.manager.submit(Jobs().add(**job['qcg_args']))[0]
63
[docs]
64 def submit_many(
65 self,
66 function: Callable,
67 cores_per_job: int = 1,
68 n_jobs: int | None = None,
69 ) -> list[str]:
70 """
71 Submits as many jobs as there are currently available cores.
72
73 Args:
74 function (Callable): Function to execute.
75 output_path (str): Path of output file.
76 cores_per_job (int, optional): Number of cores per job. Defaults to 1.
77 n_jobs: number of jobs to submit. If None, submit as many as possible (free_cores//cores_per_job). Defaults to None.
78
79 Returns:
80 list[str]: List with Job Id's.
81 """
82 free_cores = self.manager.resources()['free_cores']
83 if free_cores == 0:
84 return []
85
86 qcg_jobs = Jobs()
87 num_jobs = n_jobs if n_jobs is not None else free_cores // cores_per_job
88
89 for _ in range(num_jobs):
90 job = self._prepare_ql_dill_job(
91 function,
92 output=self.output_path,
93 cores=cores_per_job,
94 )
95 qcg_jobs.add(**job['qcg_args'])
96
97 return self.manager.submit(qcg_jobs)
98
[docs]
99 def wait_for_a_job(
100 self,
101 job_id: str | None = None,
102 timeout: float | None = None,
103 ) -> tuple[str, str]:
104 """
105 Waits for a job to finish and returns it's id and status.
106
107 Args:
108 job_id (str | None, optional): Id of selected job, if None waiting for any job. Defaults to None.
109 timeout (int | float | None, optional): Timeout in seconds. Defaults to None.
110
111 Raises:
112 ValueError: Raises if job_id not found or there are no jobs left.
113
114 Returns:
115 tuple[str, str]: job_id, job's status
116 """
117 if job_id is None:
118 if self._count_not_finished() <= 0:
119 raise ValueError('There are no jobs left')
120 job_id, state = self.manager.wait4_any_job_finish(timeout)
121 elif job_id in self.jobs:
122 try:
123 state = self.manager.wait4(job_id, timeout=timeout)[job_id]
124 except TimeoutElapsed as e:
125 raise TimeoutError from e
126 else:
127 raise ValueError(f"Job {job_id} not found in {self.__class__.__name__}'s jobs")
128
129 self.jobs[job_id]['finished'] = True
130 return job_id, state
131
132 def _prepare_ql_dill_job(
133 self,
134 function: Callable,
135 output: str,
136 cores: int = 1,
137 ) -> dict:
138 job_uid = self._make_job_uid()
139
140 out_dir = Path(output).expanduser().resolve()
141 out_dir.mkdir(parents=True, exist_ok=True)
142
143 output_file = out_dir / f'output.{job_uid}.pkl'
144 input_file = out_dir / f'input.{job_uid}.pkl'
145 stdout_file = out_dir / f'stdout.{job_uid}'
146 stderr_file = out_dir / f'stderr.{job_uid}'
147
148 with open(input_file, 'wb') as f:
149 dill.dump(function, f)
150
151 in_args = [self.code_path, str(input_file), str(output_file)]
152 qcg_args = {
153 'name': job_uid,
154 'exec': sys.executable,
155 'args': in_args,
156 'model': 'openmpi',
157 'stdout': str(stdout_file),
158 'stderr': str(stderr_file),
159 'numCores': cores,
160 }
161
162 job = {
163 'name': job_uid,
164 'qcg_args': qcg_args,
165 'output_file': str(output_file),
166 'finished': False,
167 }
168 self.jobs[job_uid] = job
169 return job
170
[docs]
171 def read_results(self, job_id) -> Any:
172 """
173 Reads the result of given job_id.
174
175 Args:
176 job_id (str): Job Id.
177
178 Returns:
179 Result: Result of selected Job.
180 """
181 if job_id not in self.jobs:
182 raise KeyError(f'Job {job_id} not found')
183
184 output_path = self.jobs[job_id]['output_file']
185 with open(output_path, 'rb') as rt:
186 return pickle.load(rt)
187
[docs]
188 def cancel(self, job_id: str) -> None:
189 """
190 Cancel a given job
191
192 Args:
193 job_id (str): id of the job to cancel.
194
195 Raises:
196 KeyError: If job with a given id was not submitted by this manager.
197
198 Returns:
199 None
200 """
201 if job_id not in self.jobs:
202 raise KeyError(f'Job {job_id} not found')
203 return self.manager.cancel(job_id)
204
[docs]
205 def clean_up(self) -> None:
206 """
207 Removes all output files generated in the process and calls self.manager.cleanup().
208 """
209 for job in self.jobs.values():
210 if os.path.exists(job['output_file']):
211 os.remove(job['output_file'])
212
213 if isinstance(self.manager, LocalManager):
214 with contextlib.suppress(Exception):
215 self.manager.cleanup()
216
[docs]
217 def stop(self) -> None:
218 """
219 Stops the manager process.
220 """
221 mgr = getattr(self, 'manager', None)
222 if mgr is None:
223 return
224
225 if isinstance(mgr, LocalManager):
226 with contextlib.suppress(Exception):
227 mgr.finish()
228
[docs]
229 def run(self, function: Callable[..., Any], cores: int = 1, **kwargs) -> Any:
230 return super().run(function, cores=cores, **kwargs)
231
232 def __del__(self):
233 with contextlib.suppress(Exception):
234 self.stop()