#!/usr/bin/env python
import atexit
from time import sleep
from typing import Dict, List, Optional

from psutil import AccessDenied, NoSuchProcess, Process, process_iter
class DiskManagement:
    """
    Locate temporary files which are not cleaned up by an application.

    The files opened by a process and all of its child processes are
    polled periodically. When the program terminates, a report is printed
    to stdout containing all files that were seen open, grouped by their
    access mode.

    :param pid: Process Identifier of the process to be traced
    :param cmdline: command line of the process to be traced
    :raise ValueError: both pid and cmdline are None,
                       cmdline of process does not match with specified cmdline,
                       process with the specified command line could not be found
    """

    # Maps the path of every file seen so far to its access mode ("r" or "r+").
    # Only declared here; the dict itself is created per instance in __init__
    # so that several instances never share (and mix up) their results.
    open_files: Dict[str, str]

    # Access modes which imply that a file is opened with write privileges.
    _WRITE_MODES = frozenset({"w", "a", "r+", "a+"})

    def __init__(self, pid: Optional[int] = None, cmdline: Optional[str] = None):
        self.open_files = {}
        # Print the report when the interpreter exits, even if the collection
        # loop is interrupted (e.g. by Ctrl+C).
        atexit.register(self._print_open_files)

        if pid is not None:
            process = Process(pid)
            if cmdline is not None:
                # Both given: the command line is used to verify the PID.
                process_cmdline = " ".join(process.cmdline())
                if process_cmdline != cmdline:
                    raise ValueError(f"PID {pid} has wrong cmdline: {cmdline} -> {process_cmdline}")
        elif cmdline is not None:
            process = self._find_process(cmdline)
            if process is None:
                raise ValueError(f"process could not be found: {cmdline}")
        else:
            raise ValueError("pid or cmdline must be given")

        self.processes = self._get_process_tree(process)

        print("Observing the following processes:")
        for observed in self.processes:
            observed_cmdline = " ".join(observed.cmdline())
            print(f"\t{observed.pid} -> {observed_cmdline}")

    @staticmethod
    def _find_process(cmdline: str, timeout: float = 10.0, interval: float = 0.2) -> "Optional[Process]":
        """
        Find a process with the specified command line. The command line consists of
        the executable name concatenated with the arguments using spaces.

        :param cmdline: command line containing the executable name and the arguments
        :param timeout: timeout in seconds after which None will be returned
        :param interval: time in seconds between checks
        :return: Process if it was found, otherwise None
        """
        while timeout > 0:
            for process in process_iter(["cmdline"]):
                # Use the value pre-fetched by process_iter() instead of calling
                # process.cmdline() again: the pre-fetched value is None when the
                # command line is not accessible, whereas a second call could raise
                # for an unrelated process and abort the whole search.
                arguments = process.info["cmdline"]
                if arguments is not None and " ".join(arguments) == cmdline:
                    return process
            timeout -= interval
            sleep(interval)
        return None

    def _get_process_tree(self, process: "Process") -> "List[Process]":
        """
        Traverse the process tree recursively and get all child
        processes of the specified process.

        :param process: process which may have child processes
        :return: list of specified process with all child processes
        """
        processes = [process]
        for child_process in process.children():
            processes.extend(self._get_process_tree(child_process))
        return processes

    @classmethod
    def _normalize_mode(cls, mode: str) -> str:
        """
        Reduce an access mode to the two modes used in the report.

        :param mode: access mode as reported for an open file
        :return: "r+" for every mode with write privileges, otherwise the
                 mode unchanged (unknown modes are reported when printing)
        """
        return "r+" if mode in cls._WRITE_MODES else mode

    def _record_open_files(self, open_files: list) -> int:
        """
        Merge one snapshot of open files into ``self.open_files``.

        :param open_files: entries providing the attributes ``path`` and ``mode``
        :return: number of files which have not been seen before
        """
        new_file_counter = 0
        for open_file in open_files:
            old_access_mode = self.open_files.get(open_file.path)
            if old_access_mode is None:
                new_file_counter += 1
            if old_access_mode == "r+":
                # Write access was already observed; never downgrade it.
                continue
            self.open_files[open_file.path] = self._normalize_mode(open_file.mode)
        return new_file_counter

    def collect_open_files(self, interval: float = 0.2) -> None:
        """
        Collect a list of open files from all registered processes.

        If a file is opened in read only mode, then the access mode
        will be "r". If a file is opened with write privileges, then the
        access mode will be "r+". A file which was first seen read only
        is upgraded to "r+" once it is reopened with write privileges.
        Additionally, a dot is printed to stdout for each new file.

        The method returns when none of the observed processes is
        accessible anymore.

        :param interval: time between checks in seconds
        """
        print("\nSearching for open files", end="", flush=True)

        while self.processes:
            new_file_counter = 0
            dead_pids = set()

            for process in self.processes:
                try:
                    open_files = process.open_files()
                except (NoSuchProcess, AccessDenied, PermissionError):
                    # psutil signals missing privileges with its own AccessDenied,
                    # which is not a PermissionError. Either way the process
                    # cannot be observed any longer.
                    dead_pids.add(process.pid)
                    continue
                new_file_counter += self._record_open_files(open_files)

            if dead_pids:
                self.processes = [p for p in self.processes if p.pid not in dead_pids]

            # indicate progress to the user by printing a dot for each new file
            if new_file_counter:
                print("." * new_file_counter, end="", flush=True)

            sleep(interval)

    def _print_open_files(self) -> None:
        """
        Print all open files and group them into files with only
        read access and files with read/write access.

        :raise RuntimeError: entry has an invalid read/write mode
        """
        read_files = []
        write_files = []

        for path, mode in self.open_files.items():
            if mode == "r+":
                write_files.append(path)
            elif mode == "r":
                read_files.append(path)
            else:
                raise RuntimeError(f"unknown mode `{mode}`: {path}")

        if read_files:
            print("\nREAD:")
            for path in sorted(read_files):
                print(f"\t{path}")

        if write_files:
            print("\nREAD + WRITE:")
            for path in sorted(write_files):
                print(f"\t{path}")
if __name__ == "__main__":
    # Command line entry point: trace a process selected by PID and/or command
    # line until it terminates or the user interrupts with Ctrl+C. The report
    # is printed by the exit handler registered by DiskManagement.
    import argparse

    default_cmdline = "python -m uwsift"

    parser = argparse.ArgumentParser("Disk Management")
    parser.add_argument("--pid", type=int, help="PID of the process, which should be traced")
    parser.add_argument(
        "--cmdline",
        default=None,
        help=f"find the process using the cmdline (default if --pid is not given: '{default_cmdline}')",
    )
    args = parser.parse_args()

    # Only fall back to the default command line when no PID was given.
    # Otherwise `--pid N` alone would always be verified against the default
    # command line and be rejected for every other process.
    cmdline = args.cmdline
    if cmdline is None and args.pid is None:
        cmdline = default_cmdline

    try:
        disk_management = DiskManagement(pid=args.pid, cmdline=cmdline)
        disk_management.collect_open_files()
    except KeyboardInterrupt:
        # Ctrl+C is the regular way to stop tracing; the report is still printed at exit.
        pass