Source code for ewoksjob.client.local.futures

from concurrent.futures import CancelledError as NativeCancelledError
from concurrent.futures import Future as NativeFuture
from concurrent.futures import TimeoutError as NativeTimeoutError
from contextlib import contextmanager
from typing import Any
from typing import Optional
from typing import Union

try:
    from pyslurmutils.client.errors import RemoteExit
    from pyslurmutils.concurrent.futures import SlurmRestFuture

    NATIVE_FUTURE_TYPES = Union[NativeFuture, SlurmRestFuture]
except ImportError:

[docs] class RemoteExit(Exception): pass
SlurmRestFuture = None NATIVE_FUTURE_TYPES = Union[NativeFuture] from ..futures import CancelledError from ..futures import FutureInterface from ..futures import TimeoutError
[docs] class LocalFuture(FutureInterface): def __init__(self, uuid: str, future: Optional[NATIVE_FUTURE_TYPES] = None) -> None: if future is None: from .pool import get_active_pool pool = get_active_pool() future = pool.get_future(uuid) self._native_future = future if SlurmRestFuture is None: self._is_slurm = False else: self._is_slurm = isinstance(future, SlurmRestFuture) self._uuid = uuid # Same API has `concurrent.futures.LocalFuture`
[docs] def cancel(self) -> bool: return self._native_future.cancel()
[docs] def cancelled(self) -> bool: return self._native_future.cancelled()
[docs] def running(self) -> bool: return self._native_future.running()
[docs] def done(self) -> bool: return self._native_future.done()
[docs] def result(self, timeout: Optional[float] = None) -> Any: with self._convert_exceptions(timeout): return self._native_future.result(timeout=timeout)
[docs] def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: with self._convert_exceptions(timeout): return self._native_future.exception(timeout=timeout)
@contextmanager def _convert_exceptions(self, timeout): try: yield except (NativeCancelledError, RemoteExit) as e: err_msg = f"future of job '{self.uuid}' was cancelled" raise CancelledError(err_msg) from e except NativeTimeoutError as e: err_msg = f"job '{self.uuid}' is not done with {timeout} seconds" raise TimeoutError(err_msg) from e # API in addition to `concurrent.futures.LocalFuture` @property def uuid(self) -> str: return self._uuid
[docs] def aborted(self) -> bool: if self._is_slurm: slurm_client = self._native_future.slurm_client if slurm_client is None: return False status = slurm_client.get_status(self._native_future.job_id) return status == "CANCELLED" else: return False
[docs] def abort(self) -> bool: if self._is_slurm: self._native_future.abort() return self.aborted()
def _get_queue(self) -> str: return "local"