Source code for uwsift.util.ps_profiler

#!/usr/bin/env python
from time import perf_counter, sleep
from typing import List, Optional

from psutil import NoSuchProcess, Process, process_iter


[docs] class PsProfiler: file = None def __init__(self, cmdline: str): self.main_process = self._find_process(cmdline) if self.main_process is None: raise RuntimeError(f"can't find a process with the following cmdline: {cmdline}") print(f"found process: {self.main_process.pid}") @staticmethod def _find_process(cmdline: str, timeout: float = 10.0, interval: float = 0.2) -> Optional[Process]: """ Find a process with the specified command line. The command line consists of the executable name concatenated with the arguments using spaces. :param cmdline: command line containing the executable name and the arguments :param timeout: timeout in seconds after which None will be returned :param interval: time in seconds between checks :return: Process if it was found """ while timeout > 0: for process in process_iter(["cmdline"]): process_cmdline = " ".join(process.cmdline()) if process_cmdline == cmdline: return process timeout -= interval sleep(interval) return None def _get_process_tree(self, process: Process) -> List[Process]: """ Traverse the process tree recursively and get all child processes of the specified process. :param process: process which may have child processes :return: list of specified process with all child processes """ processes = [process] for child_process in process.children(): processes.extend(self._get_process_tree(child_process)) return processes
[docs] def run(self, out_file: str, interval: float = 0.2): assert self.main_process is not None # nosec B101 # suppress mypy [union-attr] start_time = perf_counter() self.file = open(out_file, "w") self.file.write("time,pid,uss,pss,lib,shared,vms,rss\n") while True: pss_sum = 0 time_delta = perf_counter() - start_time try: processes = self._get_process_tree(self.main_process) except NoSuchProcess: try: self.main_process.status() continue except NoSuchProcess: self.file.close() self.file = None return pids = [] for process in processes: try: mem_info = process.memory_full_info() except NoSuchProcess: continue self.file.write( f"{time_delta:.4f},{process.pid},{mem_info.uss}," f"{mem_info.pss},{mem_info.lib},{mem_info.shared}," f"{mem_info.vms},{mem_info.rss}\n" ) pss_sum += mem_info.pss pids.append(process.pid) self.file.flush() print(f"Memory consumption of {pids}: {pss_sum}") sleep(interval)
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser("PsProfiler") parser.add_argument("--cmdline", default="python -m uwsift") parser.add_argument("--outfile", required=True) args = parser.parse_args() ps_profiler = None try: ps_profiler = PsProfiler(args.cmdline) ps_profiler.run(args.outfile) except KeyboardInterrupt: if ps_profiler is not None and ps_profiler.file is not None: ps_profiler.file.close()