Source code for ewoksjob.events.readers

from .base import *  # noqa F401
from .sqlite3 import Sqlite3EwoksEventReader  # noqa F401

try:
    from .redis import RedisEwoksEventReader  # noqa F401
except ImportError:
    pass


[docs] def instantiate_reader(url: str, **kw) -> EwoksEventReader: # noqa F405 s = url.lower() if any(s.startswith(scheme) for scheme in ("redis:", "rediss:", "unix:")): return RedisEwoksEventReader(url, **kw) elif s.startswith("file:"): return Sqlite3EwoksEventReader(url, **kw) else: raise ValueError(f"unknown scheme for '{url}'")