Source code for ewoksjob.config

import os
import sys
import types
import logging
import importlib
from pathlib import Path
from typing import Tuple
from urllib.parse import urlparse, ParseResult

from celery.loaders.default import Loader
from celery import Celery

# blissdata is an optional dependency: when it cannot be imported the reader
# is set to None and _read_yaml_config raises a RuntimeError on use.
try:
    from blissdata.beacon.files import read_config as bliss_read_config
except ImportError:
    bliss_read_config = None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class EwoksLoader(Loader):
    """Celery loader based on a configuration URI: python file, python module,
    yaml file, Beacon URL.

    Requires the environment variable CELERY_LOADER=ewoksjob.config.EwoksLoader
    """

    def __init__(self, app: Celery) -> None:
        self.app = app
        super().__init__(app)

    def read_configuration(self) -> dict:
        """Return the celery configuration as a dictionary.

        The configuration URI is obtained from ``get_cfg_uri``. When no URI
        is configured, the parent loader's ``read_configuration`` is used.

        :raises RuntimeError: when the configuration referenced by the URI
            cannot be loaded. The original error is attached as the cause.
        """
        if "" not in sys.path:
            # This happens when the python process was launched
            # through a python console script
            sys.path.append("")

        cfg_uri = get_cfg_uri()
        if not cfg_uri:
            # Load from the module "celeryconfig" if available
            return super().read_configuration()

        # Bound before the try block so that the error message below can
        # always be formatted, even when determining the type itself fails.
        file_type = "unknown"
        try:
            file_type = get_cfg_type(cfg_uri)
            logger.info(f"Loading celery configuration '{cfg_uri}' (type: {file_type})")
            config = read_configuration(cfg_uri=cfg_uri)
        except Exception as exc:
            raise RuntimeError(
                f"Cannot load celery configuration from '{cfg_uri}' (type: {file_type})"
            ) from exc
        return config
def get_cfg_uri() -> str:
    """Returns the celery configuration URI based on environment variables.

    Lookup order: ``EWOKS_CONFIG_URI``, ``CELERY_CONFIG_URI``, a fixed Beacon
    URL when ``BEACON_HOST`` is set, then ``CELERY_CONFIG_MODULE``. An empty
    string is returned when none of them is set to a non-empty value.
    """
    env = os.environ

    # CELERY_CONFIG_URI: deprecate?
    explicit_uri = env.get("EWOKS_CONFIG_URI") or env.get("CELERY_CONFIG_URI")
    if explicit_uri:
        return explicit_uri

    if env.get("BEACON_HOST", None):
        return "beacon:///ewoks/config.yml"

    # the CLI option --config sets this environment variable
    return env.get("CELERY_CONFIG_MODULE") or ""
def get_cfg_type(cfg_uri: str) -> str:
    """Return the configuration type of a URI.

    :param cfg_uri: configuration URI (file name, module name or URL).
    :returns: ``"yaml"`` for the ``beacon`` scheme and for file names ending
        in ``.yaml`` or ``.yml``, ``"python"`` for any other file name or
        scheme-less URI, and an empty string for any other scheme.
    """
    presult = _parse_url(cfg_uri)
    if presult.scheme == "beacon":
        return "yaml"
    if presult.scheme in ("file", ""):
        filename = _url_to_filename(presult)
        # splitext keeps the leading dot, so both suffixes must include it.
        ext = os.path.splitext(filename)[-1]
        if ext in (".yaml", ".yml"):
            return "yaml"
        return "python"
    return ""
def read_configuration(cfg_uri: str) -> dict:
    """Load the celery configuration referenced by a URI.

    Supported URI types:

    - Python module:
        - myproject.config
    - Python file:
        - /tmp/ewoks/config.py
    - Yaml file:
        - /tmp/ewoks/config.yml
    - Beacon yaml file:
        - beacon:///ewoks/config.yml (this requires the BEACON_HOST environment variable)
        - beacon://id22:25000/ewoks/config.yml

    When the loaded mapping has a "celery" or "CELERY" key, only that
    section is returned.

    :raises ValueError: when the URI type is not supported.
    """
    kind = get_cfg_type(cfg_uri)
    if kind not in ("yaml", "python"):
        raise ValueError(f"Configuration URL '{cfg_uri}' is not supported")

    reader = _read_yaml_config if kind == "yaml" else _read_py_config
    settings = reader(cfg_uri)

    # The lower-case section takes precedence over the upper-case one.
    for section in ("celery", "CELERY"):
        if section in settings:
            return settings[section]
    return settings
def _parse_url(url: str) -> ParseResult:
    """Parse a configuration URI.

    Handled forms:

    - beacon:///path/to/file.yml
    - beacon://id00:25000/path/to/file.yml
    - /path/to/file.yaml
    - file:///path/to/file.yaml
    - c:\\path\\to\\file.yaml (win32 only: re-parsed with a "file://" prefix)
    """
    parsed = urlparse(url)
    # A one-letter scheme on Windows is a drive letter, not a real scheme.
    looks_like_drive = sys.platform == "win32" and len(parsed.scheme) == 1
    if looks_like_drive:
        return urlparse(f"file://{url}")
    return parsed


def _url_to_filename(presult: ParseResult) -> str:
    """Rebuild a file name from the network location and path of a parsed URL."""
    # urlparse("file://c:/a/b")   -> netloc + path
    # urlparse("file://c:\\a\\b") -> netloc only (path is empty)
    # urlparse("file:///a/b")     -> path only (netloc is empty)
    return presult.netloc + presult.path


def _read_yaml_config(resource: str) -> dict:
    """Read a yaml configuration through blissdata.

    :raises RuntimeError: when blissdata is not installed.
    """
    if bliss_read_config is not None:
        return bliss_read_config(resource)
    raise RuntimeError(
        f"Cannot get celery configuration '{resource}' from Beacon: blissdata is not installed"
    )


def _read_py_config(cfg_uri: str) -> dict:
    """Import a python file or module and return its public, non-module
    attributes.

    Warning: this is not thread-safe and it has side-effects during execution
    """
    filename = _url_to_filename(_parse_url(cfg_uri))
    import_root, module_name = _get_config_module(filename)

    saved_sys_path = list(sys.path)
    sys.path.insert(0, import_root)
    try:
        namespace = vars(importlib.import_module(module_name))
        public = {}
        for name, value in namespace.items():
            if name.startswith("_") or isinstance(value, types.ModuleType):
                continue
            public[name] = value
        return public
    finally:
        # Put back the search path as it was before the import.
        sys.path = saved_sys_path


def _get_config_module(cfg_uri: str) -> Tuple[str, str]:
    """Return the directory to add to the import path and the module name.

    For an existing file this is its resolved parent directory and its stem.
    Otherwise it is the current working directory and the argument itself.
    """
    candidate = Path(cfg_uri)
    if not candidate.is_file():
        return os.getcwd(), cfg_uri
    return str(candidate.parent.resolve()), candidate.stem