Source code for uwsift.util.watchdog

#!/usr/bin/env python

import logging
import os
import shlex
import signal
import subprocess  # nosec: B404
from datetime import datetime, timedelta, timezone
from socket import gethostname
from time import sleep
from typing import List, Optional, Tuple

import appdirs
from donfig import Config
from psutil import NoSuchProcess, Process

from uwsift.util.default_paths import APPLICATION_AUTHOR, APPLICATION_NAME

LOG = logging.getLogger(__name__)


# This constant must be kept in sync with the definition in the following
# file: uwsift/__main__.py
WATCHDOG_DATETIME_FORMAT_STORE = "%Y-%m-%d %H:%M:%S %z"


[docs] def get_config_value(config: Config, key: str) -> str: """ Wrapper for the `get` method from the donfig library, which provides a more friendly error message if the key doesn't exist. :param config: Config object from donfig :param key: key to the config value :return: str or dict with the config value """ try: return config.get(key) except KeyError: raise KeyError(f"Can't find `{key}` in the watchdog config")
[docs] class Watchdog: ask_again_interval: Optional[timedelta] = None restart_interval: Optional[timedelta] = None allowed_mem_usage: Optional[int] = None notification_cmd: Optional[str] = None def __init__(self, config_dirs: List[str], cache_dir: str): """ Create a new Watchdog object. :param config_dirs: List of search paths for the watchdog YAML configuration files :param cache_dir: Path to the SIFT caching directory, which is used as the default location of the heartbeat file. """ self.hostname = gethostname() config = Config("uwsift", paths=config_dirs) heartbeat_file = get_config_value(config, "watchdog.heartbeat_file") self.heartbeat_file = heartbeat_file.replace("$$CACHE_DIR$$", cache_dir) notification_cmd = config.get("watchdog.notification_cmd", None) if not notification_cmd: LOG.warning("Can't send notifications" " because `notification_cmd` isn't configured") else: self.notification_cmd = shlex.quote(notification_cmd) self.heartbeat_check_interval = float(get_config_value(config, "watchdog.heartbeat_check_interval")) self.max_tolerable_dataset_age = float(get_config_value(config, "watchdog.max_tolerable_dataset_age")) self.max_tolerable_idle_time = float(get_config_value(config, "watchdog.max_tolerable_idle_time")) restart_interval = int(config.get("watchdog.auto_restart_interval", 0)) if restart_interval == 0: LOG.warning("Auto Restart is disabled") else: self.restart_interval = timedelta(seconds=restart_interval) ask_again_interval = int(config.get("watchdog.auto_restart_ask_again_interval", 0)) if ask_again_interval == 0: LOG.warning("Auto Restart will ask the user only once") else: self.ask_again_interval = timedelta(seconds=ask_again_interval) allowed_max_mem = config.get("watchdog.max_memory_consumption", None) if allowed_max_mem: self.allowed_mem_usage = self._parse_byte_count(allowed_max_mem) else: LOG.warning("Memory consumption won't be checked") # Variable runtime state self.old_pid = None self.application_start_time = None self.sent_restart_request = False @staticmethod def _parse_byte_count(byte_count_str: str) -> int: """ Parses the str representation of a byte count and converts the units `M` (*Mebibytes*) and `G` (*Gibibytes*) into the appropriate byte count. **Note:** To be compatible with *systemd* the units are interpreted with the base 1024 (not 1000) although they are called *Megabytes* and *Gigabytes* there, see e.g. https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#MemoryHigh=bytes :param byte_count: str of the byte count with unit suffix :return: number of bytes as int """ if not byte_count_str: raise ValueError("expected a unit like `M` or `G`") byte_count, unit = int(byte_count_str[:-1]), byte_count_str[-1] MEBIBYTE_BYTES = 1024**2 GIBIBYTE_BYTES = 1024**3 if unit == "M": return byte_count * MEBIBYTE_BYTES elif unit == "G": return byte_count * GIBIBYTE_BYTES else: raise ValueError(f"byte count contains unknown unit: {unit}") def _read_watchdog_file(self) -> Tuple[int, datetime]: """ Open the watchdog file and parse it. :return: tuple of the PID as int and datetime of the dataset creation """ with open(self.heartbeat_file) as file: content = file.read() pid, timestamp = content.splitlines() return int(pid), datetime.strptime(timestamp, WATCHDOG_DATETIME_FORMAT_STORE) def _notify(self, level: int, text: str): """ If the notification_cmd was defined in the config, then invoke the command using the subprocess API. Otherwise print text with the specified level using the logging API. :param level: int as defined in the logging package :param text: message to log """ if not self.notification_cmd: LOG.log(level, text) else: machine = shlex.quote(self.hostname) process_name = shlex.quote(f"{APPLICATION_NAME}-watchdog") severity = shlex.quote(logging.getLevelName(level)) text = shlex.quote(text) cmd = [self.notification_cmd, machine, process_name, severity, text] try: subprocess.run(cmd, shell=True, check=True) # nosec: B602 except subprocess.CalledProcessError as err: LOG.error(f"Can't run the notification command: {err}") LOG.log(level, text) def _get_process_tree(self, pid: int) -> List[Process]: """ Traverse the process tree recursively and get all child processes of the specified process. :param pid: PID of process which may have child processes :return: list of specified process with all child processes """ try: process = Process(pid) except NoSuchProcess: return [] processes = [process] for child_process in process.children(): processes.extend(self._get_process_tree(child_process)) return processes def _get_memory_consumption(self, pid: int) -> int: """ Calculates the memory consumption in bytes of the specified process and all its child processes. Shared memory will only be counted once. :param pid: PID of the process :return: memory consumption in bytes or 0 if the process isn't alive any more """ pss_sum = 0 for process in self._get_process_tree(pid): try: mem_info = process.memory_full_info() pss_sum += mem_info.pss except NoSuchProcess: pass return pss_sum def _restart_application(self, pid: int): """ Issue a restart request using the SIGUSR1 signal. The process may choose to ignore this request. Don't use SIGTERM, because ``systemctl --user stop uwsift`` should terminate SIFT without asking the user. However the application should be able to save its state, so don't use SIGKILL either. This function doesn't use the command ``systemctl restart uwsift``, because the application may choose to ignore the restart request. Systemd first sends SIGTERM and then SIGKILL if the application is still alive. The wait time between SIGTERM and SIGKILL could be configured by ``TimeoutStopSec=infinity`` in order to accommodate this use case, but by doing so the ``systemctl stop uwsift`` command can't reliably terminate the program in case of a hang. :param pid: PID of SIFT as int """ try: os.kill(pid, signal.SIGUSR1) self._notify(logging.INFO, f"Sent restart request to {pid}") except ProcessLookupError: self._notify(logging.WARNING, f"Can't issue restart request because" f" the PID {pid} doesn't exist")
[docs] def run(self): """ Run the watchdog in blocking mode. The watchdog will read the heartbeat file periodically and check whether the timestamp of the dataset creation is too old. Additional it will ensure that the application is restarted from time to time and that the memory consumption isn't too high. """ while True: if self.restart_interval is None: sleep(self.heartbeat_check_interval) else: sleep(min(self.heartbeat_check_interval, self.restart_interval.seconds)) try: pid, latest_dataset_time = self._read_watchdog_file() except ValueError as err: self._notify(logging.ERROR, f"Can't parse the watchdog file:" f" {err}") continue except FileNotFoundError: # application might not have been started yet or # the application is running but no data was loaded yet self._notify(logging.INFO, f"Heartbeat file doesn't exist:" f" {self.heartbeat_file}") continue now_utc = datetime.now(tz=timezone.utc) self._check_idle_time(now_utc) self._check_dataset_age(latest_dataset_time, now_utc) if self.application_start_time is None: self.application_start_time = now_utc self._check_application_restarted(now_utc, pid) self._check_restart_application_on_memory_shortage(pid) self._check_restart_application_on_runtime_exceeded(now_utc, pid)
def _check_restart_application_on_runtime_exceeded(self, now_utc, pid): if self.restart_interval is not None and not self.sent_restart_request: runtime = now_utc - self.application_start_time if runtime > self.restart_interval: self._restart_application(pid) if self.ask_again_interval is None: # send the restart request only once self.sent_restart_request = True else: self.application_start_time += self.ask_again_interval def _check_restart_application_on_memory_shortage(self, pid): if self.allowed_mem_usage: mem_usage = self._get_memory_consumption(pid) if mem_usage > self.allowed_mem_usage: LOG.warning(f"program uses too much memory: {mem_usage}") self._restart_application(pid) def _check_application_restarted(self, now_utc, pid): if self.old_pid is None: self.old_pid = pid elif self.old_pid != pid: self._notify(logging.INFO, f"Application was restarted: {pid}") self.application_start_time = now_utc self.old_pid = pid def _check_dataset_age(self, latest_dataset_time, now_utc): dataset_age = now_utc - latest_dataset_time if dataset_age.total_seconds() > self.max_tolerable_dataset_age: overdue_time = dataset_age.total_seconds() - self.max_tolerable_dataset_age self._notify( logging.WARNING, f"Current dataset scheduled time for observation" f" ('start_time'): {latest_dataset_time} -" f" Next dataset is overdue by " f"{overdue_time:.1f} seconds.", ) else: self._notify( logging.INFO, f"Current dataset scheduled time for observation" f" ('start_time'): {latest_dataset_time} - OK", ) def _check_idle_time(self, now_utc): # The last update time is implicitly stored as the file modification # time. It is possible to approximate this time by checking whether # the file content changes, but if the user loads the same dataset # again, then this update won't be detected. modification_time = datetime.fromtimestamp(os.path.getmtime(self.heartbeat_file), tz=timezone.utc) idle_time = now_utc - modification_time if idle_time.total_seconds() > self.max_tolerable_idle_time: self._notify(logging.WARNING, f"Dataset was not updated" f" since {modification_time}") else: self._notify( logging.INFO, f"Dataset was updated {idle_time.total_seconds()}" f" seconds ago at: {modification_time}", )
if __name__ == "__main__": user_cache_dir = appdirs.user_cache_dir(APPLICATION_NAME, APPLICATION_AUTHOR) user_config_dir = appdirs.user_config_dir(APPLICATION_NAME, APPLICATION_AUTHOR, roaming=True) config_dir = os.path.join(user_config_dir, "settings", "config") try: Watchdog([config_dir], user_cache_dir).run() except KeyboardInterrupt: pass