Source code for ewoksjob.client.local.tasks

import warnings
from typing import Mapping
from typing import Optional
from typing import Tuple
from uuid import uuid4

import ewoks
from ewokscore import task_discovery

from ..dummy_workflow import dummy_workflow
from .futures import LocalFuture
from .pool import get_active_pool

__all__ = [
    "execute_graph",
    "execute_test_graph",
    "convert_graph",
    "convert_workflow",
    "discover_tasks_from_modules",
    "discover_all_tasks",
]


[docs] def execute_graph( args: Optional[Tuple] = tuple(), kwargs: Optional[Mapping] = None ) -> LocalFuture: pool = get_active_pool() if kwargs is None: kwargs = dict() execinfo = kwargs.setdefault("execinfo", dict()) uuid = str(uuid4()) execinfo["job_id"] = uuid return pool.submit(ewoks.execute_graph, uuid=uuid, args=args, kwargs=kwargs)
[docs] def execute_test_graph( seconds=0, filename=None, kwargs: Optional[Mapping] = None ) -> LocalFuture: args = (dummy_workflow(),) if kwargs is None: kwargs = dict() kwargs["inputs"] = [ {"id": "sleep", "name": 0, "value": seconds}, {"id": "result", "name": "filename", "value": filename}, ] return execute_graph(args=args, kwargs=kwargs)
[docs] def convert_graph( args: Optional[Tuple] = tuple(), kwargs: Optional[Mapping] = None ) -> LocalFuture: pool = get_active_pool() return pool.submit(ewoks.convert_graph, args=args, kwargs=kwargs)
[docs] def convert_workflow(**kw) -> LocalFuture: warnings.warn( "convert_workflow is deprecated, use convert_graph instead", stacklevel=2 ) return convert_graph(**kw)
[docs] def discover_tasks_from_modules( args: Optional[Tuple] = tuple(), kwargs: Optional[Mapping] = None ) -> LocalFuture: pool = get_active_pool() return pool.submit( task_discovery.discover_tasks_from_modules, args=args, kwargs=kwargs )
[docs] def discover_all_tasks( args: Optional[Tuple] = tuple(), kwargs: Optional[Mapping] = None ) -> LocalFuture: pool = get_active_pool() return pool.submit(task_discovery.discover_all_tasks, args=args, kwargs=kwargs)