Source code for ewoksjob.cli_utils.submit

import traceback
from argparse import Namespace
from pprint import pprint
from typing import Callable
from typing import List
from typing import Literal
from typing import Optional
from typing import Union

import click
from ewoksutils.cli_utils import cli_submit_utils
from ewoksutils.cli_utils.cli_click import add_click_options

from ..bindings import submit_graph


@click.command("submit")
@add_click_options(cli_submit_utils.submit_arguments(shell=True))
def submit(cli_args: Namespace) -> Union[List[dict], Literal[0, 1]]:
    """Submit an Ewoks workflow."""
    result = command_submit(cli_args, shell=True)
    if result:
        click.get_current_context().exit(result)


[docs] def command_submit( cli_args: Namespace, _convert_graph: Optional[Callable] = None, shell: bool = False ) -> Union[List[dict], Literal[0, 1]]: cli_submit_utils.parse_submit_arguments(cli_args, shell=shell) return_code = 0 keep_results = [] futures = list() for workflow, graph in zip(cli_args.workflows, cli_args.graphs): future = submit_graph( graph, _convert_graph=_convert_graph, engine=cli_args.engine, resolve_graph_remotely=cli_args.resolve_graph_remotely, **cli_args.execute_options, _celery_options=cli_args.cparameters, ) print(f"Workflow '{workflow}' submitted (ID: {future.uuid})") futures.append(future) if cli_args.wait < 0: if shell: return return_code return keep_results print("Waiting for results ...") print() for workflow, future in zip(cli_args.workflows, futures): print( "###########################################################################" ) print(f"# Result of workflow '{workflow}' (ID: {future.uuid})") print( "###########################################################################" ) try: results = future.result(timeout=cli_args.wait) except Exception as ex: if _is_timeout(ex): print(f"Not finished after {cli_args.wait}s") else: traceback.print_exc() print("FAILED") results = ex return_code = 1 else: if cli_args.outputs == "none": if results is None: print("FAILED") else: print("FINISHED") else: pprint(results) print("FINISHED") if results is None: return_code = 1 finally: print() if not shell: keep_results.append(results) if shell: return return_code return keep_results
def _is_timeout(exception: Optional[Exception]) -> bool: if exception is None: return False if isinstance(exception, TimeoutError): return True if _is_timeout(exception.__cause__): return True if _is_timeout(exception.__context__): return True return False