import contextlib
import queue
import time
import weakref
from collections.abc import Callable
from threading import Event, Thread
from typing import TYPE_CHECKING, Any

import multiprocess as mp
from multiprocess.queues import Queue as QueueType
10
11if TYPE_CHECKING:
12 from multiprocess.process import BaseProcess as ProcessType
13else:
14 ProcessType = Any
15
# LocalJobManager previously ran each job via pathos._ProcessPool (apply_async + polling).
# On Windows this proved flaky for our test/CI contract (cancel + GC cleanup): worker pools were not
# deterministically torn down, leaving child processes alive and causing intermittent timeouts and
# noisy Pool.__del__/WinError shutdown errors. It now uses a per-job multiprocess.Process + Queue,
# so the worker can be reliably terminated/joined on cancel/cleanup and no subprocesses leak.
21
22from qlauncher.base.base import Result
23from qlauncher.workflow.base_job_manager import BaseJobManager
24
25
[docs]
26def get_timeout(max_timeout: int | float | None, start: int | float) -> float | None:
27 """
28 Get timeout to wait on an event, useful when awaiting multiple tasks and total timeout must be max_timeout.
29
30 Args:
31 max_timeout (int | float | None): Total allowed timeout, None = infinite wait.
32 start (int | float): Await start timestamp (time.time())
33
34 Returns:
35 int | float | None: Remaining timeout or None if max_timeout was None
36 """
37 if max_timeout is None:
38 return None
39 return max_timeout - (time.time() - start)
40
41
def _run_in_subprocess(q: QueueType, fn: Callable[[], Any]) -> None:
    """
    Child-process entry point: call ``fn`` and report the outcome on ``q``.

    On success ``('ok', value)`` is put on the queue. If calling ``fn`` (or enqueuing
    its result) raises anything, including BaseException subclasses such as
    SystemExit, ``('err', exc)`` is put instead.

    Args:
        q: Queue read by the parent process.
        fn: Zero-argument callable to execute in the child process.

    Returns:
        None. The outcome travels through the queue.
    """
    try:
        outcome = ('ok', fn())
        q.put(outcome)
    except BaseException as exc:
        # Ship the exception object itself; the parent re-raises it.
        q.put(('err', exc))
