# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of logilab-common. # # logilab-common is free software: you can redistribute it and/or modify it under # the terms of the GNU Lesser General Public License as published by the Free # Software Foundation, either version 2.1 of the License, or (at your option) any # later version. # # logilab-common is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License along # with logilab-common. If not, see . """module providing: * process information (linux specific: rely on /proc) * a class for resource control (memory / time / cpu time) This module doesn't work on windows platforms (only tested on linux) :organization: Logilab """ __docformat__ = "restructuredtext en" import os import stat from resource import getrlimit, setrlimit, RLIMIT_CPU, RLIMIT_AS from signal import signal, SIGXCPU, SIGKILL, SIGUSR2, SIGUSR1 from threading import Timer, currentThread, Thread, Event from time import time from logilab.common.tree import Node class NoSuchProcess(Exception): pass def proc_exists(pid): """check the a pid is registered in /proc raise NoSuchProcess exception if not """ if not os.path.exists('/proc/%s' % pid): raise NoSuchProcess() PPID = 3 UTIME = 13 STIME = 14 CUTIME = 15 CSTIME = 16 VSIZE = 22 class ProcInfo(Node): """provide access to process information found in /proc""" def __init__(self, pid): self.pid = int(pid) Node.__init__(self, self.pid) proc_exists(self.pid) self.file = '/proc/%s/stat' % self.pid self.ppid = int(self.status()[PPID]) def memory_usage(self): """return the memory usage of the process in Ko""" try : return int(self.status()[VSIZE]) except IOError: return 0 def lineage_memory_usage(self): return self.memory_usage() + sum([child.lineage_memory_usage() for child in self.children]) def time(self, children=0): """return the number of jiffies that this process has been scheduled in user and kernel mode""" status = self.status() time = int(status[UTIME]) + int(status[STIME]) if children: time += int(status[CUTIME]) + int(status[CSTIME]) return time def status(self): """return the list of fields found in /proc//stat""" return open(self.file).read().split() def name(self): """return the process name found in /proc//stat """ return self.status()[1].strip('()') def age(self): """return the age of the process """ return os.stat(self.file)[stat.ST_MTIME] class ProcInfoLoader: """manage process information""" def __init__(self): self._loaded = {} def list_pids(self): """return a list of existent process ids""" for subdir in os.listdir('/proc'): if subdir.isdigit(): yield int(subdir) def load(self, pid): """get a ProcInfo object for a given pid""" pid = int(pid) try: return self._loaded[pid] except KeyError: procinfo = ProcInfo(pid) procinfo.manager = self self._loaded[pid] = procinfo return procinfo def load_all(self): """load all processes information""" for pid in self.list_pids(): try: procinfo = self.load(pid) if procinfo.parent is None and procinfo.ppid: pprocinfo = self.load(procinfo.ppid) pprocinfo.append(procinfo) except NoSuchProcess: pass try: class ResourceError(BaseException): """Error raise when resource limit is reached""" limit = "Unknown Resource Limit" except NameError: class ResourceError(Exception): """Error raise when resource limit is reached""" limit = "Unknown Resource Limit" class XCPUError(ResourceError): """Error raised when CPU Time limit is reached""" limit = "CPU Time" class LineageMemoryError(ResourceError): """Error raised when the total amount of memory used by a process and it's child is reached""" limit = "Lineage total Memory" class TimeoutError(ResourceError): """Error raised when the process is running for to much time""" limit = "Real Time" # Can't use subclass because the StandardError MemoryError raised RESOURCE_LIMIT_EXCEPTION = (ResourceError, MemoryError) class MemorySentinel(Thread): """A class checking a process don't use too much memory in a separated daemonic thread """ def __init__(self, interval, memory_limit, gpid=os.getpid()): Thread.__init__(self, target=self._run, name="Test.Sentinel") self.memory_limit = memory_limit self._stop = Event() self.interval = interval self.setDaemon(True) self.gpid = gpid def stop(self): """stop ap""" self._stop.set() def _run(self): pil = ProcInfoLoader() while not self._stop.isSet(): if self.memory_limit <= pil.load(self.gpid).lineage_memory_usage(): os.killpg(self.gpid, SIGUSR1) self._stop.wait(self.interval) class ResourceController: def __init__(self, max_cpu_time=None, max_time=None, max_memory=None, max_reprieve=60): if SIGXCPU == -1: raise RuntimeError("Unsupported platform") self.max_time = max_time self.max_memory = max_memory self.max_cpu_time = max_cpu_time self._reprieve = max_reprieve self._timer = None self._msentinel = None self._old_max_memory = None self._old_usr1_hdlr = None self._old_max_cpu_time = None self._old_usr2_hdlr = None self._old_sigxcpu_hdlr = None self._limit_set = 0 self._abort_try = 0 self._start_time = None self._elapse_time = 0 def _hangle_sig_timeout(self, sig, frame): raise TimeoutError() def _hangle_sig_memory(self, sig, frame): if self._abort_try < self._reprieve: self._abort_try += 1 raise LineageMemoryError("Memory limit reached") else: os.killpg(os.getpid(), SIGKILL) def _handle_sigxcpu(self, sig, frame): if self._abort_try < self._reprieve: self._abort_try += 1 raise XCPUError("Soft CPU time limit reached") else: os.killpg(os.getpid(), SIGKILL) def _time_out(self): if self._abort_try < self._reprieve: self._abort_try += 1 os.killpg(os.getpid(), SIGUSR2) if self._limit_set > 0: self._timer = Timer(1, self._time_out) self._timer.start() else: os.killpg(os.getpid(), SIGKILL) def setup_limit(self): """set up the process limit""" assert currentThread().getName() == 'MainThread' os.setpgrp() if self._limit_set <= 0: if self.max_time is not None: self._old_usr2_hdlr = signal(SIGUSR2, self._hangle_sig_timeout) self._timer = Timer(max(1, int(self.max_time) - self._elapse_time), self._time_out) self._start_time = int(time()) self._timer.start() if self.max_cpu_time is not None: self._old_max_cpu_time = getrlimit(RLIMIT_CPU) cpu_limit = (int(self.max_cpu_time), self._old_max_cpu_time[1]) self._old_sigxcpu_hdlr = signal(SIGXCPU, self._handle_sigxcpu) setrlimit(RLIMIT_CPU, cpu_limit) if self.max_memory is not None: self._msentinel = MemorySentinel(1, int(self.max_memory) ) self._old_max_memory = getrlimit(RLIMIT_AS) self._old_usr1_hdlr = signal(SIGUSR1, self._hangle_sig_memory) as_limit = (int(self.max_memory), self._old_max_memory[1]) setrlimit(RLIMIT_AS, as_limit) self._msentinel.start() self._limit_set += 1 def clean_limit(self): """reinstall the old process limit""" if self._limit_set > 0: if self.max_time is not None: self._timer.cancel() self._elapse_time += int(time())-self._start_time self._timer = None signal(SIGUSR2, self._old_usr2_hdlr) if self.max_cpu_time is not None: setrlimit(RLIMIT_CPU, self._old_max_cpu_time) signal(SIGXCPU, self._old_sigxcpu_hdlr) if self.max_memory is not None: self._msentinel.stop() self._msentinel = None setrlimit(RLIMIT_AS, self._old_max_memory) signal(SIGUSR1, self._old_usr1_hdlr) self._limit_set -= 1