Source code for ewoksjob.tests.test_task_discovery
from ..client import celery
from ..client import local
from .utils import get_result
[docs]def test_submit(ewoks_worker):
assert_submit(celery)
[docs]def test_submit_local(local_ewoks_worker):
assert_submit(local)
[docs]def assert_submit(mod):
future1 = mod.discover_tasks_from_modules(args=("ewokscore",))
future2 = mod.get_future(future1.task_id)
results = get_result(future1, timeout=10)
assert results
results = get_result(future2, timeout=0)
assert results