import os
import pickle
import sys
from typing import List, Optional

from quantum_launcher.base.base import Algorithm, Backend, Problem, Result
from quantum_launcher.launcher.qlauncher import QuantumLauncher
from quantum_launcher.import_management import DependencyError
try:
    import dill
    from qcg.pilotjob.api.job import Jobs
    from qcg.pilotjob.api.manager import LocalManager, Manager
except ImportError as e:
    raise DependencyError(e, install_hint='pilotjob') from e
15
16
class JobManager:
    def __init__(self, manager: Optional[Manager] = None):
        """
        Job manager is Quantum Launcher's wrapper for process management system, current version works on top of qcg-pilotjob

        Args:
            manager (Optional[Manager], optional): Manager system to schedule jobs, if set to None, the pilotjob's LocalManager is set.
              Defaults to None.
        """
        # Maps job_uid -> {'name', 'qcg_args', 'input_file', 'output_file', 'finished'}.
        self.jobs = {}
        # Worker script executed by the scheduler; it loads the dilled launcher and runs it.
        self.code_path = os.path.join(os.path.dirname(__file__), 'pilotjob_task.py')
        if manager is None:
            manager = LocalManager()
        self.manager = manager

    def _count_not_finished(self) -> int:
        """Return the number of submitted jobs not yet marked finished by wait_for_a_job()."""
        return len([job for job in self.jobs.values() if job.get('finished') is False])

    def submit(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores: Optional[int] = None) -> str:
        """
        Submits Quantum Launcher job to the scheduler

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path of output file.
            cores (Optional[int], optional): Number of cores per task, if None value set to number of free cores (at least 1). Defaults to None.

        Returns:
            str: Job Id.
        """
        if cores is None:
            # Grab everything currently idle, but always request at least one core.
            cores = max(1, self.manager.resources()['free_cores'])
        job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend,
                                        output=output_path, cores=cores)
        job_id = self.manager.submit(Jobs().add(**job.get('qcg_args')))[0]
        return job_id

    def submit_many(self, problem: Problem, algorithm: Algorithm, backend: Backend, output_path: str, cores_per_job: int = 1) -> List[str]:
        """
        Submits as many jobs as there are currently available cores.

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output_path (str): Path of output file.
            cores_per_job (int, optional): Number of cores per job. Defaults to 1.

        Returns:
            List[str]: List with Job Id's.
        """
        free_cores = self.manager.resources()['free_cores']
        n_jobs = free_cores // cores_per_job
        # Guard on the job count, not just free_cores: if cores_per_job exceeds the
        # free cores we would otherwise submit an empty Jobs() batch to the manager.
        if n_jobs <= 0:
            return []
        qcg_jobs = Jobs()
        for _ in range(n_jobs):
            job = self._prepare_ql_dill_job(problem=problem, algorithm=algorithm, backend=backend, output=output_path, cores=cores_per_job)
            qcg_jobs.add(**job.get('qcg_args'))
        return self.manager.submit(qcg_jobs)

    def wait_for_a_job(self, job_id: Optional[str] = None, timeout: Optional[int | float] = None) -> tuple[str, str]:
        """
        Waits for a job to finish and returns it's id and status.

        Args:
            job_id (Optional[str], optional): Id of selected job, if None waiting for any job. Defaults to None.
            timeout (Optional[int | float], optional): Timeout in seconds. Defaults to None.

        Raises:
            ValueError: Raises if job_id not found or there are no jobs left.

        Returns:
            tuple[str, str]: job_id, job's status
        """
        if job_id is None:
            if self._count_not_finished() <= 0:
                raise ValueError("There are no jobs left")
            job_id, state = self.manager.wait4_any_job_finish(timeout)
        elif job_id in self.jobs:
            state = self.manager.wait4(job_id, timeout=timeout)[job_id]
        else:
            raise ValueError(f"Job {job_id} not found in {self.__class__.__name__}'s jobs")

        self.jobs[job_id]['finished'] = True
        return job_id, state

    def _prepare_ql_dill_job(self, problem: Problem, algorithm: Algorithm, backend: Backend,
                             output: str, cores: int = 1) -> dict:
        """
        Serializes a QuantumLauncher with dill and builds the qcg-pilotjob job description.

        The launcher is dumped to a distinct input file (``input.<uid>``); the worker
        script writes its pickled Result to a separate output file (``output.<uid>``).
        Previously both pointed at the same path, so the task's result silently
        overwrote the dill input, and a crashed job left a dill blob that
        read_results() would try to unpickle.

        Args:
            problem (Problem): Problem.
            algorithm (Algorithm): Algorithm.
            backend (Backend): Backend.
            output (str): Path prefix for all generated files.
            cores (int, optional): Number of cores requested for the task. Defaults to 1.

        Returns:
            dict: Job record stored in ``self.jobs`` (name, qcg_args, input/output files, finished flag).
        """
        # Sequential zero-padded uid keeps file names sortable and unique per manager.
        job_uid = f'{len(self.jobs):05d}'
        input_file = os.path.abspath(f'{output}input.{job_uid}')
        output_file = os.path.abspath(f'{output}output.{job_uid}')
        launcher = QuantumLauncher(problem, algorithm, backend)
        with open(input_file, 'wb') as f:
            dill.dump(launcher, f)

        in_args = [self.code_path, input_file, output_file]
        qcg_args = {
            'name': job_uid,
            'exec': sys.executable,
            'args': in_args,
            'model': 'openmpi',
            'stdout': f'{output}stdout.{job_uid}',
            'stderr': f'{output}stderr.{job_uid}',
            'numCores': cores
        }
        job = {'name': job_uid, 'qcg_args': qcg_args, 'input_file': input_file,
               'output_file': output_file, 'finished': False}
        self.jobs[job_uid] = job
        return job

    def read_results(self, job_id: str) -> Result:
        """
        Reads the result of given job_id.

        Args:
            job_id (str): Job Id.

        Returns:
            Result: Result of selected Job.
        """
        output_path = self.jobs[job_id]['output_file']
        with open(output_path, 'rb') as rt:
            results = pickle.load(rt)
        return results

    def clean_up(self):
        """
        Removes all input/output files generated in the process and calls self.manager.cleanup().
        """
        for job in self.jobs.values():
            for path in (job.get('input_file'), job['output_file']):
                if path is None:
                    continue
                try:
                    os.remove(path)
                except FileNotFoundError:
                    # Job never produced this file (e.g. crashed task) or it was
                    # already removed — best-effort cleanup should not raise.
                    pass
        if isinstance(self.manager, LocalManager):
            self.manager.cleanup()

    def stop(self):
        """
        Stops the manager process.
        """
        self.manager.cancel(self.jobs)
        self.manager.finish()

    def __del__(self):
        # Best effort: the manager may already be finished, or __init__ may have
        # failed before self.manager existed; exceptions in __del__ are unrecoverable
        # and would only produce noise during interpreter shutdown.
        try:
            self.stop()
        except Exception:
            pass