Source code for ewoksjob.events.handlers.redis

import os
import json
import socket
import logging
import redis
from typing import Dict, Optional, Tuple

from ewoksutils.logging_utils.connection import ConnectionHandler
from ewokscore.events.handlers import EwoksEventHandlerMixIn


RedisRecordType = Tuple[str, Dict[str, str]]


[docs] class RedisEwoksEventHandler(EwoksEventHandlerMixIn, ConnectionHandler): # TODO: https://redisql.redbeardlab.com/blog/python/using-redisql-with-python/ def __init__(self, url: str, ttl=None): """An example url is "redis://localhost:10003?db=2".""" self._redis_url = url self._ttl = ttl super().__init__() def _connect(self, timeout=1) -> None: """This is called when no connection exists.""" client_name = f"ewoks:writer:{socket.gethostname()}:{os.getpid()}" self._connection = redis.Redis.from_url( self._redis_url, client_name=client_name ) def _disconnect(self) -> None: """This is called when a connection exists and is connected.""" self._connection.close() def _serialize_record(self, record: logging.LogRecord) -> Optional[RedisRecordType]: """Convert a record to something that can be given to the connection.""" job_id = getattr(record, "job_id", None) adict = dict() for field in self.FIELD_TYPES: value = getattr(record, field, None) adict[field] = json.dumps(value) return job_id, adict def _send_serialized_record(self, srecord: Optional[RedisRecordType]): """Send the output from `_serialize_record` to the connection.""" if not srecord: return job_id, value = srecord n = self._connection.incrby("ewoks_events_count") key = f"ewoks:{job_id}:{n}" self._connection.hset(key, mapping=value) if self._ttl: self._connection.expire(key, self._ttl)