Submit an Ewoks workflow#

Example workflow#

workflow = {
    "graph": {"id": "mygraph"},
    "nodes": [
        {"id": "task1", "task_type": "method", "task_identifier": "numpy.add"},
        {"id": "task2", "task_type": "method", "task_identifier": "numpy.add"},
    ],
    "links": [
        {
            "source": "task1",
            "target": "task2",
            "data_mapping": [{"source_output": "return_value", "target_input": 0}],
        }
    ],
}

Job arguments#

Handler configuration for sending/receiving Ewoks events through Redis

import os

# Redis backend
events_url = "redis://localhost:10003/2"
handlers = [
    {
        "class": "ewoksjob.events.handlers.RedisEwoksEventHandler",
        "arguments": [{"name": "url", "value": events_url}],
    }
]

# SQLite backend (does not support task monitoring or cancelling)
events_url = f"file://{os.path.join(..., 'ewoks_events.db')}"
handlers = [
    {
        "class": "ewoksjob.events.handlers.Sqlite3EwoksEventHandler",
        "arguments": [{"name": "uri", "value": events_url}],
    }
]

Job arguments are the same as the arguments of ewoks.execute_graph

varinfo = {"root_uri": ..., "scheme": "nexus"}
inputs = [
    {"id": "task1", "name": 0, "value": 1},
    {"id": "task1", "name": 1, "value": 2},
    {"id": "task2", "name": 1, "value": 3},
]
execinfo = {"handlers": handlers}
args = (workflow,)
kwargs = {
    "engine": None,
    "execinfo": execinfo,
    "inputs": inputs,
    "varinfo": varinfo,
    "outputs": [{"all": False}],
}

Submit workflow#

Execute workflow and get results

from ewoksjob.client import submit

future = submit(args=args, kwargs=kwargs)
job_id = future.uuid
# events could be received in the mean time (see below)
workflow_results = future.result(timeout=3)
assert workflow_results == {"return_value": 6}

Monitor Ewoks events#

Get intermediate results from Ewoks events

from ewoksjob.events.readers import read_ewoks_events

with read_ewoks_events(events_url) as reader:
    results_during_execution = list(reader.get_events(job_id=job_id))
    assert len(results_during_execution) == 8  # start/stop for job, workflow and node

    # Get start event of node "task1"
    result_event = list(
        reader.get_events_with_variables(job_id=job_id, node_id="task1", type="start")
    )
    assert len(result_event) == 1
    result_event = result_event[0]

    # Get access to all output variables of "task1"
    results = result_event["outputs"]
    assert results["return_value"].value == 3