Source code for ewoksjob.tests.utils

import logging
import subprocess
import time
from types import ModuleType

logger = logging.getLogger(__name__)


[docs] def has_redis() -> bool: try: import redis # noqa F401 except ImportError: return False return _check_redis_server()
def _check_redis_server() -> bool: try: result = subprocess.run( ["redis-server", "--version"], capture_output=True, text=True, check=True ) return bool(result.stdout.strip()) except Exception: return False
[docs] def wait_not_finished(mod: ModuleType, expected_uuids: set, timeout=3): """Wait until all running job ID's are `expected_uuids`""" if mod.__name__.endswith("celery") and not has_redis(): logger.warning( "memory and sqlite do not support task monitoring, sleep %f sec instead", timeout, ) time.sleep(timeout) return t0 = time.time() while True: uuids = set(mod.get_unfinished_uuids()) if uuids == expected_uuids: return dt = time.time() - t0 if dt > timeout: assert uuids == expected_uuids, f"{uuids} != {expected_uuids}" time.sleep(0.2)