58
59
class _InnerMPTask:
    """
    Internal asynchronous task runner implemented as:
    - a supervisor thread in the parent process,
    - a dedicated child process running the user callable,
    - a queue for returning result or exception.

    Args:
        task: Zero-argument callable to execute.
        callbacks: Optional list of callables invoked with the task outcome (result or exception).

    Attributes:
        task: Callable executed in the child process.
        callbacks: Functions called after task finishes (best-effort).
    """

    # Seconds between checks of the cancel flag while the supervisor waits for the child.
    _POLL_INTERVAL_S = 0.05
    # Grace period (seconds) for the child to exit after terminate()/kill() or after reporting back.
    _JOIN_TIMEOUT_S = 1.0

    def __init__(
        self,
        task: Callable,
        callbacks: list[Callable] | None = None,
    ) -> None:
        self.task = task
        self.callbacks = callbacks if callbacks is not None else []

        self._cancelled = False
        self._thread: Thread | None = None
        # Set once the supervisor thread has been created and started; result() waits on it.
        self._thread_made = Event()

        self._proc: ProcessType | None = None
        self._queue: QueueType | None = None

        # Task return value, the exception object it raised, or None (cancelled / not run).
        self._result: Any = None
        self._done = False

    def _terminate_proc(self) -> None:
        """
        Stop the child process (if any) and reap it, best-effort.

        terminate() is tried first. If the child is still alive after the grace period,
        it is killed. All errors are suppressed because this also runs from cancel() and
        finalizers.
        """
        p = self._proc
        if p is None:
            return
        with contextlib.suppress(Exception):
            if p.is_alive():
                p.terminate()
        with contextlib.suppress(Exception):
            p.join(timeout=self._JOIN_TIMEOUT_S)
        with contextlib.suppress(Exception):
            if p.is_alive():
                # terminate() did not stop it (on POSIX a child can catch/ignore SIGTERM): force it.
                p.kill()
                p.join(timeout=self._JOIN_TIMEOUT_S)

    def _close_queue(self) -> None:
        """Release the parent's handle on the result queue (best-effort, safe to call twice)."""
        q = self._queue
        if q is not None:
            with contextlib.suppress(Exception):
                q.close()

    def _await_outcome(self, p: ProcessType, q: QueueType) -> Any:
        """
        Wait until the child reports back and return the reported payload.

        The queue is read *while* the child is still running. A process that has put data
        on a multiprocessing queue does not exit until that data has been flushed into the
        pipe. Joining the child before reading therefore deadlocks as soon as the pickled
        result does not fit in the pipe buffer.

        Args:
            p: The started child process.
            q: Queue the child reports on.

        Returns:
            The task's return value or the exception it raised. If the child exited without
            a readable report, an exception describing that failure. None if the task was
            cancelled while waiting.
        """
        while True:
            if self._cancelled:
                self._terminate_proc()
                return None
            try:
                _tag, payload = q.get(timeout=self._POLL_INTERVAL_S)
                break
            except queue.Empty:
                pass
            except Exception as e:
                # Something arrived but could not be received/unpickled in the parent.
                self._terminate_proc()
                return e
            if not p.is_alive():
                # The child has exited, so anything it put is already in the pipe: one last read.
                try:
                    _tag, payload = q.get_nowait()
                    break
                except queue.Empty:
                    return RuntimeError(f'Worker process exited with code {p.exitcode} without reporting a result.')
                except Exception as e:
                    return e

        # The child delivered its report and is shutting down; reap it (escalating if needed).
        with contextlib.suppress(Exception):
            p.join(timeout=self._JOIN_TIMEOUT_S)
        self._terminate_proc()
        # Both 'ok' and 'err' payloads are stored as-is; result() raises exception payloads.
        return payload

    def _async_task(self) -> None:
        """Supervisor-thread body: run the task in a child process and record its outcome."""
        if self._cancelled:
            self._result = None
            self._done = True
            return

        ctx = mp.get_context()  # default context; on Windows this uses 'spawn'
        q: QueueType = ctx.Queue(maxsize=1)
        self._queue = q

        p = ctx.Process(target=_run_in_subprocess, args=(q, self.task), daemon=True)
        self._proc = p

        try:
            try:
                p.start()
            except BaseException as e:
                self._result = e
                return
            self._result = self._await_outcome(p, q)
        finally:
            self._close_queue()
            self._done = True

    def thread_main(self) -> None:
        """Supervisor thread entry point: run the task, then invoke callbacks with its outcome."""
        self._async_task()

        for cb in self.callbacks:
            # Callbacks are best-effort: one failing callback must not prevent the others.
            with contextlib.suppress(Exception):
                cb(self._result)

    def _set_thread(self) -> None:
        # daemon=True so the supervisor thread does not keep the main process alive at exit.
        # The thread references this object through the bound method. Going through
        # weakref.proxy(self) first would not avoid that, because attribute access on a
        # proxy returns a method bound to the real object.
        self._thread = Thread(target=self.thread_main, daemon=True)
        self._thread.start()
        self._thread_made.set()

    def start(self) -> None:
        """
        Start task execution in a supervisor thread.

        Raises:
            ValueError: If the task was already started or has been cancelled.
        """
        if self._thread is not None or self._cancelled:
            raise ValueError('Cannot start, task already started or cancelled.')
        self._set_thread()

    def cancel(self) -> bool:
        """
        Cancel the task: terminate the child process (if any) and mark the task as done.

        Cancellation is unconditional. It also applies to a task that already completed,
        whose stored result is then discarded.

        Returns:
            bool: Always True (the task is considered cancelled afterwards).
        """
        self._cancelled = True
        self._terminate_proc()
        if self._thread is not None:
            # Give the supervisor thread a moment to notice the cancellation and exit.
            with contextlib.suppress(Exception):
                self._thread.join(timeout=self._JOIN_TIMEOUT_S)

        # Mark as done even if the thread is stubborn; public contract: cancelled => terminal.
        self._result = None
        self._done = True
        return True

    def cancelled(self) -> bool:
        """
        Returns:
            bool: True if the task was canceled by the user.
        """
        return self._cancelled

    def done(self) -> bool:
        """
        Returns:
            bool: True if the task had finished execution (including cancellation).
        """
        return self._done

    def running(self) -> bool:
        """
        Returns:
            bool: True if the supervisor thread is currently alive.
        """
        if self._thread is None:
            return False
        return self._thread.is_alive()

    def result(self, timeout: float | int | None = None) -> Result | None:
        """
        Wait for the task to finish and return its result.

        Args:
            timeout: Maximum time to wait in seconds. If None, wait indefinitely.

        Returns:
            The task result, or None if the task was canceled or never started.

        Raises:
            TimeoutError: If the task does not finish within the timeout (the task is then canceled).
            BaseException: Re-raises an exception produced by the task.
        """
        start = time.time()

        # Wait until the supervisor thread exists.
        self._thread_made.wait(timeout=get_timeout(timeout, start))

        # Never started (within the timeout): nothing to wait for.
        if self._thread is None:
            return None

        self._thread.join(timeout=get_timeout(timeout, start))
        if self._thread.is_alive():
            # Timed out: cancel so the child process does not outlive the wait.
            self.cancel()
            raise TimeoutError
        if isinstance(self._result, BaseException):
            raise self._result
        return self._result
