summaryrefslogtreecommitdiff
path: root/flup/server/prefork.py
diff options
context:
space:
mode:
Diffstat (limited to 'flup/server/prefork.py')
-rw-r--r--flup/server/prefork.py364
1 files changed, 364 insertions, 0 deletions
diff --git a/flup/server/prefork.py b/flup/server/prefork.py
new file mode 100644
index 0000000..191a651
--- /dev/null
+++ b/flup/server/prefork.py
@@ -0,0 +1,364 @@
+# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+# $Id$
+
+__author__ = 'Allan Saddi <allan@saddi.com>'
+__version__ = '$Revision$'
+
+import sys
+import os
+import socket
+import select
+import errno
+import signal
+
+class PreforkServer(object):
+ """
+ A preforked server model conceptually similar to Apache httpd(2). At
+ any given time, ensures there are at least minSpare children ready to
+ process new requests (up to a maximum of maxChildren children total).
+ If the number of idle children is ever above maxSpare, the extra
+ children are killed.
+
+ jobClass should be a class whose constructor takes at least two
+ arguments: the client socket and client address. jobArgs, which
+ must be a list or tuple, is any additional (static) arguments you
+ wish to pass to the constructor.
+
+ jobClass should have a run() method (taking no arguments) that does
+ the actual work. When run() returns, the request is considered
+ complete and the child process moves to idle state.
+ """
+ def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
+ jobClass=None, jobArgs=()):
+ self._minSpare = minSpare
+ self._maxSpare = maxSpare
+ self._maxChildren = max(maxSpare, maxChildren)
+ self._jobClass = jobClass
+ self._jobArgs = jobArgs
+
+ # Internal state of children. Maps pids to dictionaries with two
+ # members: 'file' and 'avail'. 'file' is the socket to that
+ # individidual child and 'avail' is whether or not the child is
+ # free to process requests.
+ self._children = {}
+
+ def run(self, sock):
+ """
+ The main loop. Pass a socket that is ready to accept() client
+ connections. Return value will be True or False indiciating whether
+ or not the loop was exited due to SIGHUP.
+ """
+ # Set up signal handlers.
+ self._keepGoing = True
+ self._hupReceived = False
+ self._installSignalHandlers()
+
+ # Don't want operations on main socket to block.
+ sock.setblocking(0)
+
+ # Main loop.
+ while self._keepGoing:
+ # Maintain minimum number of children.
+ while len(self._children) < self._maxSpare:
+ if not self._spawnChild(sock): break
+
+ # Wait on any socket activity from live children.
+ r = [x['file'] for x in self._children.values()
+ if x['file'] is not None]
+
+ if len(r) == len(self._children):
+ timeout = None
+ else:
+ # There are dead children that need to be reaped, ensure
+ # that they are by timing out, if necessary.
+ timeout = 2
+
+ try:
+ r, w, e = select.select(r, [], [], timeout)
+ except select.error, e:
+ if e[0] != errno.EINTR:
+ raise
+
+ # Scan child sockets and tend to those that need attention.
+ for child in r:
+ # Receive status byte.
+ try:
+ state = child.recv(1)
+ except socket.error, e:
+ if e[0] in (errno.EAGAIN, errno.EINTR):
+ # Guess it really didn't need attention?
+ continue
+ raise
+ # Try to match it with a child. (Do we need a reverse map?)
+ for pid,d in self._children.items():
+ if child is d['file']:
+ if state:
+ # Set availability status accordingly.
+ self._children[pid]['avail'] = state != '\x00'
+ else:
+ # Didn't receive anything. Child is most likely
+ # dead.
+ d = self._children[pid]
+ d['file'].close()
+ d['file'] = None
+ d['avail'] = False
+
+ # Reap children.
+ self._reapChildren()
+
+ # See who and how many children are available.
+ availList = filter(lambda x: x[1]['avail'], self._children.items())
+ avail = len(availList)
+
+ if avail < self._minSpare:
+ # Need to spawn more children.
+ while avail < self._minSpare and \
+ len(self._children) < self._maxChildren:
+ if not self._spawnChild(sock): break
+ avail += 1
+ elif avail > self._maxSpare:
+ # Too many spares, kill off the extras.
+ pids = [x[0] for x in availList]
+ pids.sort()
+ pids = pids[self._maxSpare:]
+ for pid in pids:
+ d = self._children[pid]
+ d['file'].close()
+ d['file'] = None
+ d['avail'] = False
+
+ # Clean up all child processes.
+ self._cleanupChildren()
+
+ # Restore signal handlers.
+ self._restoreSignalHandlers()
+
+ # Return bool based on whether or not SIGHUP was received.
+ return self._hupReceived
+
+ def _cleanupChildren(self):
+ """
+ Closes all child sockets (letting those that are available know
+ that it's time to exit). Sends SIGINT to those that are currently
+ processing (and hopes that it finishses ASAP).
+
+ Any children remaining after 10 seconds is SIGKILLed.
+ """
+ # Let all children know it's time to go.
+ for pid,d in self._children.items():
+ if d['file'] is not None:
+ d['file'].close()
+ d['file'] = None
+ if not d['avail']:
+ # Child is unavailable. SIGINT it.
+ try:
+ os.kill(pid, signal.SIGINT)
+ except OSError, e:
+ if e[0] != errno.ESRCH:
+ raise
+
+ def alrmHandler(signum, frame):
+ pass
+
+ # Set up alarm to wake us up after 10 seconds.
+ oldSIGALRM = signal.getsignal(signal.SIGALRM)
+ signal.signal(signal.SIGALRM, alrmHandler)
+ signal.alarm(10)
+
+ # Wait for all children to die.
+ while len(self._children):
+ try:
+ pid, status = os.wait()
+ except OSError, e:
+ if e[0] in (errno.ECHILD, errno.EINTR):
+ break
+ if self._children.has_key(pid):
+ del self._children[pid]
+
+ signal.signal(signal.SIGALRM, oldSIGALRM)
+
+ # Forcefully kill any remaining children.
+ for pid in self._children.keys():
+ try:
+ os.kill(pid, signal.SIGKILL)
+ except OSError, e:
+ if e[0] != errno.ESRCH:
+ raise
+
+ def _reapChildren(self):
+ """Cleans up self._children whenever children die."""
+ while True:
+ try:
+ pid, status = os.waitpid(-1, os.WNOHANG)
+ except OSError, e:
+ if e[0] == errno.ECHILD:
+ break
+ raise
+ if pid <= 0:
+ break
+ if self._children.has_key(pid): # Sanity check.
+ if self._children[pid]['file'] is not None:
+ self._children[pid]['file'].close()
+ del self._children[pid]
+
+ def _spawnChild(self, sock):
+ """
+ Spawn a single child. Returns True if successful, False otherwise.
+ """
+ # This socket pair is used for very simple communication between
+ # the parent and its children.
+ parent, child = socket.socketpair()
+ parent.setblocking(0)
+ child.setblocking(0)
+ try:
+ pid = os.fork()
+ except OSError, e:
+ if e[0] in (errno.EAGAIN, errno.ENOMEM):
+ return False # Can't fork anymore.
+ raise
+ if not pid:
+ # Child
+ child.close()
+ # Put child into its own process group.
+ pid = os.getpid()
+ os.setpgid(pid, pid)
+ # Restore signal handlers.
+ self._restoreSignalHandlers()
+ # Close copies of child sockets.
+ for f in [x['file'] for x in self._children.values()
+ if x['file'] is not None]:
+ f.close()
+ self._children = {}
+ try:
+ # Enter main loop.
+ self._child(sock, parent)
+ except KeyboardInterrupt:
+ pass
+ sys.exit(0)
+ else:
+ # Parent
+ parent.close()
+ d = self._children[pid] = {}
+ d['file'] = child
+ d['avail'] = True
+ return True
+
+ def _isClientAllowed(self, addr):
+ """Override to provide access control."""
+ return True
+
+ def _child(self, sock, parent):
+ """Main loop for children."""
+ while True:
+ # Wait for any activity on the main socket or parent socket.
+ r, w, e = select.select([sock, parent], [], [])
+
+ for f in r:
+ # If there's any activity on the parent socket, it
+ # means the parent wants us to die or has died itself.
+ # Either way, exit.
+ if f is parent:
+ return
+
+ # Otherwise, there's activity on the main socket...
+ try:
+ clientSock, addr = sock.accept()
+ except socket.error, e:
+ if e[0] == errno.EAGAIN:
+ # Or maybe not.
+ continue
+ raise
+
+ # Check if this client is allowed.
+ if not self._isClientAllowed(addr):
+ clientSock.close()
+ continue
+
+ # Notify parent we're no longer available.
+ try:
+ parent.send('\x00')
+ except socket.error, e:
+ # If parent is gone, finish up this request.
+ if e[0] != errno.EPIPE:
+ raise
+
+ # Do the job.
+ self._jobClass(clientSock, addr, *self._jobArgs).run()
+
+ # Tell parent we're free again.
+ try:
+ parent.send('\xff')
+ except socket.error, e:
+ if e[0] == errno.EPIPE:
+ # Parent is gone.
+ return
+ raise
+
+ # Signal handlers
+
+ def _hupHandler(self, signum, frame):
+ self._keepGoing = False
+ self._hupReceived = True
+
+ def _intHandler(self, signum, frame):
+ self._keepGoing = False
+
+ def _chldHandler(self, signum, frame):
+ # Do nothing (breaks us out of select and allows us to reap children).
+ pass
+
+ def _installSignalHandlers(self):
+ """Installs signal handlers."""
+ self._oldSIGs = [(x,signal.getsignal(x)) for x in
+ (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT,
+ signal.SIGTERM, signal.SIGCHLD)]
+ signal.signal(signal.SIGHUP, self._hupHandler)
+ signal.signal(signal.SIGINT, self._intHandler)
+ signal.signal(signal.SIGQUIT, self._intHandler)
+ signal.signal(signal.SIGTERM, self._intHandler)
+
+ def _restoreSignalHandlers(self):
+ """Restores previous signal handlers."""
+ for signum,handler in self._oldSIGs:
+ signal.signal(signum, handler)
+
+if __name__ == '__main__':
+ class TestJob(object):
+ def __init__(self, sock, addr):
+ self._sock = sock
+ self._addr = addr
+ def run(self):
+ print "Client connection opened from %s:%d" % self._addr
+ self._sock.send('Hello World!\n')
+ self._sock.setblocking(1)
+ self._sock.recv(1)
+ self._sock.close()
+ print "Client connection closed from %s:%d" % self._addr
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('', 8080))
+ sock.listen(socket.SOMAXCONN)
+ PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)