Source code for qlauncher.workflow.base_job_manager
1from abc import ABC, abstractmethod
2from collections.abc import Callable
3from typing import Any
4
5from qlauncher.base import Result
6
7
[docs]
8class BaseJobManager(ABC):
9 """
10 Abstract base class for job managers that execute functions.
11 on different compute backends.
12 """
13
14 def __init__(self):
15 self.jobs: dict[str, dict[str, Any]] = {}
16
[docs]
17 @abstractmethod
18 def submit(
19 self,
20 function: Callable,
21 **kwargs,
22 ) -> str:
23 """
24 Submit a function job to the scheduler.
25
26 Args:
27 function: Function to be executed.
28 cores: Number of CPU cores per task.
29 **kwargs: Manager-specific additional arguments.
30
31 Returns:
32 Job ID as a string.
33 """
34 pass
35
[docs]
36 @abstractmethod
37 def wait_for_a_job(
38 self,
39 job_id: str | None = None,
40 timeout: float | None = None,
41 ) -> str | None:
42 """
43 Wait for a job to finish and return its ID.
44
45 Args:
46 job_id: ID of the job to wait for. If None, wait for any job.
47 timeout: Maximum time to wait in seconds. If None, wait indefinitely.
48
49 Returns:
50 Job ID of the finished job.
51
52 Raises:
53 ValueError: If no jobs are available to wait for.
54 TimeoutError: If timeout is exceeded.
55 """
56 pass
57
[docs]
58 @abstractmethod
59 def read_results(self, job_id: str) -> Any:
60 """
61 Read the result of a finished job.
62
63 Args:
64 job_id: Job ID returned by submit().
65
66 Returns:
67 Result object produced by the job.
68
69 Raises:
70 KeyError: If job_id is not known to this manager.
71 FileNotFoundError: If the result file does not exist.
72 """
73 pass
74
[docs]
75 @abstractmethod
76 def cancel(self, job_id: str):
77 """
78 Cancel a given job
79
80 Args:
81 job_id (str): id of the job to cancel.
82
83 Raises:
84 KeyError: If job with a given id was not submitted by this manager.
85
86 Returns:
87 None
88 """
89 pass
90
[docs]
91 @abstractmethod
92 def clean_up(self) -> None:
93 """
94 Clean up temporary files and resources created by the manager.
95 """
96 pass
97
[docs]
98 def run(
99 self,
100 function: Callable,
101 **kwargs,
102 ) -> Any:
103 """
104 Convenience method: submit job, wait for completion, read results, and cleanup.
105
106 This method handles the complete lifecycle of a job execution.
107
108 Args:
109 function: Function to be executed.
110 cores: Number of CPU cores per task.
111 **kwargs: Manager-specific additional arguments.
112
113 Returns:
114 Result object produced by the job.
115 """
116 try:
117 job_id = self.submit(function, **kwargs)
118 self.wait_for_a_job(job_id)
119 return self.read_results(job_id)
120 finally:
121 self.clean_up()
122
123 def _count_not_finished(self) -> int:
124 """Count how many jobs are not yet marked as finished."""
125 return len([job for job in self.jobs.values() if not job.get('finished', False)])
126
127 def _make_job_uid(self) -> str:
128 """Generate a unique job identifier."""
129 return f'{len(self.jobs):05d}'
130
131
if __name__ == '__main__':
    # Manual demo: solve the preset MaxCut instance with QAOA by dispatching
    # QLauncher(...).run through a job manager. Presumably needs access to a
    # Slurm scheduler (and any env_setup lines below adjusted) to succeed.
    from qlauncher import QLauncher
    from qlauncher.problems import MaxCut
    from qlauncher.routines.qiskit import QAOA, QiskitBackend

    problem = MaxCut.from_preset('default')
    algorithm = QAOA(p=3)
    backend = QiskitBackend('local_simulator')

    # SlurmJobManager
    from qlauncher.workflow.slurm_job_manager import SlurmJobManager

    slurm_mgr = SlurmJobManager(
        slurm_options={
            'time': '00:30:00',
            # "licenses": "orca1:1",
        },
        env_setup=[
            # "module load Python/python-3.11.0",
            # "source ~/venv/bin/activate",
        ],
    )
    # run() takes the callable to execute; extra kwargs (cores=...) go to submit().
    slurm_result = slurm_mgr.run(QLauncher(problem, algorithm, backend).run, cores=1)
    print(slurm_result)

    # # PilotJobManager
    # from pathlib import Path
    # from qlauncher.workflow.pilotjob_scheduler import PilotJobManager
    #
    # out_dir = Path("./pilotjob_out")
    #
    # pilot_mgr = PilotJobManager()
    # # Same calling convention as above: pass the callable, then submit() kwargs.
    # # NOTE(review): confirm PilotJobManager.submit accepts output_path.
    # pilot_result = pilot_mgr.run(
    #     QLauncher(problem, algorithm, backend).run,
    #     cores=1,
    #     output_path=out_dir,
    # )
    # print(pilot_result)