Source code for ewoksjob.events.readers.base

import json
import time
from datetime import datetime
from threading import Event
from typing import Dict
from typing import Iterator
from typing import Optional
from typing import Tuple

from ewoksutils.datetime_utils import fromisoformat

try:
    from ewokscore.variable import Variable
    from ewokscore.variable import VariableContainer
except ImportError:
    Variable = VariableContainer = None


EventType = Dict[str, str]


[docs] class EwoksEventReader: """Base class for receiving ewoks events on the client side.""" def __enter__(self) -> "EwoksEventReader": return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __del__(self): self.close()
[docs] def close(self): pass
[docs] def wait_events( self, timeout=None, stop_event: Optional[Event] = None, **filters ) -> Iterator[EventType]: """Yield events matching the filter until timeout is reached.""" raise NotImplementedError
[docs] def poll_events( self, timeout=None, stop_event: Optional[Event] = None, interval: Optional[float] = None, **filters, ) -> Iterator[EventType]: """Yield events matching the filter until timeout is reached.""" if interval is None: interval = 0.1 else: interval = max(interval, 0.001) start = time.time() n = 0 while True: try: events = list(self.get_events(**filters)) except Exception as e: if "no such table" not in str(e): raise else: events = events[n:] n += len(events) yield from events if timeout is not None and (time.time() - start) > timeout: return if stop_event is not None and stop_event.is_set(): return time.sleep(interval)
[docs] def get_events(self, **filters) -> Iterator[EventType]: """Returns all currently available events matching the filter.""" raise NotImplementedError
[docs] def wait_events_with_variables(self, *args, **kwargs) -> Iterator[EventType]: """`get_events` with URI dereferencing.""" for event in self.wait_events(*args, **kwargs): self.dereference_data_uris(event) yield event
[docs] def poll_events_with_variables(self, *args, **kwargs) -> Iterator[EventType]: """`get_events` with URI dereferencing.""" for event in self.poll_events(*args, **kwargs): self.dereference_data_uris(event) yield event
[docs] def get_events_with_variables(self, *args, **kwargs) -> Iterator[EventType]: """`get_events` with URI dereferencing.""" for event in self.get_events(*args, **kwargs): self.dereference_data_uris(event) yield event
[docs] def get_full_job_events(self, **filters) -> Iterator[Tuple[EventType, ...]]: """Returns events grouped by "job_id". When one event matches the filter, all events with the "job_id" are returned. """ job_id = None for event in self.get_events(**filters): if job_id != event["job_id"]: job_id = event["job_id"] yield tuple(self.get_events(job_id=job_id))
[docs] def get_full_job_events_with_variables( self, **filters ) -> Iterator[Tuple[EventType, ...]]: """`get_full_job_events` with URI dereferencing.""" job_id = None for event in self.get_events(**filters): if job_id != event["job_id"]: job_id = event["job_id"] yield tuple(self.get_events_with_variables(job_id=job_id))
[docs] @staticmethod def dereference_data_uris(event: EventType) -> None: if Variable is None: raise ImportError("requires 'ewoks'") input_uris = event.get("input_uris") if input_uris: if isinstance(input_uris, str): input_uris = json.loads(input_uris) inputs = { uri["name"]: ( Variable(data_uri=uri["value"]) if uri["value"] else Variable() ) for uri in input_uris } event["inputs"] = VariableContainer(inputs) task_uri = event.get("task_uri") if task_uri: event["outputs"] = VariableContainer(data_uri=task_uri)
[docs] @staticmethod def event_passes_filter( event: EventType, starttime: Optional[datetime] = None, endtime: Optional[datetime] = None, ) -> bool: if not (starttime or endtime): return True time = fromisoformat(event["time"]) if starttime is not None: if isinstance(starttime, str): starttime = fromisoformat(starttime) if time < starttime: return False if endtime is not None: if isinstance(endtime, str): endtime = fromisoformat(endtime) if time > endtime: return False return True
[docs] @staticmethod def split_filter( starttime: Optional[datetime] = None, endtime: Optional[datetime] = None, **is_equal_filter, ) -> Tuple[dict, dict]: """Splits the filter to be applied on the list of events fetched from the database """ if starttime and not isinstance(starttime, datetime): raise TypeError("starttime needs to be a datetime object") if endtime and not isinstance(endtime, datetime): raise TypeError("starttime needs to be a datetime object") post_filter = {"starttime": starttime, "endtime": endtime} return is_equal_filter, post_filter