Source code for ewoksjob.events.readers
from contextlib import contextmanager
from typing import Iterator
from .base import EventType # noqa F401
from .base import EwoksEventReader
from .sqlite3 import Sqlite3EwoksEventReader # noqa F401
try:
from .redis import RedisEwoksEventReader # noqa F401
except ImportError:
pass
[docs]
def instantiate_reader(url: str, **kw) -> EwoksEventReader:
s = url.lower()
if any(s.startswith(scheme) for scheme in ("redis:", "rediss:", "unix:")):
return RedisEwoksEventReader(url, **kw)
elif s.startswith("file:"):
return Sqlite3EwoksEventReader(url, **kw)
else:
raise ValueError(f"unknown scheme for '{url}'")
[docs]
@contextmanager
def read_ewoks_events(url: str, **kw) -> Iterator[EwoksEventReader]:
with instantiate_reader(url, **kw) as reader:
yield reader