259
260
[docs]
261class MPTask:
262 """
263 Public-facing wrapper around _InnerMPTask.
264
265 This wrapper exists so that if the task object is garbage-collected,
266 we best-effort cancel the underlying execution to avoid leaking subprocesses.
267
268 Args:
269 task: Zero-argument callable to execute.
270 callbacks: Optional list of callables invoked with the task outcome.
271
272 Attributes:
273 _inner_task: The underlying _InnerMPTask instance.
274 """
275
276 def __init__(
277 self,
278 task: Callable,
279 callbacks: list[Callable] | None = None,
280 ) -> None:
281 self._inner_task = _InnerMPTask(task, callbacks)
282 weakref.finalize(self, self._inner_task.cancel)
283
284 def __getattr__(self, name: str):
285 return getattr(self._inner_task, name)
286
287
[docs]
class LocalJobManager(BaseJobManager):
    """
    Run jobs locally using a per-job child process.

    This manager implements the BaseJobManager interface and is primarily intended
    for local execution and testing (supports waiting for any job, cancellation, and cleanup).

    Args:
        poll_interval_s: Seconds to sleep between status checks while waiting for any job.
    """

    def __init__(self, poll_interval_s: float = 0.05):
        super().__init__()
        self.tasks: dict[str, MPTask] = {}
        self._poll_interval_s = poll_interval_s

    def __del__(self):
        # Best-effort teardown so a dropped manager does not leave worker processes behind;
        # exceptions must not escape a finalizer.
        with contextlib.suppress(Exception):
            self.clean_up()

    def _flag_job_finished(self, job_id: str) -> None:
        """Set the 'finished' flag of `job_id` in `self.jobs`, if the job is tracked there."""
        if job_id in self.jobs:
            self.jobs[job_id]['finished'] = True

    def submit(
        self,
        function: Callable,
        **kwargs,
    ) -> str:
        """
        Submit a function job to the scheduler.

        Args:
            function: Function to be executed.
            **kwargs: Manager-specific additional arguments (currently unused by LocalJobManager).

        Returns:
            Job ID as a string.
        """
        jid = self._make_job_uid()
        # Regenerate on a collision with an existing job id.
        while jid in self.tasks or jid in self.jobs:
            jid = self._make_job_uid()

        self.jobs[jid] = {'finished': False}

        task = MPTask(function)
        self.tasks[jid] = task
        task.start()
        return jid

    def wait_for_a_job(
        self,
        job_id: str | None = None,
        timeout: float | None = None,
    ) -> str | None:
        """
        Wait for a job to finish.

        Args:
            job_id: If provided, wait for this specific job. If None, wait for any unfinished job.
            timeout: Maximum time to wait in seconds. If None, wait indefinitely.

        Returns:
            The finished job ID.

        Raises:
            KeyError: If job_id is provided but unknown.
            ValueError: If waiting for any job but there is no unfinished job to wait for.
            TimeoutError: If timeout expires before a job finishes. When job_id is given,
                that job is cancelled on timeout (see MPTask.result).
            BaseException: When job_id is given, re-raises an exception produced by that job.
        """
        if job_id is not None:
            if job_id not in self.tasks:
                raise KeyError('No such job!')
            task = self.tasks[job_id]
            try:
                task.result(timeout=timeout)
            finally:
                # A job that raised, or was cancelled on timeout, has finished as well.
                if task.done():
                    self._flag_job_finished(job_id)
            return job_id

        start = time.monotonic()  # local deadline bookkeeping: immune to wall-clock jumps
        while True:
            pending = False
            # Iterate a snapshot so concurrent submits cannot break the iteration.
            for jid, task in list(self.tasks.items()):
                if jid in self.jobs and self.jobs[jid].get('finished', False):
                    continue
                pending = True
                if task.done() or task.cancelled():
                    self._flag_job_finished(jid)
                    return jid

            # Covers "no jobs at all" and "every job already reported": nothing can ever finish,
            # so waiting would never end.
            if not pending:
                raise ValueError('No jobs to wait for.')

            if timeout is not None and (time.monotonic() - start) >= timeout:
                raise TimeoutError

            time.sleep(self._poll_interval_s)

    def read_results(self, job_id: str) -> Result:
        """
        Read results for a finished job, waiting for it if necessary.

        Args:
            job_id: Job ID.

        Returns:
            The job result (None if the job was cancelled).

        Raises:
            KeyError: If job_id is unknown.
            BaseException: Re-raises an exception produced by the job.
        """
        if job_id not in self.tasks:
            raise KeyError('No such job!')
        task = self.tasks[job_id]
        try:
            return task.result()
        finally:
            # Also flag jobs whose result is an exception: they are finished too.
            if task.done():
                self._flag_job_finished(job_id)

    def cancel(self, job_id: str) -> bool:
        """
        Cancel a submitted job.

        Args:
            job_id: Job ID.

        Returns:
            True if cancellation was performed (best-effort) and the job is marked finished.

        Raises:
            KeyError: If job_id is unknown.
        """
        if job_id not in self.tasks:
            raise KeyError('No such job!')
        ok = self.tasks[job_id].cancel()
        if ok:
            self._flag_job_finished(job_id)
        return ok

    def clean_up(self) -> None:
        """
        Cancel all known jobs and mark them as finished.

        Cancellation errors are suppressed so one failing job does not prevent the others
        from being cleaned up.
        """
        for jid, task in list(self.tasks.items()):
            with contextlib.suppress(Exception):
                task.cancel()
            self._flag_job_finished(jid)