Source code for ewoksjob.bindings

import json
import logging
from typing import Any
from typing import Callable
from typing import Optional

import yaml

from .client import submit
from .client.futures import FutureInterface

logger = logging.getLogger(__name__)


[docs] def submit_graph( graph, _convert_graph: Optional[Callable] = None, _celery_options: Optional[dict] = None, resolve_graph_remotely: Optional[bool] = None, load_options: Optional[dict] = None, **options, ) -> FutureInterface: """Submit a workflow to be executed remotely. The workflow is resolved on the client-side by default (e.g. load from a file) but can optionally be resolved remotely. """ if submit is None: raise RuntimeError("requires the 'ewoksjob' package") if _celery_options is None: _celery_options = dict() if resolve_graph_remotely: deserialized_graph = None else: if _convert_graph is None: _convert_graph = _load_graph try: deserialized_graph = _load_graph(graph, load_options=load_options) ex = None except Exception as e: ex = e deserialized_graph = None if deserialized_graph is None: if ex: logger.warning( "Failed loading the graph on the client side (%s). Try loading it remotely.", ex, ) else: logger.warning( "Failed loading the graph on the client side. Try loading it remotely." ) if deserialized_graph is None: options["load_options"] = load_options else: graph = deserialized_graph return submit(args=(graph,), kwargs=options, **_celery_options)
def _load_graph(graph, load_options: Optional[dict] = None) -> Any: if not isinstance(graph, str): return if not graph.endswith((".json", ".yml", ".yaml")): return with open(graph, "r") as f: if graph.endswith(".json"): return json.load(f, **load_options) else: return yaml.safe_load(f, **load_options)