Skip to content

Commit

Permalink
Defer connection to systemd journal
Browse files Browse the repository at this point in the history
  • Loading branch information
Unrud committed Mar 22, 2023
1 parent e23f028 commit 5070533
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 27 deletions.
62 changes: 37 additions & 25 deletions radicale/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@
"""

import contextlib
import io
import logging
import os
import socket
import struct
import sys
import threading
import time
from typing import (Any, Callable, ClassVar, Dict, Iterator, Optional, Tuple,
Union)
from typing import (Any, Callable, ClassVar, Dict, Iterator, Mapping, Optional,
Tuple, Union, cast)

from radicale import types

Expand Down Expand Up @@ -85,49 +86,60 @@ class ThreadedStreamHandler(logging.Handler):
_streams: Dict[int, types.ErrorStream]
_journal_stream_id: Optional[Tuple[int, int]]
_journal_socket: Optional[socket.socket]
_journal_socket_failed: bool

def __init__(self) -> None:
super().__init__()
self._streams = {}
self._journal_stream_id = None
with contextlib.suppress(TypeError, ValueError):
dev, inode = os.environ.get("JOURNAL_STREAM", "").split(":", 1)
self._journal_stream_id = int(dev), int(inode)
self._journal_stream_id = (int(dev), int(inode))
self._journal_socket = None
if self._journal_stream_id and hasattr(socket, "AF_UNIX"):
journal_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
journal_socket.connect("/run/systemd/journal/socket")
except OSError:
journal_socket.close()
else:
self._journal_socket = journal_socket
self._journal_socket_failed = False

def _detect_journal(self, stream):
if not self._journal_stream_id:
def _detect_journal(self, stream: types.ErrorStream) -> bool:
if not self._journal_stream_id or not isinstance(stream, io.IOBase):
return False
try:
stat = os.fstat(stream.fileno())
except Exception:
except OSError:
return False
return self._journal_stream_id == (stat.st_dev, stat.st_ino)

@staticmethod
def _encode_journal(data):
def _encode_journal(data: Mapping[str, Optional[Union[str, int]]]
) -> bytes:
msg = b""
for key, value in data.items():
if value is None:
continue
key = key.encode()
value = str(value).encode()
if b"\n" in value:
msg += (key + b"\n" +
struct.pack("<Q", len(value)) + value + b"\n")
keyb = key.encode()
valueb = str(value).encode()
if b"\n" in valueb:
msg += (keyb + b"\n" +
struct.pack("<Q", len(valueb)) + valueb + b"\n")
else:
msg += key + b"=" + value + b"\n"
msg += keyb + b"=" + valueb + b"\n"
return msg

def _emit_journal(self, record):
def _try_emit_journal(self, record: logging.LogRecord) -> bool:
if not self._journal_socket:
# Try to connect to systemd journal socket
if self._journal_socket_failed or not hasattr(socket, "AF_UNIX"):
return False
journal_socket = None
try:
journal_socket = socket.socket(
socket.AF_UNIX, socket.SOCK_DGRAM)
journal_socket.connect("/run/systemd/journal/socket")
except OSError:
self._journal_socket_failed = True
if journal_socket:
journal_socket.close()
return False
self._journal_socket = journal_socket

priority = {"DEBUG": 7,
"INFO": 6,
"WARNING": 4,
Expand All @@ -136,7 +148,7 @@ def _emit_journal(self, record):
timestamp = time.strftime("%Y-%m-%dT%H:%M:%S.%%03dZ",
time.gmtime(record.created)) % record.msecs
data = {"PRIORITY": priority,
"TID": record.tid,
"TID": cast(Optional[int], getattr(record, "tid", None)),
"SYSLOG_IDENTIFIER": record.name,
"SYSLOG_FACILITY": 1,
"SYSLOG_PID": record.process,
Expand All @@ -146,12 +158,12 @@ def _emit_journal(self, record):
"CODE_FUNC": record.funcName,
"MESSAGE": self.format(record)}
self._journal_socket.sendall(self._encode_journal(data))
return True

def emit(self, record: logging.LogRecord) -> None:
try:
stream = self._streams.get(threading.get_ident(), sys.stderr)
if self._journal_socket and self._detect_journal(stream):
self._emit_journal(record)
if self._detect_journal(stream) and self._try_emit_journal(record):
return
msg = self.format(record)
stream.write(msg)
Expand Down
4 changes: 2 additions & 2 deletions radicale/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def read(self, size: int = ...) -> bytes: ...

@runtime_checkable
class ErrorStream(Protocol):
def flush(self) -> None: ...
def write(self, s: str) -> None: ...
def flush(self) -> object: ...
def write(self, s: str) -> object: ...
else:
ErrorStream = Any
InputStream = Any
Expand Down

0 comments on commit 5070533

Please sign in to comment.