Source code for ewoksjob.config

import importlib
import logging
import os
import sys
import types
from pathlib import Path
from typing import Tuple

from celery import Celery
from celery.loaders.default import Loader
from ewoksutils import uri_utils

# WARNING: import as little as possible because gevent-patching might be done too late

logger = logging.getLogger(__name__)


[docs] class EwoksLoader(Loader): """Celery loader based on a configuration URI: python file, python module, yaml file, Beacon URL. Requires the environment variable CELERY_LOADER=ewoksjob.config.EwoksLoader """ def __init__(self, app: Celery) -> None: self.app = app super().__init__(app)
[docs] def read_configuration(self) -> dict: if "" not in sys.path: # This happens when the python process was launched # through a python console script sys.path.append("") cfg_uri = get_cfg_uri() if not cfg_uri: # Load from the module "celeryconfig" if available return super().read_configuration() file_type = get_cfg_type(cfg_uri) try: logger.info(f"Loading celery configuration '{cfg_uri}' (type: {file_type})") return read_configuration(cfg_uri=cfg_uri) except Exception as e: raise RuntimeError( f"Cannot load celery configuration from '{cfg_uri}' (type: {file_type})" ) from e
[docs] def get_cfg_uri() -> str: """Returns the celery configuration URI based on environment variables.""" cfg_uri = os.environ.get("EWOKS_CONFIG_URI") if cfg_uri: return cfg_uri cfg_uri = os.environ.get("CELERY_CONFIG_URI") # deprecate? if cfg_uri: return cfg_uri beacon_host = os.environ.get("BEACON_HOST", None) if beacon_host: return "beacon:///ewoks/config.yml" cfg_uri = os.environ.get("CELERY_CONFIG_MODULE") # the CLI option --config sets this environment variable if cfg_uri: return cfg_uri return ""
[docs] def get_cfg_type(cfg_uri: str) -> str: parsed = uri_utils.parse_uri(cfg_uri) if parsed.scheme == "beacon": return "yaml" if parsed.scheme in "file": file_path = uri_utils.path_from_uri(cfg_uri) if file_path.suffix in (".yaml", ".yml"): return "yaml" return "python" return parsed.scheme
[docs] def read_configuration(cfg_uri: str) -> dict: """Different types of URI's are supported: - Python module: - myproject.config - Python file: - /tmp/ewoks/config.py - Yaml file: - /tmp/ewoks/config.yml - Beacon yaml file: - beacon:///ewoks/config.yml (this requires the BEACON_HOST environment variable) - beacon://id22:25000/ewoks/config.yml """ file_type = get_cfg_type(cfg_uri) if file_type == "yaml": config = _read_yaml_config(cfg_uri) elif file_type == "python": config = _read_py_config(cfg_uri) else: raise ValueError(f"Configuration URL '{cfg_uri}' is not supported") # `celery.app.utils.Settings` converts all parameters to lower-case # but we are here before that happens. config = {k.lower(): v for k, v in config.items()} # Celery parameters need to be on the top level for `celery.app.utils.Settings`. if "celery" in config: celery_config = config.pop("celery") config = {**config, **celery_config} return config
def _read_yaml_config(resource: str) -> dict: try: from blissdata.beacon.files import read_config as bliss_read_config except ImportError as e: raise RuntimeError( f"Cannot get celery configuration '{resource}' from Beacon: blissdata is not installed" ) from e if sys.platform == "win32" and resource.startswith("file:///"): # Bug in blissdata. See https://gitlab.esrf.fr/bliss/blissdata/-/issues/22. resource = resource.replace("file:///", "file://") return bliss_read_config(resource) def _read_py_config(cfg_uri: str) -> dict: """Warning: this is not thread-safe and it has side-effects during execution""" file_path = uri_utils.path_from_uri(cfg_uri) sys_path, module = _get_config_module(file_path) keep_sys_path = list(sys.path) sys.path.insert(0, sys_path) try: config = vars(importlib.import_module(module)) finally: sys.path = keep_sys_path config = { k: v for k, v in config.items() if not k.startswith("_") and not isinstance(v, types.ModuleType) } return config def _get_config_module(path: Path) -> Tuple[str, str]: if path.is_file(): return str(path.parent.resolve()), path.stem return os.getcwd(), str(path)