Source code for qlauncher.workflow.local_scheduler

  1import contextlib
  2import time
  3import weakref
  4from collections.abc import Callable
  5from threading import Event, Thread
  6from typing import TYPE_CHECKING, Any
  7
  8import multiprocess as mp
  9from multiprocess.queues import Queue as QueueType
 10
 11if TYPE_CHECKING:
 12    from multiprocess.process import BaseProcess as ProcessType
 13else:
 14    ProcessType = Any
 15
 16# LocalJobManager used to run each job via pathos._ProcessPool (apply_async + polling).
 17# On Windows this proved flaky for our test/CI contract (cancel + GC cleanup): worker pools were not
 18# deterministically torn down, leaving child processes alive and causing intermittent timeouts and
 19# noisy Pool.__del__/WinError shutdown errors. I switched to a per-job multiprocess.Process + Queue
 20# so we can reliably terminate/join the worker on cancel/cleanup and ensure no subprocesses leak.
 21
 22from qlauncher.base.base import Result
 23from qlauncher.workflow.base_job_manager import BaseJobManager
 24
 25
[docs] 26def get_timeout(max_timeout: int | float | None, start: int | float) -> float | None: 27 """ 28 Get timeout to wait on an event, useful when awaiting multiple tasks and total timeout must be max_timeout. 29 30 Args: 31 max_timeout (int | float | None): Total allowed timeout, None = infinite wait. 32 start (int | float): Await start timestamp (time.time()) 33 34 Returns: 35 int | float | None: Remaining timeout or None if max_timeout was None 36 """ 37 if max_timeout is None: 38 return None 39 return max_timeout - (time.time() - start)
40 41 42def _run_in_subprocess(q: QueueType, fn: Callable[[], Any]) -> None: 43 """ 44 Execute a callable in a worker process and send its outcome back via a queue. 45 46 Args: 47 q: Queue used to report ("ok", result) or ("err", exception). 48 fn: Zero-argument callable to execute in the child process. 49 50 Returns: 51 None. The result/exception is returned through the queue. 52 """ 53 try: 54 q.put(('ok', fn())) 55 except BaseException as e: 56 # Pass exception object back; parent will raise it. 57 q.put(('err', e)) 58 59 60class _InnerMPTask: 61 """ 62 Internal asynchronous task runner implemented as: 63 - a supervisor thread in the parent process, 64 - a dedicated child process running the user callable, 65 - a queue for returning result or exception. 66 67 Args: 68 task: Zero-argument callable to execute. 69 callbacks: Optional list of callables invoked with the task outcome (result or exception). 70 71 Attributes: 72 task: Callable executed in the child process. 73 callbacks: Functions called after task finishes (best-effort). 
74 """ 75 76 def __init__( 77 self, 78 task: Callable, 79 callbacks: list[Callable] | None = None, 80 ) -> None: 81 self.task = task 82 self.callbacks = callbacks if callbacks is not None else [] 83 84 self._cancelled = False 85 self._thread: Thread | None = None 86 self._thread_made = Event() 87 88 self._proc: ProcessType | None = None 89 self._queue: QueueType | None = None 90 91 self._result: Any = None 92 self._done = False 93 94 def _terminate_proc(self) -> None: 95 p = self._proc 96 if p is None: 97 return 98 try: 99 if p.is_alive(): 100 p.terminate() 101 except Exception: 102 pass 103 with contextlib.suppress(Exception): 104 p.join(timeout=1.0) 105 106 def _async_task(self) -> None: 107 if self._cancelled: 108 self._result = None 109 self._done = True 110 return 111 112 ctx = mp.get_context() # na Windows będzie spawn (domyślnie) 113 q: QueueType = ctx.Queue(maxsize=1) 114 self._queue = q 115 116 p = ctx.Process(target=_run_in_subprocess, args=(q, self.task), daemon=True) 117 self._proc = p 118 119 try: 120 p.start() 121 except BaseException as e: 122 self._result = e 123 self._done = True 124 return 125 126 while True: 127 if self._cancelled: 128 self._terminate_proc() 129 self._result = None 130 self._done = True 131 return 132 133 try: 134 p.join(timeout=0.05) 135 except Exception: 136 time.sleep(0.05) 137 138 if not p.is_alive(): 139 break 140 141 try: 142 tag, payload = q.get_nowait() 143 except Exception: 144 tag, payload = ('ok', None) 145 146 if tag == 'err': 147 self._result = payload 148 else: 149 self._result = payload 150 151 self._done = True 152 153 def thread_main(self) -> None: 154 self._async_task() 155 156 for cb in self.callbacks: 157 with contextlib.suppress(Exception): 158 cb(self._result) 159 160 def _set_thread(self) -> None: 161 self._thread = Thread(target=weakref.proxy(self).thread_main, daemon=True) # set daemon so that thread quits as main process quits 162 self._thread.start() 163 self._thread_made.set() 164 165 def start(self) 
-> None: 166 """ 167 Start task execution. 168 169 Args: 170 None. 171 172 Returns: 173 None. 174 """ 175 if self._thread is not None or self._cancelled: 176 raise ValueError('Cannot start, task already started or cancelled.') 177 self._set_thread() 178 179 def cancel(self) -> bool: 180 """ 181 Attempt to cancel the task. 182 183 Args: 184 None. 185 186 Returns: 187 True if the task is considered canceled. 188 """ 189 self._cancelled = True 190 if self._thread is None: 191 self._terminate_proc() 192 self._result = None 193 self._done = True 194 return True 195 196 # Started: terminate subprocess and wait briefly for supervisor thread to exit 197 self._terminate_proc() 198 with contextlib.suppress(Exception): 199 self._thread.join(timeout=1.0) 200 201 # Mark as done (even if thread is stubborn); public contract: cancelled => terminal 202 self._result = None 203 self._done = True 204 return True 205 206 def cancelled(self) -> bool: 207 """ 208 Returns: 209 bool: True if the task was canceled by the user. 210 """ 211 return self._cancelled 212 213 def done(self) -> bool: 214 """ 215 Returns: 216 bool: True if the task had finished execution. 217 """ 218 return self._done 219 220 def running(self) -> bool: 221 """ 222 Returns: 223 bool: True if the task is currently executing. 224 """ 225 if self._thread is None: 226 return False 227 return self._thread.is_alive() 228 229 def result(self, timeout: float | int | None = None) -> Result | None: 230 """ 231 Wait for the task to finish and return its result. 232 233 Args: 234 timeout: Maximum time to wait in seconds. If None, wait indefinitely. 235 236 Returns: 237 The task result, or None if the task was canceled. 238 239 Raises: 240 TimeoutError: If the task does not finish within the timeout (task is canceled). 241 BaseException: Re-raises an exception produced by the task. 
242 """ 243 start = time.time() 244 245 # wait until the supervisor thread exists 246 self._thread_made.wait(timeout=get_timeout(timeout, start)) 247 248 # if never started, just return current result 249 if self._thread is None: 250 return None 251 252 self._thread.join(timeout=get_timeout(timeout, start)) 253 if self._thread.is_alive(): 254 self.cancel() 255 raise TimeoutError # thread still running after timeout 256 if isinstance(self._result, BaseException): 257 raise self._result 258 return self._result 259 260
[docs] 261class MPTask: 262 """ 263 Public-facing wrapper around _InnerMPTask. 264 265 This wrapper exists so that if the task object is garbage-collected, 266 we best-effort cancel the underlying execution to avoid leaking subprocesses. 267 268 Args: 269 task: Zero-argument callable to execute. 270 callbacks: Optional list of callables invoked with the task outcome. 271 272 Attributes: 273 _inner_task: The underlying _InnerMPTask instance. 274 """ 275 276 def __init__( 277 self, 278 task: Callable, 279 callbacks: list[Callable] | None = None, 280 ) -> None: 281 self._inner_task = _InnerMPTask(task, callbacks) 282 weakref.finalize(self, self._inner_task.cancel) 283 284 def __getattr__(self, name: str): 285 return getattr(self._inner_task, name)
286 287
class LocalJobManager(BaseJobManager):
    """
    Run jobs locally using a per-job child process.

    This manager implements the BaseJobManager interface and is primarily intended
    for local execution and testing (supports waiting for any job, cancellation, and cleanup).

    Attributes:
        tasks: Maps job ID to the MPTask running that job.
    """

    def __init__(self, poll_interval_s: float = 0.05):
        """
        Args:
            poll_interval_s: Sleep between polls, in seconds, when waiting for any job.
        """
        super().__init__()
        self.tasks: dict[str, MPTask] = {}
        self._poll_interval_s = poll_interval_s

    def __del__(self):
        # Best-effort: cancel whatever is still known when the manager is collected.
        with contextlib.suppress(Exception):
            self.clean_up()

    def _flag_finished(self, job_id: str) -> None:
        """Mark the job as finished in ``self.jobs`` if it is tracked there."""
        if job_id in self.jobs:
            self.jobs[job_id]['finished'] = True

    def submit(
        self,
        function: Callable,
        **kwargs,
    ) -> str:
        """
        Submit a function job to the scheduler.

        Args:
            function: Function to be executed.
            **kwargs: Manager-specific additional arguments (currently unused by LocalJobManager).

        Returns:
            Job ID as a string.
        """
        jid = self._make_job_uid()
        # Regenerate until the ID collides with no known task or job.
        while jid in self.tasks or jid in self.jobs:
            jid = self._make_job_uid()

        self.jobs[jid] = {'finished': False}

        t = MPTask(function)
        self.tasks[jid] = t
        t.start()
        return jid

    def wait_for_a_job(
        self,
        job_id: str | None = None,
        timeout: float | None = None,
    ) -> str | None:
        """
        Wait for a job to finish.

        Args:
            job_id: If provided, wait for this specific job. If None, wait for any unfinished job.
            timeout: Maximum time to wait in seconds. If None, wait indefinitely.

        Returns:
            The finished job ID.

        Raises:
            KeyError: If job_id is provided but unknown.
            ValueError: If waiting for any job but there is no unfinished job to wait for.
            TimeoutError: If timeout expires before a job finishes.
            BaseException: When waiting for a specific job, re-raises an exception produced by it.
        """
        if job_id is not None:
            if job_id not in self.tasks:
                raise KeyError('No such job!')
            task = self.tasks[job_id]
            try:
                task.result(timeout=timeout)
            except BaseException:
                # A job that failed (or was cancelled on timeout) is terminal too.
                if task.done():
                    self._flag_finished(job_id)
                raise
            self._flag_finished(job_id)
            return job_id

        if not self.tasks:
            raise ValueError('No jobs to wait for.')

        start = time.time()
        while True:
            has_unfinished = False
            # Snapshot so a concurrent submit() cannot change the dict mid-iteration.
            for jid, t in list(self.tasks.items()):
                if jid in self.jobs and self.jobs[jid].get('finished', False):
                    continue
                has_unfinished = True
                if t.done() or t.cancelled():
                    self._flag_finished(jid)
                    return jid

            if not has_unfinished:
                # Every job was already reported; polling further could never succeed.
                raise ValueError('No jobs to wait for.')

            if timeout is not None and (time.time() - start) >= timeout:
                raise TimeoutError

            time.sleep(self._poll_interval_s)

    def read_results(self, job_id: str) -> Result:
        """
        Read results for a finished job.

        Args:
            job_id: Job ID.

        Returns:
            The job result.

        Raises:
            KeyError: If job_id is unknown.
            BaseException: Re-raises an exception produced by the job.
        """
        if job_id not in self.tasks:
            raise KeyError('No such job!')
        task = self.tasks[job_id]
        try:
            res = task.result()
        except BaseException:
            # The job ended with an error: it is still finished.
            if task.done():
                self._flag_finished(job_id)
            raise
        self._flag_finished(job_id)
        return res

    def cancel(self, job_id: str) -> bool:
        """
        Cancel a submitted job.

        Args:
            job_id: Job ID.

        Returns:
            True if cancellation was performed (best-effort) and the job is marked finished.

        Raises:
            KeyError: If job_id is unknown.
        """
        if job_id not in self.tasks:
            raise KeyError('No such job!')
        ok = self.tasks[job_id].cancel()
        if ok:
            self._flag_finished(job_id)
        return ok

    def clean_up(self) -> None:
        """
        Cancel all known jobs and mark them as finished.

        Returns:
            None.
        """
        for jid, t in list(self.tasks.items()):
            with contextlib.suppress(Exception):
                t.cancel()
            self._flag_finished(jid)