dumb-init/testing/__init__.py

78 lines
1.9 KiB
Python
Raw Normal View History

2016-06-17 22:47:34 +03:00
import os
import re
2015-09-12 01:57:22 +03:00
import signal
2016-06-17 22:47:34 +03:00
import sys
from contextlib import contextmanager
from subprocess import PIPE
from subprocess import Popen
2015-09-12 01:57:22 +03:00
from py._path.local import LocalPath
# these signals cause dumb-init to suspend itself
SUSPEND_SIGNALS = frozenset([
    signal.SIGTSTP,
    signal.SIGTTOU,
    signal.SIGTTIN,
])

# Signal numbers 1 through 31, minus SIGKILL, SIGSTOP and SIGCHLD, and minus
# the suspend signals listed above.
NORMAL_SIGNALS = frozenset(
    set(range(1, 32)) -
    set([signal.SIGKILL, signal.SIGSTOP, signal.SIGCHLD]) -
    SUSPEND_SIGNALS
)
2016-06-17 22:47:34 +03:00
@contextmanager
def print_signals(args=()):
    """Start print_signals and yield dumb-init process and print_signals PID.

    Runs ``dumb-init <args> <python> -m testing.print_signals`` with stdout
    piped, reads the first output line and requires it to be
    ``ready (pid: N)``.

    :param args: extra command-line arguments placed between ``dumb-init``
        and the Python command.
    :return: context manager yielding ``(proc, pid)`` where ``proc`` is the
        dumb-init ``Popen`` object and ``pid`` is N as a string.

    On exit, every descendant of the dumb-init process (as reported by
    ``pid_tree``, which does not include dumb-init itself) is sent SIGKILL.
    """
    proc = Popen(
        (
            ('dumb-init',) +
            tuple(args) +
            (sys.executable, '-m', 'testing.print_signals')
        ),
        stdout=PIPE,
    )
    # try/finally so the spawned process tree is killed even when the
    # handshake assertion or the body of the ``with`` block raises.
    try:
        line = proc.stdout.readline()
        # Raw bytes literal: ``\(`` must reach the regex engine untouched
        # instead of being parsed as an (invalid) string escape.
        m = re.match(br'^ready \(pid: ([0-9]+)\)\n$', line)
        assert m, line
        yield proc, m.group(1).decode('ascii')
    finally:
        for pid in pid_tree(proc.pid):
            os.kill(pid, signal.SIGKILL)
2015-09-12 01:57:22 +03:00
def child_pids(pid):
    """Return the set of direct child PIDs for the given PID.

    Children are read from ``/proc/<pid>/task/<tid>/children`` for every
    task directory listed under ``/proc/<pid>/task``.

    :param pid: process ID; it is converted with ``str()`` to build the path.
    :return: set of child PIDs as ints.
    """
    pid = str(pid)
    tasks = LocalPath('/proc').join(pid, 'task').listdir()
    return set(
        int(child_pid)
        for task in tasks
        # each ``children`` file is a whitespace-separated list of PIDs
        for child_pid in task.join('children').read().split()
    )
2015-09-12 01:57:22 +03:00
def pid_tree(pid):
    """Return the set of all descendant PIDs for the given PID.

    Recurses through ``child_pids``; the given PID itself is not part of
    the result.

    :param pid: process ID whose descendants are collected.
    :return: set of descendant PIDs as ints.
    """
    children = child_pids(pid)
    return set(
        descendant
        for child in children
        for descendant in pid_tree(child)
    ) | children
2015-09-12 01:57:22 +03:00
def is_alive(pid):
    """Tell whether a process with the given PID is running.

    The check is whether ``/proc/<pid>`` exists as a directory.
    """
    proc_entry = LocalPath('/proc').join(str(pid))
    return proc_entry.isdir()
def process_state(pid):
    """Return a process' state, such as "stopped" or "running".

    The state is the parenthesised word(s) on the ``State:`` line of
    ``/proc/<pid>/status``, e.g. ``State:  T (stopped)`` gives "stopped".
    Multi-word states such as "disk sleep" are returned whole.

    :param pid: process ID to inspect.
    :return: the state description as a string.
    :raises AssertionError: if no ``State:`` line in the expected format is
        found; the message carries the status text that was read.
    """
    status = LocalPath('/proc').join(str(pid), 'status').read()
    # Raw string so ``\s`` and ``\(`` are regex escapes, not string escapes.
    # The state code may be lower-case (``t``) and the description may
    # contain a space ("disk sleep", "tracing stop").
    m = re.search(
        r'^State:\s+[A-Za-z] \(([a-z ]+)\)$',
        status,
        re.MULTILINE,
    )
    assert m, status
    return m.group(1)