Source code for ewoksjob.events.readers.base

import time
from typing import Dict, Iterable, Optional, Tuple
import json
from datetime import datetime
from threading import Event
from ewoksutils.datetime_utils import fromisoformat

try:
    from ewokscore.variable import Variable, VariableContainer
except ImportError:
    Variable = VariableContainer = None

# Names exported by a star-import of this module.
__all__ = ["EventType", "EwoksEventReader"]

# Type alias for a single event: a dictionary mapping string keys to string values.
EventType = Dict[str, str]


class EwoksEventReader:
    """Base class for receiving ewoks events on the client side.

    Subclasses must implement :meth:`get_events` and :meth:`wait_events`.
    Everything else in this class is built on top of those two methods.
    """

    def __del__(self):
        self.close()

    def close(self):
        """Release resources held by the reader. The base class holds none."""
        pass

    def wait_events(
        self, timeout=None, stop_event: Optional[Event] = None, **filters
    ) -> Iterable[EventType]:
        """Yield events matching the filter until timeout is reached.

        :raises NotImplementedError: always; to be implemented by subclasses.
        """
        raise NotImplementedError

    def poll_events(
        self, timeout=None, stop_event: Optional[Event] = None, interval=0.1, **filters
    ) -> Iterable[EventType]:
        """Yield events matching the filter until timeout is reached.

        Calls :meth:`get_events` repeatedly and yields only the events that
        were not returned by a previous call (the first ``n`` events of every
        call are assumed to be the ones already yielded).

        :param timeout: maximal polling duration in seconds, ``None`` means
            no limit.
        :param stop_event: polling stops once this event is set.
        :param interval: time in seconds between two polls.
        :param filters: passed on to :meth:`get_events`.
        """
        # Monotonic clock: the timeout must not be affected by system clock
        # adjustments.
        start = time.monotonic()
        n_yielded = 0
        while True:
            try:
                events = list(self.get_events(**filters))
            except Exception as e:
                # NOTE(review): presumably the events table has not been
                # created yet by the producer - keep polling in that case.
                if "no such table" not in str(e):
                    raise
            else:
                new_events = events[n_yielded:]
                n_yielded += len(new_events)
                yield from new_events
            if timeout is not None and (time.monotonic() - start) > timeout:
                return
            if stop_event is None:
                time.sleep(interval)
                continue
            if stop_event.is_set():
                return
            # Wakes up early when the event gets set. The loop then polls one
            # last time before returning so late events are not lost.
            stop_event.wait(interval)

    def get_events(self, **filters) -> Iterable[EventType]:
        """Returns all currently available events matching the filter.

        :raises NotImplementedError: always; to be implemented by subclasses.
        """
        raise NotImplementedError

    def wait_events_with_variables(self, *args, **kwargs) -> Iterable[EventType]:
        """`wait_events` with URI dereferencing."""
        for event in self.wait_events(*args, **kwargs):
            self.dereference_data_uris(event)
            yield event

    def poll_events_with_variables(self, *args, **kwargs) -> Iterable[EventType]:
        """`poll_events` with URI dereferencing."""
        for event in self.poll_events(*args, **kwargs):
            self.dereference_data_uris(event)
            yield event

    def get_events_with_variables(self, *args, **kwargs) -> Iterable[EventType]:
        """`get_events` with URI dereferencing."""
        for event in self.get_events(*args, **kwargs):
            self.dereference_data_uris(event)
            yield event

    def _iter_matching_job_ids(self, **filters) -> Iterable[str]:
        """Yield each "job_id" that has an event matching the filter, once,
        in order of first appearance. Events whose "job_id" is `None` are
        skipped.
        """
        seen = set()
        for event in self.get_events(**filters):
            job_id = event["job_id"]
            if job_id is None or job_id in seen:
                continue
            # Remember every job already handled: events of different jobs
            # can be interleaved and a job must be reported only once.
            seen.add(job_id)
            yield job_id

    def get_full_job_events(self, **filters) -> Iterable[Tuple[EventType, ...]]:
        """Returns events grouped by "job_id". When one event matches the filter,
        all events with the "job_id" are returned.

        Each job is yielded exactly once, as a tuple of all its events.
        """
        for job_id in self._iter_matching_job_ids(**filters):
            yield tuple(self.get_events(job_id=job_id))

    def get_full_job_events_with_variables(
        self, **filters
    ) -> Iterable[Tuple[EventType, ...]]:
        """`get_full_job_events` with URI dereferencing."""
        for job_id in self._iter_matching_job_ids(**filters):
            yield tuple(self.get_events_with_variables(job_id=job_id))

    @staticmethod
    def dereference_data_uris(event: EventType) -> None:
        """Add "inputs" and "outputs" variable containers to the event, in place.

        "inputs" is built from the "input_uris" field (a list of dictionaries
        with "name" and "value" keys, optionally JSON encoded) and "outputs"
        from the "task_uri" field. A field that is missing or empty is ignored.

        :raises ImportError: when `ewokscore` could not be imported.
        """
        if Variable is None:
            raise ImportError("requires 'ewoks'")
        input_uris = event.get("input_uris")
        if input_uris:
            if isinstance(input_uris, str):
                input_uris = json.loads(input_uris)
            inputs = {
                uri["name"]: (
                    Variable(data_uri=uri["value"]) if uri["value"] else Variable()
                )
                for uri in input_uris
            }
            event["inputs"] = VariableContainer(inputs)
        task_uri = event.get("task_uri")
        if task_uri:
            event["outputs"] = VariableContainer(data_uri=task_uri)

    @staticmethod
    def event_passes_filter(
        event: EventType,
        starttime: Optional[datetime] = None,
        endtime: Optional[datetime] = None,
    ) -> bool:
        """Check whether the event "time" lies within `[starttime, endtime]`.

        :param event: needs a "time" field in ISO format when at least one
            of the bounds is given.
        :param starttime: inclusive lower bound; an ISO formatted string is
            accepted as well.
        :param endtime: inclusive upper bound; an ISO formatted string is
            accepted as well.
        :returns: `True` when no bound is given or the event time respects
            all given bounds.
        """
        if not (starttime or endtime):
            return True
        # Not named "time" so the imported `time` module is not shadowed.
        event_time = fromisoformat(event["time"])
        if starttime is not None:
            if isinstance(starttime, str):
                starttime = fromisoformat(starttime)
            if event_time < starttime:
                return False
        if endtime is not None:
            if isinstance(endtime, str):
                endtime = fromisoformat(endtime)
            if event_time > endtime:
                return False
        return True

    @staticmethod
    def split_filter(
        starttime: Optional[datetime] = None,
        endtime: Optional[datetime] = None,
        **is_equal_filter
    ) -> Tuple[dict, dict]:
        """Splits the filter in an equality filter and a time filter, the
        latter to be applied on the list of events fetched from the database.

        :returns: the tuple `(is_equal_filter, post_filter)` where
            `post_filter` has the keys "starttime" and "endtime".
        :raises TypeError: when `starttime` or `endtime` is provided but is
            not a `datetime` object.
        """
        if starttime and not isinstance(starttime, datetime):
            raise TypeError("starttime needs to be a datetime object")
        if endtime and not isinstance(endtime, datetime):
            raise TypeError("endtime needs to be a datetime object")
        post_filter = {"starttime": starttime, "endtime": endtime}
        return is_equal_filter, post_filter