Source code for ewoksjob.events.handlers.redis
import json
import logging
import os
import socket
from typing import Dict
from typing import Optional
from typing import Tuple
import redis
from ewokscore.events.handlers import EwoksEventHandlerMixIn
from ewoksutils.logging_utils.connection import ConnectionHandler
RedisRecordType = Tuple[str, Dict[str, str]]
[docs]
class RedisEwoksEventHandler(EwoksEventHandlerMixIn, ConnectionHandler):
# TODO: https://redisql.redbeardlab.com/blog/python/using-redisql-with-python/
def __init__(self, url: str, ttl=None):
"""An example url is "redis://localhost:10003?db=2"."""
self._redis_url = url
self._ttl = ttl
super().__init__()
def _connect(self, timeout=1) -> None:
"""This is called when no connection exists."""
client_name = f"ewoks:writer:{socket.gethostname()}:{os.getpid()}"
self._connection = redis.Redis.from_url(
self._redis_url, client_name=client_name
)
def _disconnect(self) -> None:
"""This is called when a connection exists and is connected."""
self._connection.close()
def _serialize_record(self, record: logging.LogRecord) -> Optional[RedisRecordType]:
"""Convert a record to something that can be given to the connection."""
job_id = getattr(record, "job_id", None)
adict = dict()
for field in self.FIELD_TYPES:
value = getattr(record, field, None)
adict[field] = json.dumps(value)
return job_id, adict
def _send_serialized_record(self, srecord: Optional[RedisRecordType]):
"""Send the output from `_serialize_record` to the connection."""
if not srecord:
return
job_id, value = srecord
n = self._connection.incrby("ewoks_events_count")
key = f"ewoks:{job_id}:{n}"
self._connection.hset(key, mapping=value)
if self._ttl:
self._connection.expire(key, self._ttl)