summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAllan Saddi <allan@saddi.com>2005-04-15 02:55:03 +0000
committerAllan Saddi <allan@saddi.com>2005-04-15 02:55:03 +0000
commit700071bc26f40727331651b4f273465a4faa0c7d (patch)
tree62cb10677bb5e732f29dd807e0dd37307564fc87
parente8f091226f39a888c019b1637fb8a47927b8a4ab (diff)
downloadflup-700071bc26f40727331651b4f273465a4faa0c7d.tar.gz
Checkpoint commit.
-rw-r--r--flup/server/ajp.py1042
-rw-r--r--flup/server/ajp_base.py916
-rw-r--r--flup/server/ajp_fork.py870
-rw-r--r--flup/server/fcgi.py1231
-rw-r--r--flup/server/fcgi_base.py1128
-rw-r--r--flup/server/fcgi_fork.py1052
-rw-r--r--flup/server/preforkserver.py (renamed from flup/server/prefork.py)5
-rw-r--r--flup/server/scgi.py565
-rw-r--r--flup/server/scgi_base.py435
-rw-r--r--flup/server/scgi_fork.py392
-rw-r--r--flup/server/threadedserver.py151
11 files changed, 2759 insertions, 5028 deletions
diff --git a/flup/server/ajp.py b/flup/server/ajp.py
index baa2deb..be9c3a7 100644
--- a/flup/server/ajp.py
+++ b/flup/server/ajp.py
@@ -79,810 +79,15 @@ that SCRIPT_NAME/PATH_INFO are correctly deduced.
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
-import sys
import socket
-import select
-import struct
-import signal
import logging
-import errno
-import datetime
-import time
-# Unfortunately, for now, threads are required.
-import thread
-import threading
+from ajp_base import BaseAJPServer, Connection
+from threadedserver import ThreadedServer
__all__ = ['WSGIServer']
-# Packet header prefixes.
-SERVER_PREFIX = '\x12\x34'
-CONTAINER_PREFIX = 'AB'
-
-# Server packet types.
-PKTTYPE_FWD_REQ = '\x02'
-PKTTYPE_SHUTDOWN = '\x07'
-PKTTYPE_PING = '\x08'
-PKTTYPE_CPING = '\x0a'
-
-# Container packet types.
-PKTTYPE_SEND_BODY = '\x03'
-PKTTYPE_SEND_HEADERS = '\x04'
-PKTTYPE_END_RESPONSE = '\x05'
-PKTTYPE_GET_BODY = '\x06'
-PKTTYPE_CPONG = '\x09'
-
-# Code tables for methods/headers/attributes.
-methodTable = [
- None,
- 'OPTIONS',
- 'GET',
- 'HEAD',
- 'POST',
- 'PUT',
- 'DELETE',
- 'TRACE',
- 'PROPFIND',
- 'PROPPATCH',
- 'MKCOL',
- 'COPY',
- 'MOVE',
- 'LOCK',
- 'UNLOCK',
- 'ACL',
- 'REPORT',
- 'VERSION-CONTROL',
- 'CHECKIN',
- 'CHECKOUT',
- 'UNCHECKOUT',
- 'SEARCH',
- 'MKWORKSPACE',
- 'UPDATE',
- 'LABEL',
- 'MERGE',
- 'BASELINE_CONTROL',
- 'MKACTIVITY'
- ]
-
-requestHeaderTable = [
- None,
- 'Accept',
- 'Accept-Charset',
- 'Accept-Encoding',
- 'Accept-Language',
- 'Authorization',
- 'Connection',
- 'Content-Type',
- 'Content-Length',
- 'Cookie',
- 'Cookie2',
- 'Host',
- 'Pragma',
- 'Referer',
- 'User-Agent'
- ]
-
-attributeTable = [
- None,
- 'CONTEXT',
- 'SERVLET_PATH',
- 'REMOTE_USER',
- 'AUTH_TYPE',
- 'QUERY_STRING',
- 'JVM_ROUTE',
- 'SSL_CERT',
- 'SSL_CIPHER',
- 'SSL_SESSION',
- None, # name follows
- 'SSL_KEY_SIZE'
- ]
-
-responseHeaderTable = [
- None,
- 'content-type',
- 'content-language',
- 'content-length',
- 'date',
- 'last-modified',
- 'location',
- 'set-cookie',
- 'set-cookie2',
- 'servlet-engine',
- 'status',
- 'www-authenticate'
- ]
-
-# The main classes use this name for logging.
-LoggerName = 'ajp-wsgi'
-
-# Set up module-level logger.
-console = logging.StreamHandler()
-console.setLevel(logging.DEBUG)
-console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
- '%Y-%m-%d %H:%M:%S'))
-logging.getLogger(LoggerName).addHandler(console)
-del console
-
-class ProtocolError(Exception):
- """
- Exception raised when the server does something unexpected or
- sends garbled data. Usually leads to a Connection closing.
- """
- pass
-
-def decodeString(data, pos=0):
- """Decode a string."""
- try:
- length = struct.unpack('>H', data[pos:pos+2])[0]
- pos += 2
- if length == 0xffff: # This was undocumented!
- return '', pos
- s = data[pos:pos+length]
- return s, pos+length+1 # Don't forget NUL
- except Exception, e:
- raise ProtocolError, 'decodeString: '+str(e)
-
-def decodeRequestHeader(data, pos=0):
- """Decode a request header/value pair."""
- try:
- if data[pos] == '\xa0':
- # Use table
- i = ord(data[pos+1])
- name = requestHeaderTable[i]
- if name is None:
- raise ValueError, 'bad request header code'
- pos += 2
- else:
- name, pos = decodeString(data, pos)
- value, pos = decodeString(data, pos)
- return name, value, pos
- except Exception, e:
- raise ProtocolError, 'decodeRequestHeader: '+str(e)
-
-def decodeAttribute(data, pos=0):
- """Decode a request attribute."""
- try:
- i = ord(data[pos])
- pos += 1
- if i == 0xff:
- # end
- return None, None, pos
- elif i == 0x0a:
- # name follows
- name, pos = decodeString(data, pos)
- elif i == 0x0b:
- # Special handling of SSL_KEY_SIZE.
- name = attributeTable[i]
- # Value is an int, not a string.
- value = struct.unpack('>H', data[pos:pos+2])[0]
- return name, str(value), pos+2
- else:
- name = attributeTable[i]
- if name is None:
- raise ValueError, 'bad attribute code'
- value, pos = decodeString(data, pos)
- return name, value, pos
- except Exception, e:
- raise ProtocolError, 'decodeAttribute: '+str(e)
-
-def encodeString(s):
- """Encode a string."""
- return struct.pack('>H', len(s)) + s + '\x00'
-
-def encodeResponseHeader(name, value):
- """Encode a response header/value pair."""
- lname = name.lower()
- if lname in responseHeaderTable:
- # Use table
- i = responseHeaderTable.index(lname)
- out = '\xa0' + chr(i)
- else:
- out = encodeString(name)
- out += encodeString(value)
- return out
-
-class Packet(object):
- """An AJP message packet."""
- def __init__(self):
- self.data = ''
- # Don't set this on write, it will be calculated automatically.
- self.length = 0
-
- def _recvall(sock, length):
- """
- Attempts to receive length bytes from a socket, blocking if necessary.
- (Socket may be blocking or non-blocking.)
- """
- dataList = []
- recvLen = 0
- while length:
- try:
- data = sock.recv(length)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if not data: # EOF
- break
- dataList.append(data)
- dataLen = len(data)
- recvLen += dataLen
- length -= dataLen
- return ''.join(dataList), recvLen
- _recvall = staticmethod(_recvall)
-
- def read(self, sock):
- """Attempt to read a packet from the server."""
- try:
- header, length = self._recvall(sock, 4)
- except socket.error:
- # Treat any sort of socket errors as EOF (close Connection).
- raise EOFError
-
- if length < 4:
- raise EOFError
-
- if header[:2] != SERVER_PREFIX:
- raise ProtocolError, 'invalid header'
-
- self.length = struct.unpack('>H', header[2:4])[0]
- if self.length:
- try:
- self.data, length = self._recvall(sock, self.length)
- except socket.error:
- raise EOFError
-
- if length < self.length:
- raise EOFError
-
- def _sendall(sock, data):
- """
- Writes data to a socket and does not return until all the data is sent.
- """
- length = len(data)
- while length:
- try:
- sent = sock.send(data)
- except socket.error, e:
- if e[0] == errno.EPIPE:
- return # Don't bother raising an exception. Just ignore.
- elif e[0] == errno.EAGAIN:
- select.select([], [sock], [])
- continue
- else:
- raise
- data = data[sent:]
- length -= sent
- _sendall = staticmethod(_sendall)
-
- def write(self, sock):
- """Send a packet to the server."""
- self.length = len(self.data)
- self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length))
- if self.length:
- self._sendall(sock, self.data)
-
-class InputStream(object):
- """
- File-like object that represents the request body (if any). Supports
- the bare mininum methods required by the WSGI spec. Thanks to
- StringIO for ideas.
- """
- def __init__(self, conn):
- self._conn = conn
-
- # See WSGIServer.
- self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
-
- self._buf = ''
- self._bufList = []
- self._pos = 0 # Current read position.
- self._avail = 0 # Number of bytes currently available.
- self._length = 0 # Set to Content-Length in request.
-
- self.logger = logging.getLogger(LoggerName)
-
- def bytesAvailForAdd(self):
- return self._length - self._avail
-
- def _shrinkBuffer(self):
- """Gets rid of already read data (since we can't rewind)."""
- if self._pos >= self._shrinkThreshold:
- self._buf = self._buf[self._pos:]
- self._avail -= self._pos
- self._length -= self._pos
- self._pos = 0
-
- assert self._avail >= 0 and self._length >= 0
-
- def _waitForData(self):
- toAdd = min(self.bytesAvailForAdd(), 0xffff)
- assert toAdd > 0
- pkt = Packet()
- pkt.data = PKTTYPE_GET_BODY + \
- struct.pack('>H', toAdd)
- self._conn.writePacket(pkt)
- self._conn.processInput()
-
- def read(self, n=-1):
- if self._pos == self._length:
- return ''
- while True:
- if n < 0 or (self._avail - self._pos) < n:
- # Not enough data available.
- if not self.bytesAvailForAdd():
- # And there's no more coming.
- newPos = self._avail
- break
- else:
- # Ask for more data and wait.
- self._waitForData()
- continue
- else:
- newPos = self._pos + n
- break
- # Merge buffer list, if necessary.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readline(self, length=None):
- if self._pos == self._length:
- return ''
- while True:
- # Unfortunately, we need to merge the buffer list early.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- # Find newline.
- i = self._buf.find('\n', self._pos)
- if i < 0:
- # Not found?
- if not self.bytesAvailForAdd():
- # No more data coming.
- newPos = self._avail
- break
- else:
- # Wait for more to come.
- self._waitForData()
- continue
- else:
- newPos = i + 1
- break
- if length is not None:
- if self._pos + length < newPos:
- newPos = self._pos + length
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readlines(self, sizehint=0):
- total = 0
- lines = []
- line = self.readline()
- while line:
- lines.append(line)
- total += len(line)
- if 0 < sizehint <= total:
- break
- line = self.readline()
- return lines
-
- def __iter__(self):
- return self
-
- def next(self):
- r = self.readline()
- if not r:
- raise StopIteration
- return r
-
- def setDataLength(self, length):
- """
- Once Content-Length is known, Request calls this method to set it.
- """
- self._length = length
-
- def addData(self, data):
- """
- Adds data from the server to this InputStream. Note that we never ask
- the server for data beyond the Content-Length, so the server should
- never send us an EOF (empty string argument).
- """
- if not data:
- raise ProtocolError, 'short data'
- self._bufList.append(data)
- length = len(data)
- self._avail += length
- if self._avail > self._length:
- raise ProtocolError, 'too much data'
-
-class Request(object):
- """
- A Request object. A more fitting name would probably be Transaction, but
- it's named Request to mirror my FastCGI driver. :) This object
- encapsulates all the data about the HTTP request and allows the handler
- to send a response.
-
- The only attributes/methods that the handler should concern itself
- with are: environ, input, startResponse(), and write().
- """
- # Do not ever change the following value.
- _maxWrite = 8192 - 4 - 3 # 8k - pkt header - send body header
-
- def __init__(self, conn):
- self._conn = conn
-
- self.environ = {
- 'SCRIPT_NAME': conn.server.scriptName
- }
- self.input = InputStream(conn)
-
- self._headersSent = False
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.info('%s %s',
- self.environ['REQUEST_METHOD'],
- self.environ['REQUEST_URI'])
-
- start = datetime.datetime.now()
-
- try:
- self._conn.server.handler(self)
- except:
- self.logger.exception('Exception caught from handler')
- if not self._headersSent:
- self._conn.server.error(self)
-
- end = datetime.datetime.now()
-
- # Notify server of end of response (reuse flag is set to true).
- pkt = Packet()
- pkt.data = PKTTYPE_END_RESPONSE + '\x01'
- self._conn.writePacket(pkt)
-
- handlerTime = end - start
- self.logger.debug('%s %s done (%.3f secs)',
- self.environ['REQUEST_METHOD'],
- self.environ['REQUEST_URI'],
- handlerTime.seconds +
- handlerTime.microseconds / 1000000.0)
-
- # The following methods are called from the Connection to set up this
- # Request.
-
- def setMethod(self, value):
- self.environ['REQUEST_METHOD'] = value
-
- def setProtocol(self, value):
- self.environ['SERVER_PROTOCOL'] = value
-
- def setRequestURI(self, value):
- self.environ['REQUEST_URI'] = value
-
- scriptName = self._conn.server.scriptName
- if not value.startswith(scriptName):
- self.logger.warning('scriptName does not match request URI')
-
- self.environ['PATH_INFO'] = value[len(scriptName):]
-
- def setRemoteAddr(self, value):
- self.environ['REMOTE_ADDR'] = value
-
- def setRemoteHost(self, value):
- self.environ['REMOTE_HOST'] = value
-
- def setServerName(self, value):
- self.environ['SERVER_NAME'] = value
-
- def setServerPort(self, value):
- self.environ['SERVER_PORT'] = str(value)
-
- def setIsSSL(self, value):
- if value:
- self.environ['HTTPS'] = 'on'
-
- def addHeader(self, name, value):
- name = name.replace('-', '_').upper()
- if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
- self.environ[name] = value
- if name == 'CONTENT_LENGTH':
- length = int(value)
- self.input.setDataLength(length)
- else:
- self.environ['HTTP_'+name] = value
-
- def addAttribute(self, name, value):
- self.environ[name] = value
-
- # The only two methods that should be called from the handler.
-
- def startResponse(self, statusCode, statusMsg, headers):
- """
- Begin the HTTP response. This must only be called once and it
- must be called before any calls to write().
-
- statusCode is the integer status code (e.g. 200). statusMsg
- is the associated reason message (e.g.'OK'). headers is a list
- of 2-tuples - header name/value pairs. (Both header name and value
- must be strings.)
- """
- assert not self._headersSent, 'Headers already sent!'
-
- pkt = Packet()
- pkt.data = PKTTYPE_SEND_HEADERS + \
- struct.pack('>H', statusCode) + \
- encodeString(statusMsg) + \
- struct.pack('>H', len(headers)) + \
- ''.join([encodeResponseHeader(name, value)
- for name,value in headers])
-
- self._conn.writePacket(pkt)
-
- self._headersSent = True
-
- def write(self, data):
- """
- Write data (which comprises the response body). Note that due to
- restrictions on AJP packet size, we limit our writes to 8185 bytes
- each packet.
- """
- assert self._headersSent, 'Headers must be sent first!'
-
- bytesLeft = len(data)
- while bytesLeft:
- toWrite = min(bytesLeft, self._maxWrite)
-
- pkt = Packet()
- pkt.data = PKTTYPE_SEND_BODY + \
- struct.pack('>H', toWrite) + \
- data[:toWrite]
- self._conn.writePacket(pkt)
-
- data = data[toWrite:]
- bytesLeft -= toWrite
-
-class Connection(object):
- """
- A single Connection with the server. Requests are not multiplexed over the
- same connection, so at any given time, the Connection is either
- waiting for a request, or processing a single request.
- """
- def __init__(self, sock, addr, server):
- self.server = server
- self._sock = sock
- self._addr = addr
-
- self._request = None
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.debug('Connection starting up (%s:%d)',
- self._addr[0], self._addr[1])
-
- # Main loop. Errors will cause the loop to be exited and
- # the socket to be closed.
- while True:
- try:
- self.processInput()
- except ProtocolError, e:
- self.logger.error("Protocol error '%s'", str(e))
- break
- except EOFError:
- break
- except:
- self.logger.exception('Exception caught in Connection')
- break
-
- self.logger.debug('Connection shutting down (%s:%d)',
- self._addr[0], self._addr[1])
-
- self._sock.close()
-
- def processInput(self):
- """Wait for and process a single packet."""
- pkt = Packet()
- select.select([self._sock], [], [])
- pkt.read(self._sock)
-
- # Body chunks have no packet type code.
- if self._request is not None:
- self._processBody(pkt)
- return
-
- if not pkt.length:
- raise ProtocolError, 'unexpected empty packet'
-
- pkttype = pkt.data[0]
- if pkttype == PKTTYPE_FWD_REQ:
- self._forwardRequest(pkt)
- elif pkttype == PKTTYPE_SHUTDOWN:
- self._shutdown(pkt)
- elif pkttype == PKTTYPE_PING:
- self._ping(pkt)
- elif pkttype == PKTTYPE_CPING:
- self._cping(pkt)
- else:
- raise ProtocolError, 'unknown packet type'
-
- def _forwardRequest(self, pkt):
- """
- Creates a Request object, fills it in from the packet, then runs it.
- """
- assert self._request is None
-
- req = self.server.requestClass(self)
- i = ord(pkt.data[1])
- method = methodTable[i]
- if method is None:
- raise ValueError, 'bad method field'
- req.setMethod(method)
- value, pos = decodeString(pkt.data, 2)
- req.setProtocol(value)
- value, pos = decodeString(pkt.data, pos)
- req.setRequestURI(value)
- value, pos = decodeString(pkt.data, pos)
- req.setRemoteAddr(value)
- value, pos = decodeString(pkt.data, pos)
- req.setRemoteHost(value)
- value, pos = decodeString(pkt.data, pos)
- req.setServerName(value)
- value = struct.unpack('>H', pkt.data[pos:pos+2])[0]
- req.setServerPort(value)
- i = ord(pkt.data[pos+2])
- req.setIsSSL(i != 0)
-
- # Request headers.
- numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0]
- pos += 5
- for i in range(numHeaders):
- name, value, pos = decodeRequestHeader(pkt.data, pos)
- req.addHeader(name, value)
-
- # Attributes.
- while True:
- name, value, pos = decodeAttribute(pkt.data, pos)
- if name is None:
- break
- req.addAttribute(name, value)
-
- self._request = req
-
- # Read first body chunk, if needed.
- if req.input.bytesAvailForAdd():
- self.processInput()
-
- # Run Request.
- req.run()
-
- self._request = None
-
- def _shutdown(self, pkt):
- """Not sure what to do with this yet."""
- self.logger.info('Received shutdown request from server')
-
- def _ping(self, pkt):
- """I have no idea what this packet means."""
- self.logger.debug('Received ping')
-
- def _cping(self, pkt):
- """Respond to a PING (CPING) packet."""
- self.logger.debug('Received PING, sending PONG')
- pkt = Packet()
- pkt.data = PKTTYPE_CPONG
- self.writePacket(pkt)
-
- def _processBody(self, pkt):
- """
- Handles a body chunk from the server by appending it to the
- InputStream.
- """
- if pkt.length:
- length = struct.unpack('>H', pkt.data[:2])[0]
- self._request.input.addData(pkt.data[2:2+length])
- else:
- # Shouldn't really ever get here.
- self._request.input.addData('')
-
- def writePacket(self, pkt):
- """Sends a Packet to the server."""
- pkt.write(self._sock)
-
-class ThreadPool(object):
- """
- Thread pool that maintains the number of idle threads between
- minSpare and maxSpare inclusive. By default, there is no limit on
- the number of threads that can be started, but this can be controlled
- by maxThreads.
- """
- def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxint):
- self._minSpare = minSpare
- self._maxSpare = maxSpare
- self._maxThreads = max(minSpare, maxThreads)
-
- self._lock = threading.Condition()
- self._workQueue = []
- self._idleCount = self._workerCount = maxSpare
-
- # Start the minimum number of worker threads.
- for i in range(maxSpare):
- thread.start_new_thread(self._worker, ())
-
- def addJob(self, job, allowQueuing=True):
- """
- Adds a job to the work queue. The job object should have a run()
- method. If allowQueuing is True (the default), the job will be
- added to the work queue regardless if there are any idle threads
- ready. (The only way for there to be no idle threads is if maxThreads
- is some reasonable, finite limit.)
-
- Otherwise, if allowQueuing is False, and there are no more idle
- threads, the job will not be queued.
-
- Returns True if the job was queued, False otherwise.
- """
- self._lock.acquire()
- try:
- # Maintain minimum number of spares.
- while self._idleCount < self._minSpare and \
- self._workerCount < self._maxThreads:
- self._workerCount += 1
- self._idleCount += 1
- thread.start_new_thread(self._worker, ())
-
- # Hand off the job.
- if self._idleCount or allowQueuing:
- self._workQueue.append(job)
- self._lock.notify()
- return True
- else:
- return False
- finally:
- self._lock.release()
-
- def _worker(self):
- """
- Worker thread routine. Waits for a job, executes it, repeat.
- """
- self._lock.acquire()
- while True:
- while not self._workQueue:
- self._lock.wait()
-
- # We have a job to do...
- job = self._workQueue.pop(0)
-
- assert self._idleCount > 0
- self._idleCount -= 1
-
- self._lock.release()
-
- job.run()
-
- self._lock.acquire()
-
- if self._idleCount == self._maxSpare:
- break # NB: lock still held
- self._idleCount += 1
- assert self._idleCount <= self._maxSpare
-
- # Die off...
- assert self._workerCount > self._maxSpare
- self._workerCount -= 1
-
- self._lock.release()
-
-class WSGIServer(object):
+class WSGIServer(BaseAJPServer, ThreadedServer):
"""
AJP1.3/WSGI server. Runs your WSGI application as a persistant program
that understands AJP1.3. Opens up a TCP socket, binds it, and then
@@ -895,15 +100,6 @@ class WSGIServer(object):
Of course you will need an AJP1.3 connector for your webserver (e.g.
mod_jk) - see <http://jakarta.apache.org/tomcat/connectors-doc/>.
"""
- # What Request class to use.
- requestClass = Request
-
- # Limits the size of the InputStream's string buffer to this size + 8k.
- # Since the InputStream is not seekable, we throw away already-read
- # data once this certain amount has been read. (The 8k is there because
- # it is the maximum size of new data added per chunk.)
- inputStreamShrinkThreshold = 102400 - 8192
-
def __init__(self, application, scriptName='', environ=None,
multithreaded=True,
bindAddress=('localhost', 8009), allowedServers=None,
@@ -917,8 +113,6 @@ class WSGIServer(object):
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
- Set multithreaded to False if your application is not thread-safe.
-
bindAddress is the address to bind to, which must be a tuple of
length 2. The first element is a string, which is the host name
or IPv4 address of a local interface. The 2nd element is the port
@@ -929,67 +123,25 @@ class WSGIServer(object):
connections from anywhere.
loggingLevel sets the logging level of the module-level logger.
-
- Any additional keyword arguments are passed to the underlying
- ThreadPool.
"""
- if environ is None:
- environ = {}
-
- self.application = application
- self.scriptName = scriptName
- self.environ = environ
- self.multithreaded = multithreaded
- self._bindAddress = bindAddress
- self._allowedServers = allowedServers
-
- # Used to force single-threadedness.
- self._appLock = thread.allocate_lock()
-
- self._threadPool = ThreadPool(**kw)
-
- self.logger = logging.getLogger(LoggerName)
- self.logger.setLevel(loggingLevel)
-
- def _setupSocket(self):
- """Creates and binds the socket for communication with the server."""
- sock = socket.socket()
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(self._bindAddress)
- sock.listen(socket.SOMAXCONN)
- return sock
+ BaseAJPServer.__init__(self, application,
+ scriptName=scriptName,
+ environ=environ,
+ multithreaded=multithreaded,
+ bindAddress=bindAddress,
+ allowedServers=allowedServers,
+ loggingLevel=loggingLevel)
+ for key in ('jobClass', 'jobArgs'):
+ if kw.has_key(key):
+ del kw[key]
+ ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,),
+ **kw)
- def _cleanupSocket(self, sock):
- """Closes the main socket."""
- sock.close()
-
- def _isServerAllowed(self, addr):
- return self._allowedServers is None or \
- addr[0] in self._allowedServers
-
- def _installSignalHandlers(self):
- self._oldSIGs = [(x,signal.getsignal(x)) for x in
- (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)]
- signal.signal(signal.SIGHUP, self._hupHandler)
- signal.signal(signal.SIGINT, self._intHandler)
- signal.signal(signal.SIGTERM, self._intHandler)
-
- def _restoreSignalHandlers(self):
- for signum,handler in self._oldSIGs:
- signal.signal(signum, handler)
-
- def _hupHandler(self, signum, frame):
- self._hupReceived = True
- self._keepGoing = False
-
- def _intHandler(self, signum, frame):
- self._keepGoing = False
-
- def run(self, timeout=1.0):
+ def run(self):
"""
Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT,
- SIGTERM cause it to cleanup and return. (If a SIGHUP is caught, this
- method returns True. Returns False otherwise.)
+ SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP
+ is caught, this method returns True. Returns False otherwise.)
"""
self.logger.info('%s starting up', self.__class__.__name__)
@@ -999,168 +151,14 @@ class WSGIServer(object):
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
- self._keepGoing = True
- self._hupReceived = False
-
- # Install signal handlers.
- self._installSignalHandlers()
-
- while self._keepGoing:
- try:
- r, w, e = select.select([sock], [], [], timeout)
- except select.error, e:
- if e[0] == errno.EINTR:
- continue
- raise
-
- if r:
- try:
- clientSock, addr = sock.accept()
- except socket.error, e:
- if e[0] in (errno.EINTR, errno.EAGAIN):
- continue
- raise
-
- if not self._isServerAllowed(addr):
- self.logger.warning('Server connection from %s disallowed',
- addr[0])
- clientSock.close()
- continue
-
- # Hand off to Connection.
- conn = Connection(clientSock, addr, self)
- if not self._threadPool.addJob(conn, allowQueuing=False):
- # No thread left, immediately close the socket to hopefully
- # indicate to the web server that we're at our limit...
- # and to prevent having too many opened (and useless)
- # files.
- clientSock.close()
-
- self._mainloopPeriodic()
-
- # Restore old signal handlers.
- self._restoreSignalHandlers()
+ ret = ThreadedServer.run(self, sock)
self._cleanupSocket(sock)
self.logger.info('%s shutting down%s', self.__class__.__name__,
self._hupReceived and ' (reload requested)' or '')
- return self._hupReceived
-
- def _mainloopPeriodic(self):
- """
- Called with just about each iteration of the main loop. Meant to
- be overridden.
- """
- pass
-
- def _exit(self, reload=False):
- """
- Protected convenience method for subclasses to force an exit. Not
- really thread-safe, which is why it isn't public.
- """
- if self._keepGoing:
- self._keepGoing = False
- self._hupReceived = reload
-
- def handler(self, request):
- """
- WSGI handler. Sets up WSGI environment, calls the application,
- and sends the application's response.
- """
- environ = request.environ
- environ.update(self.environ)
-
- environ['wsgi.version'] = (1,0)
- environ['wsgi.input'] = request.input
- environ['wsgi.errors'] = sys.stderr
- environ['wsgi.multithread'] = self.multithreaded
- environ['wsgi.multiprocess'] = True
- environ['wsgi.run_once'] = False
-
- if environ.get('HTTPS', 'off') in ('on', '1'):
- environ['wsgi.url_scheme'] = 'https'
- else:
- environ['wsgi.url_scheme'] = 'http'
-
- headers_set = []
- headers_sent = []
- result = None
-
- def write(data):
- assert type(data) is str, 'write() argument must be string'
- assert headers_set, 'write() before start_response()'
-
- if not headers_sent:
- status, responseHeaders = headers_sent[:] = headers_set
- statusCode = int(status[:3])
- statusMsg = status[4:]
- found = False
- for header,value in responseHeaders:
- if header.lower() == 'content-length':
- found = True
- break
- if not found and result is not None:
- try:
- if len(result) == 1:
- responseHeaders.append(('Content-Length',
- str(len(data))))
- except:
- pass
- request.startResponse(statusCode, statusMsg, responseHeaders)
-
- request.write(data)
-
- def start_response(status, response_headers, exc_info=None):
- if exc_info:
- try:
- if headers_sent:
- # Re-raise if too late
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # avoid dangling circular ref
- else:
- assert not headers_set, 'Headers already set!'
-
- assert type(status) is str, 'Status must be a string'
- assert len(status) >= 4, 'Status must be at least 4 characters'
- assert int(status[:3]), 'Status must begin with 3-digit code'
- assert status[3] == ' ', 'Status must have a space after code'
- assert type(response_headers) is list, 'Headers must be a list'
- if __debug__:
- for name,val in response_headers:
- assert type(name) is str, 'Header names must be strings'
- assert type(val) is str, 'Header values must be strings'
-
- headers_set[:] = [status, response_headers]
- return write
-
- if not self.multithreaded:
- self._appLock.acquire()
- try:
- result = self.application(environ, start_response)
- try:
- for data in result:
- if data:
- write(data)
- if not headers_sent:
- write('') # in case body was empty
- finally:
- if hasattr(result, 'close'):
- result.close()
- finally:
- if not self.multithreaded:
- self._appLock.release()
-
- def error(self, request):
- """
- Override to provide custom error handling. Ideally, however,
- all errors should be caught at the application level.
- """
- request.startResponse(200, 'OK', [('Content-Type', 'text/html')])
- import cgitb
- request.write(cgitb.html(sys.exc_info()))
+ return ret
if __name__ == '__main__':
def test_app(environ, start_response):
diff --git a/flup/server/ajp_base.py b/flup/server/ajp_base.py
new file mode 100644
index 0000000..32e0f9c
--- /dev/null
+++ b/flup/server/ajp_base.py
@@ -0,0 +1,916 @@
+# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+# $Id$
+
+__author__ = 'Allan Saddi <allan@saddi.com>'
+__version__ = '$Revision$'
+
+import sys
+import socket
+import select
+import struct
+import signal
+import logging
+import errno
+import datetime
+import time
+
+# Unfortunately, for now, threads are required.
+import thread
+import threading
+
+__all__ = ['BaseAJPServer']
+
+# Packet header prefixes.
+SERVER_PREFIX = '\x12\x34'
+CONTAINER_PREFIX = 'AB'
+
+# Server packet types.
+PKTTYPE_FWD_REQ = '\x02'
+PKTTYPE_SHUTDOWN = '\x07'
+PKTTYPE_PING = '\x08'
+PKTTYPE_CPING = '\x0a'
+
+# Container packet types.
+PKTTYPE_SEND_BODY = '\x03'
+PKTTYPE_SEND_HEADERS = '\x04'
+PKTTYPE_END_RESPONSE = '\x05'
+PKTTYPE_GET_BODY = '\x06'
+PKTTYPE_CPONG = '\x09'
+
+# Code tables for methods/headers/attributes.
+methodTable = [
+ None,
+ 'OPTIONS',
+ 'GET',
+ 'HEAD',
+ 'POST',
+ 'PUT',
+ 'DELETE',
+ 'TRACE',
+ 'PROPFIND',
+ 'PROPPATCH',
+ 'MKCOL',
+ 'COPY',
+ 'MOVE',
+ 'LOCK',
+ 'UNLOCK',
+ 'ACL',
+ 'REPORT',
+ 'VERSION-CONTROL',
+ 'CHECKIN',
+ 'CHECKOUT',
+ 'UNCHECKOUT',
+ 'SEARCH',
+ 'MKWORKSPACE',
+ 'UPDATE',
+ 'LABEL',
+ 'MERGE',
+ 'BASELINE_CONTROL',
+ 'MKACTIVITY'
+ ]
+
+requestHeaderTable = [
+ None,
+ 'Accept',
+ 'Accept-Charset',
+ 'Accept-Encoding',
+ 'Accept-Language',
+ 'Authorization',
+ 'Connection',
+ 'Content-Type',
+ 'Content-Length',
+ 'Cookie',
+ 'Cookie2',
+ 'Host',
+ 'Pragma',
+ 'Referer',
+ 'User-Agent'
+ ]
+
+attributeTable = [
+ None,
+ 'CONTEXT',
+ 'SERVLET_PATH',
+ 'REMOTE_USER',
+ 'AUTH_TYPE',
+ 'QUERY_STRING',
+ 'JVM_ROUTE',
+ 'SSL_CERT',
+ 'SSL_CIPHER',
+ 'SSL_SESSION',
+ None, # name follows
+ 'SSL_KEY_SIZE'
+ ]
+
+responseHeaderTable = [
+ None,
+ 'content-type',
+ 'content-language',
+ 'content-length',
+ 'date',
+ 'last-modified',
+ 'location',
+ 'set-cookie',
+ 'set-cookie2',
+ 'servlet-engine',
+ 'status',
+ 'www-authenticate'
+ ]
+
+# The main classes use this name for logging.
+LoggerName = 'ajp-wsgi'
+
+# Set up module-level logger.
+console = logging.StreamHandler()
+console.setLevel(logging.DEBUG)
+console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
+ '%Y-%m-%d %H:%M:%S'))
+logging.getLogger(LoggerName).addHandler(console)
+del console
+
+class ProtocolError(Exception):
+ """
+ Exception raised when the server does something unexpected or
+ sends garbled data. Usually leads to a Connection closing.
+ """
+ pass
+
+def decodeString(data, pos=0):
+ """Decode a string."""
+ try:
+ length = struct.unpack('>H', data[pos:pos+2])[0]
+ pos += 2
+ if length == 0xffff: # This was undocumented!
+ return '', pos
+ s = data[pos:pos+length]
+ return s, pos+length+1 # Don't forget NUL
+ except Exception, e:
+ raise ProtocolError, 'decodeString: '+str(e)
+
+def decodeRequestHeader(data, pos=0):
+ """Decode a request header/value pair."""
+ try:
+ if data[pos] == '\xa0':
+ # Use table
+ i = ord(data[pos+1])
+ name = requestHeaderTable[i]
+ if name is None:
+ raise ValueError, 'bad request header code'
+ pos += 2
+ else:
+ name, pos = decodeString(data, pos)
+ value, pos = decodeString(data, pos)
+ return name, value, pos
+ except Exception, e:
+ raise ProtocolError, 'decodeRequestHeader: '+str(e)
+
+def decodeAttribute(data, pos=0):
+ """Decode a request attribute."""
+ try:
+ i = ord(data[pos])
+ pos += 1
+ if i == 0xff:
+ # end
+ return None, None, pos
+ elif i == 0x0a:
+ # name follows
+ name, pos = decodeString(data, pos)
+ elif i == 0x0b:
+ # Special handling of SSL_KEY_SIZE.
+ name = attributeTable[i]
+ # Value is an int, not a string.
+ value = struct.unpack('>H', data[pos:pos+2])[0]
+ return name, str(value), pos+2
+ else:
+ name = attributeTable[i]
+ if name is None:
+ raise ValueError, 'bad attribute code'
+ value, pos = decodeString(data, pos)
+ return name, value, pos
+ except Exception, e:
+ raise ProtocolError, 'decodeAttribute: '+str(e)
+
+def encodeString(s):
+ """Encode a string."""
+ return struct.pack('>H', len(s)) + s + '\x00'
+
+def encodeResponseHeader(name, value):
+ """Encode a response header/value pair."""
+ lname = name.lower()
+ if lname in responseHeaderTable:
+ # Use table
+ i = responseHeaderTable.index(lname)
+ out = '\xa0' + chr(i)
+ else:
+ out = encodeString(name)
+ out += encodeString(value)
+ return out
+
+class Packet(object):
+ """An AJP message packet."""
+ def __init__(self):
+ self.data = ''
+ # Don't set this on write, it will be calculated automatically.
+ self.length = 0
+
+ def _recvall(sock, length):
+ """
+ Attempts to receive length bytes from a socket, blocking if necessary.
+ (Socket may be blocking or non-blocking.)
+ """
+ dataList = []
+ recvLen = 0
+ while length:
+ try:
+ data = sock.recv(length)
+ except socket.error, e:
+ if e[0] == errno.EAGAIN:
+ select.select([sock], [], [])
+ continue
+ else:
+ raise
+ if not data: # EOF
+ break
+ dataList.append(data)
+ dataLen = len(data)
+ recvLen += dataLen
+ length -= dataLen
+ return ''.join(dataList), recvLen
+ _recvall = staticmethod(_recvall)
+
+ def read(self, sock):
+ """Attempt to read a packet from the server."""
+ try:
+ header, length = self._recvall(sock, 4)
+ except socket.error:
+ # Treat any sort of socket errors as EOF (close Connection).
+ raise EOFError
+
+ if length < 4:
+ raise EOFError
+
+ if header[:2] != SERVER_PREFIX:
+ raise ProtocolError, 'invalid header'
+
+ self.length = struct.unpack('>H', header[2:4])[0]
+ if self.length:
+ try:
+ self.data, length = self._recvall(sock, self.length)
+ except socket.error:
+ raise EOFError
+
+ if length < self.length:
+ raise EOFError
+
+ def _sendall(sock, data):
+ """
+ Writes data to a socket and does not return until all the data is sent.
+ """
+ length = len(data)
+ while length:
+ try:
+ sent = sock.send(data)
+ except socket.error, e:
+ if e[0] == errno.EPIPE:
+ return # Don't bother raising an exception. Just ignore.
+ elif e[0] == errno.EAGAIN:
+ select.select([], [sock], [])
+ continue
+ else:
+ raise
+ data = data[sent:]
+ length -= sent
+ _sendall = staticmethod(_sendall)
+
+ def write(self, sock):
+ """Send a packet to the server."""
+ self.length = len(self.data)
+ self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length))
+ if self.length:
+ self._sendall(sock, self.data)
+
+class InputStream(object):
+ """
+ File-like object that represents the request body (if any). Supports
+ the bare mininum methods required by the WSGI spec. Thanks to
+ StringIO for ideas.
+ """
+ def __init__(self, conn):
+ self._conn = conn
+
+ # See WSGIServer.
+ self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
+
+ self._buf = ''
+ self._bufList = []
+ self._pos = 0 # Current read position.
+ self._avail = 0 # Number of bytes currently available.
+ self._length = 0 # Set to Content-Length in request.
+
+ self.logger = logging.getLogger(LoggerName)
+
+ def bytesAvailForAdd(self):
+ return self._length - self._avail
+
+ def _shrinkBuffer(self):
+ """Gets rid of already read data (since we can't rewind)."""
+ if self._pos >= self._shrinkThreshold:
+ self._buf = self._buf[self._pos:]
+ self._avail -= self._pos
+ self._length -= self._pos
+ self._pos = 0
+
+ assert self._avail >= 0 and self._length >= 0
+
+ def _waitForData(self):
+ toAdd = min(self.bytesAvailForAdd(), 0xffff)
+ assert toAdd > 0
+ pkt = Packet()
+ pkt.data = PKTTYPE_GET_BODY + \
+ struct.pack('>H', toAdd)
+ self._conn.writePacket(pkt)
+ self._conn.processInput()
+
+ def read(self, n=-1):
+ if self._pos == self._length:
+ return ''
+ while True:
+ if n < 0 or (self._avail - self._pos) < n:
+ # Not enough data available.
+ if not self.bytesAvailForAdd():
+ # And there's no more coming.
+ newPos = self._avail
+ break
+ else:
+ # Ask for more data and wait.
+ self._waitForData()
+ continue
+ else:
+ newPos = self._pos + n
+ break
+ # Merge buffer list, if necessary.
+ if self._bufList:
+ self._buf += ''.join(self._bufList)
+ self._bufList = []
+ r = self._buf[self._pos:newPos]
+ self._pos = newPos
+ self._shrinkBuffer()
+ return r
+
+ def readline(self, length=None):
+ if self._pos == self._length:
+ return ''
+ while True:
+ # Unfortunately, we need to merge the buffer list early.
+ if self._bufList:
+ self._buf += ''.join(self._bufList)
+ self._bufList = []
+ # Find newline.
+ i = self._buf.find('\n', self._pos)
+ if i < 0:
+ # Not found?
+ if not self.bytesAvailForAdd():
+ # No more data coming.
+ newPos = self._avail
+ break
+ else:
+ # Wait for more to come.
+ self._waitForData()
+ continue
+ else:
+ newPos = i + 1
+ break
+ if length is not None:
+ if self._pos + length < newPos:
+ newPos = self._pos + length
+ r = self._buf[self._pos:newPos]
+ self._pos = newPos
+ self._shrinkBuffer()
+ return r
+
+ def readlines(self, sizehint=0):
+ total = 0
+ lines = []
+ line = self.readline()
+ while line:
+ lines.append(line)
+ total += len(line)
+ if 0 < sizehint <= total:
+ break
+ line = self.readline()
+ return lines
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ r = self.readline()
+ if not r:
+ raise StopIteration
+ return r
+
+ def setDataLength(self, length):
+ """
+ Once Content-Length is known, Request calls this method to set it.
+ """
+ self._length = length
+
+ def addData(self, data):
+ """
+ Adds data from the server to this InputStream. Note that we never ask
+ the server for data beyond the Content-Length, so the server should
+ never send us an EOF (empty string argument).
+ """
+ if not data:
+ raise ProtocolError, 'short data'
+ self._bufList.append(data)
+ length = len(data)
+ self._avail += length
+ if self._avail > self._length:
+ raise ProtocolError, 'too much data'
+
+class Request(object):
+ """
+ A Request object. A more fitting name would probably be Transaction, but
+ it's named Request to mirror my FastCGI driver. :) This object
+ encapsulates all the data about the HTTP request and allows the handler
+ to send a response.
+
+ The only attributes/methods that the handler should concern itself
+ with are: environ, input, startResponse(), and write().
+ """
+ # Do not ever change the following value.
+ _maxWrite = 8192 - 4 - 3 # 8k - pkt header - send body header
+
+ def __init__(self, conn):
+ self._conn = conn
+
+ self.environ = {
+ 'SCRIPT_NAME': conn.server.scriptName
+ }
+ self.input = InputStream(conn)
+
+ self._headersSent = False
+
+ self.logger = logging.getLogger(LoggerName)
+
+ def run(self):
+ self.logger.info('%s %s',
+ self.environ['REQUEST_METHOD'],
+ self.environ['REQUEST_URI'])
+
+ start = datetime.datetime.now()
+
+ try:
+ self._conn.server.handler(self)
+ except:
+ self.logger.exception('Exception caught from handler')
+ if not self._headersSent:
+ self._conn.server.error(self)
+
+ end = datetime.datetime.now()
+
+ # Notify server of end of response (reuse flag is set to true).
+ pkt = Packet()
+ pkt.data = PKTTYPE_END_RESPONSE + '\x01'
+ self._conn.writePacket(pkt)
+
+ handlerTime = end - start
+ self.logger.debug('%s %s done (%.3f secs)',
+ self.environ['REQUEST_METHOD'],
+ self.environ['REQUEST_URI'],
+ handlerTime.seconds +
+ handlerTime.microseconds / 1000000.0)
+
+ # The following methods are called from the Connection to set up this
+ # Request.
+
+ def setMethod(self, value):
+ self.environ['REQUEST_METHOD'] = value
+
+ def setProtocol(self, value):
+ self.environ['SERVER_PROTOCOL'] = value
+
+ def setRequestURI(self, value):
+ self.environ['REQUEST_URI'] = value
+
+ scriptName = self._conn.server.scriptName
+ if not value.startswith(scriptName):
+ self.logger.warning('scriptName does not match request URI')
+
+ self.environ['PATH_INFO'] = value[len(scriptName):]
+
+ def setRemoteAddr(self, value):
+ self.environ['REMOTE_ADDR'] = value
+
+ def setRemoteHost(self, value):
+ self.environ['REMOTE_HOST'] = value
+
+ def setServerName(self, value):
+ self.environ['SERVER_NAME'] = value
+
+ def setServerPort(self, value):
+ self.environ['SERVER_PORT'] = str(value)
+
+ def setIsSSL(self, value):
+ if value:
+ self.environ['HTTPS'] = 'on'
+
+ def addHeader(self, name, value):
+ name = name.replace('-', '_').upper()
+ if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
+ self.environ[name] = value
+ if name == 'CONTENT_LENGTH':
+ length = int(value)
+ self.input.setDataLength(length)
+ else:
+ self.environ['HTTP_'+name] = value
+
+ def addAttribute(self, name, value):
+ self.environ[name] = value
+
+ # The only two methods that should be called from the handler.
+
+ def startResponse(self, statusCode, statusMsg, headers):
+ """
+ Begin the HTTP response. This must only be called once and it
+ must be called before any calls to write().
+
+ statusCode is the integer status code (e.g. 200). statusMsg
+ is the associated reason message (e.g.'OK'). headers is a list
+ of 2-tuples - header name/value pairs. (Both header name and value
+ must be strings.)
+ """
+ assert not self._headersSent, 'Headers already sent!'
+
+ pkt = Packet()
+ pkt.data = PKTTYPE_SEND_HEADERS + \
+ struct.pack('>H', statusCode) + \
+ encodeString(statusMsg) + \
+ struct.pack('>H', len(headers)) + \
+ ''.join([encodeResponseHeader(name, value)
+ for name,value in headers])
+
+ self._conn.writePacket(pkt)
+
+ self._headersSent = True
+
+ def write(self, data):
+ """
+ Write data (which comprises the response body). Note that due to
+ restrictions on AJP packet size, we limit our writes to 8185 bytes
+ each packet.
+ """
+ assert self._headersSent, 'Headers must be sent first!'
+
+ bytesLeft = len(data)
+ while bytesLeft:
+ toWrite = min(bytesLeft, self._maxWrite)
+
+ pkt = Packet()
+ pkt.data = PKTTYPE_SEND_BODY + \
+ struct.pack('>H', toWrite) + \
+ data[:toWrite]
+ self._conn.writePacket(pkt)
+
+ data = data[toWrite:]
+ bytesLeft -= toWrite
+
+class Connection(object):
+ """
+ A single Connection with the server. Requests are not multiplexed over the
+ same connection, so at any given time, the Connection is either
+ waiting for a request, or processing a single request.
+ """
+ def __init__(self, sock, addr, server):
+ self.server = server
+ self._sock = sock
+ self._addr = addr
+
+ self._request = None
+
+ self.logger = logging.getLogger(LoggerName)
+
+ def run(self):
+ self.logger.debug('Connection starting up (%s:%d)',
+ self._addr[0], self._addr[1])
+
+ # Main loop. Errors will cause the loop to be exited and
+ # the socket to be closed.
+ while True:
+ try:
+ self.processInput()
+ except ProtocolError, e:
+ self.logger.error("Protocol error '%s'", str(e))
+ break
+ except (EOFError, KeyboardInterrupt):
+ break
+ except:
+ self.logger.exception('Exception caught in Connection')
+ break
+
+ self.logger.debug('Connection shutting down (%s:%d)',
+ self._addr[0], self._addr[1])
+
+ self._sock.close()
+
+ def processInput(self):
+ """Wait for and process a single packet."""
+ pkt = Packet()
+ select.select([self._sock], [], [])
+ pkt.read(self._sock)
+
+ # Body chunks have no packet type code.
+ if self._request is not None:
+ self._processBody(pkt)
+ return
+
+ if not pkt.length:
+ raise ProtocolError, 'unexpected empty packet'
+
+ pkttype = pkt.data[0]
+ if pkttype == PKTTYPE_FWD_REQ:
+ self._forwardRequest(pkt)
+ elif pkttype == PKTTYPE_SHUTDOWN:
+ self._shutdown(pkt)
+ elif pkttype == PKTTYPE_PING:
+ self._ping(pkt)
+ elif pkttype == PKTTYPE_CPING:
+ self._cping(pkt)
+ else:
+ raise ProtocolError, 'unknown packet type'
+
+ def _forwardRequest(self, pkt):
+ """
+ Creates a Request object, fills it in from the packet, then runs it.
+ """
+ assert self._request is None
+
+ req = self.server.requestClass(self)
+ i = ord(pkt.data[1])
+ method = methodTable[i]
+ if method is None:
+ raise ValueError, 'bad method field'
+ req.setMethod(method)
+ value, pos = decodeString(pkt.data, 2)
+ req.setProtocol(value)
+ value, pos = decodeString(pkt.data, pos)
+ req.setRequestURI(value)
+ value, pos = decodeString(pkt.data, pos)
+ req.setRemoteAddr(value)
+ value, pos = decodeString(pkt.data, pos)
+ req.setRemoteHost(value)
+ value, pos = decodeString(pkt.data, pos)
+ req.setServerName(value)
+ value = struct.unpack('>H', pkt.data[pos:pos+2])[0]
+ req.setServerPort(value)
+ i = ord(pkt.data[pos+2])
+ req.setIsSSL(i != 0)
+
+ # Request headers.
+ numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0]
+ pos += 5
+ for i in range(numHeaders):
+ name, value, pos = decodeRequestHeader(pkt.data, pos)
+ req.addHeader(name, value)
+
+ # Attributes.
+ while True:
+ name, value, pos = decodeAttribute(pkt.data, pos)
+ if name is None:
+ break
+ req.addAttribute(name, value)
+
+ self._request = req
+
+ # Read first body chunk, if needed.
+ if req.input.bytesAvailForAdd():
+ self.processInput()
+
+ # Run Request.
+ req.run()
+
+ self._request = None
+
+ def _shutdown(self, pkt):
+ """Not sure what to do with this yet."""
+ self.logger.info('Received shutdown request from server')
+
+ def _ping(self, pkt):
+ """I have no idea what this packet means."""
+ self.logger.debug('Received ping')
+
+ def _cping(self, pkt):
+ """Respond to a PING (CPING) packet."""
+ self.logger.debug('Received PING, sending PONG')
+ pkt = Packet()
+ pkt.data = PKTTYPE_CPONG
+ self.writePacket(pkt)
+
+ def _processBody(self, pkt):
+ """
+ Handles a body chunk from the server by appending it to the
+ InputStream.
+ """
+ if pkt.length:
+ length = struct.unpack('>H', pkt.data[:2])[0]
+ self._request.input.addData(pkt.data[2:2+length])
+ else:
+ # Shouldn't really ever get here.
+ self._request.input.addData('')
+
+ def writePacket(self, pkt):
+ """Sends a Packet to the server."""
+ pkt.write(self._sock)
+
+class BaseAJPServer(object):
+ # What Request class to use.
+ requestClass = Request
+
+ # Limits the size of the InputStream's string buffer to this size + 8k.
+ # Since the InputStream is not seekable, we throw away already-read
+ # data once this certain amount has been read. (The 8k is there because
+ # it is the maximum size of new data added per chunk.)
+ inputStreamShrinkThreshold = 102400 - 8192
+
+ def __init__(self, application, scriptName='', environ=None,
+ multithreaded=True,
+ bindAddress=('localhost', 8009), allowedServers=None,
+ loggingLevel=logging.INFO):
+ """
+ scriptName is the initial portion of the URL path that "belongs"
+ to your application. It is used to determine PATH_INFO (which doesn't
+ seem to be passed in). An empty scriptName means your application
+ is mounted at the root of your virtual host.
+
+ environ, which must be a dictionary, can contain any additional
+ environment variables you want to pass to your application.
+
+ Set multithreaded to False if your application is not thread-safe.
+
+ bindAddress is the address to bind to, which must be a tuple of
+ length 2. The first element is a string, which is the host name
+ or IPv4 address of a local interface. The 2nd element is the port
+ number.
+
+ allowedServers must be None or a list of strings representing the
+ IPv4 addresses of servers allowed to connect. None means accept
+ connections from anywhere.
+
+ loggingLevel sets the logging level of the module-level logger.
+ """
+ if environ is None:
+ environ = {}
+
+ self.application = application
+ self.scriptName = scriptName
+ self.environ = environ
+ self.multithreaded = multithreaded
+ self._bindAddress = bindAddress
+ self._allowedServers = allowedServers
+
+ # Used to force single-threadedness.
+ self._appLock = thread.allocate_lock()
+
+ self.logger = logging.getLogger(LoggerName)
+ self.logger.setLevel(loggingLevel)
+
+ def _setupSocket(self):
+ """Creates and binds the socket for communication with the server."""
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(self._bindAddress)
+ sock.listen(socket.SOMAXCONN)
+ return sock
+
+ def _cleanupSocket(self, sock):
+ """Closes the main socket."""
+ sock.close()
+
+ def _isClientAllowed(self, addr):
+ ret = self._allowedServers is None or addr[0] in self._allowedServers
+ if not ret:
+ self.logger.warning('Server connection from %s disallowed',
+ addr[0])
+ return ret
+
+ def handler(self, request):
+ """
+ WSGI handler. Sets up WSGI environment, calls the application,
+ and sends the application's response.
+ """
+ environ = request.environ
+ environ.update(self.environ)
+
+ environ['wsgi.version'] = (1,0)
+ environ['wsgi.input'] = request.input
+ environ['wsgi.errors'] = sys.stderr
+ environ['wsgi.multithread'] = self.multithreaded
+ environ['wsgi.multiprocess'] = True
+ environ['wsgi.run_once'] = False
+
+ if environ.get('HTTPS', 'off') in ('on', '1'):
+ environ['wsgi.url_scheme'] = 'https'
+ else:
+ environ['wsgi.url_scheme'] = 'http'
+
+ headers_set = []
+ headers_sent = []
+ result = None
+
+ def write(data):
+ assert type(data) is str, 'write() argument must be string'
+ assert headers_set, 'write() before start_response()'
+
+ if not headers_sent:
+ status, responseHeaders = headers_sent[:] = headers_set
+ statusCode = int(status[:3])
+ statusMsg = status[4:]
+ found = False
+ for header,value in responseHeaders:
+ if header.lower() == 'content-length':
+ found = True
+ break
+ if not found and result is not None:
+ try:
+ if len(result) == 1:
+ responseHeaders.append(('Content-Length',
+ str(len(data))))
+ except:
+ pass
+ request.startResponse(statusCode, statusMsg, responseHeaders)
+
+ request.write(data)
+
+ def start_response(status, response_headers, exc_info=None):
+ if exc_info:
+ try:
+ if headers_sent:
+ # Re-raise if too late
+ raise exc_info[0], exc_info[1], exc_info[2]
+ finally:
+ exc_info = None # avoid dangling circular ref
+ else:
+ assert not headers_set, 'Headers already set!'
+
+ assert type(status) is str, 'Status must be a string'
+ assert len(status) >= 4, 'Status must be at least 4 characters'
+ assert int(status[:3]), 'Status must begin with 3-digit code'
+ assert status[3] == ' ', 'Status must have a space after code'
+ assert type(response_headers) is list, 'Headers must be a list'
+ if __debug__:
+ for name,val in response_headers:
+ assert type(name) is str, 'Header names must be strings'
+ assert type(val) is str, 'Header values must be strings'
+
+ headers_set[:] = [status, response_headers]
+ return write
+
+ if not self.multithreaded:
+ self._appLock.acquire()
+ try:
+ result = self.application(environ, start_response)
+ try:
+ for data in result:
+ if data:
+ write(data)
+ if not headers_sent:
+ write('') # in case body was empty
+ finally:
+ if hasattr(result, 'close'):
+ result.close()
+ finally:
+ if not self.multithreaded:
+ self._appLock.release()
+
+ def error(self, request):
+ """
+ Override to provide custom error handling. Ideally, however,
+ all errors should be caught at the application level.
+ """
+ request.startResponse(200, 'OK', [('Content-Type', 'text/html')])
+ import cgitb
+ request.write(cgitb.html(sys.exc_info()))
diff --git a/flup/server/ajp_fork.py b/flup/server/ajp_fork.py
index 4df9704..4cfaec7 100644
--- a/flup/server/ajp_fork.py
+++ b/flup/server/ajp_fork.py
@@ -79,721 +79,15 @@ that SCRIPT_NAME/PATH_INFO are correctly deduced.
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
-import sys
import socket
-import select
-import struct
import logging
-import errno
-import datetime
-import prefork
-__all__ = ['WSGIServer']
-
-# Packet header prefixes.
-SERVER_PREFIX = '\x12\x34'
-CONTAINER_PREFIX = 'AB'
-
-# Server packet types.
-PKTTYPE_FWD_REQ = '\x02'
-PKTTYPE_SHUTDOWN = '\x07'
-PKTTYPE_PING = '\x08'
-PKTTYPE_CPING = '\x0a'
-
-# Container packet types.
-PKTTYPE_SEND_BODY = '\x03'
-PKTTYPE_SEND_HEADERS = '\x04'
-PKTTYPE_END_RESPONSE = '\x05'
-PKTTYPE_GET_BODY = '\x06'
-PKTTYPE_CPONG = '\x09'
-
-# Code tables for methods/headers/attributes.
-methodTable = [
- None,
- 'OPTIONS',
- 'GET',
- 'HEAD',
- 'POST',
- 'PUT',
- 'DELETE',
- 'TRACE',
- 'PROPFIND',
- 'PROPPATCH',
- 'MKCOL',
- 'COPY',
- 'MOVE',
- 'LOCK',
- 'UNLOCK',
- 'ACL',
- 'REPORT',
- 'VERSION-CONTROL',
- 'CHECKIN',
- 'CHECKOUT',
- 'UNCHECKOUT',
- 'SEARCH',
- 'MKWORKSPACE',
- 'UPDATE',
- 'LABEL',
- 'MERGE',
- 'BASELINE_CONTROL',
- 'MKACTIVITY'
- ]
-
-requestHeaderTable = [
- None,
- 'Accept',
- 'Accept-Charset',
- 'Accept-Encoding',
- 'Accept-Language',
- 'Authorization',
- 'Connection',
- 'Content-Type',
- 'Content-Length',
- 'Cookie',
- 'Cookie2',
- 'Host',
- 'Pragma',
- 'Referer',
- 'User-Agent'
- ]
-
-attributeTable = [
- None,
- 'CONTEXT',
- 'SERVLET_PATH',
- 'REMOTE_USER',
- 'AUTH_TYPE',
- 'QUERY_STRING',
- 'JVM_ROUTE',
- 'SSL_CERT',
- 'SSL_CIPHER',
- 'SSL_SESSION',
- None, # name follows
- 'SSL_KEY_SIZE'
- ]
-
-responseHeaderTable = [
- None,
- 'content-type',
- 'content-language',
- 'content-length',
- 'date',
- 'last-modified',
- 'location',
- 'set-cookie',
- 'set-cookie2',
- 'servlet-engine',
- 'status',
- 'www-authenticate'
- ]
-
-# The main classes use this name for logging.
-LoggerName = 'ajp-wsgi'
-
-# Set up module-level logger.
-console = logging.StreamHandler()
-console.setLevel(logging.DEBUG)
-console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
- '%Y-%m-%d %H:%M:%S'))
-logging.getLogger(LoggerName).addHandler(console)
-del console
-
-class ProtocolError(Exception):
- """
- Exception raised when the server does something unexpected or
- sends garbled data. Usually leads to a Connection closing.
- """
- pass
-
-def decodeString(data, pos=0):
- """Decode a string."""
- try:
- length = struct.unpack('>H', data[pos:pos+2])[0]
- pos += 2
- if length == 0xffff: # This was undocumented!
- return '', pos
- s = data[pos:pos+length]
- return s, pos+length+1 # Don't forget NUL
- except Exception, e:
- raise ProtocolError, 'decodeString: '+str(e)
-
-def decodeRequestHeader(data, pos=0):
- """Decode a request header/value pair."""
- try:
- if data[pos] == '\xa0':
- # Use table
- i = ord(data[pos+1])
- name = requestHeaderTable[i]
- if name is None:
- raise ValueError, 'bad request header code'
- pos += 2
- else:
- name, pos = decodeString(data, pos)
- value, pos = decodeString(data, pos)
- return name, value, pos
- except Exception, e:
- raise ProtocolError, 'decodeRequestHeader: '+str(e)
-
-def decodeAttribute(data, pos=0):
- """Decode a request attribute."""
- try:
- i = ord(data[pos])
- pos += 1
- if i == 0xff:
- # end
- return None, None, pos
- elif i == 0x0a:
- # name follows
- name, pos = decodeString(data, pos)
- elif i == 0x0b:
- # Special handling of SSL_KEY_SIZE.
- name = attributeTable[i]
- # Value is an int, not a string.
- value = struct.unpack('>H', data[pos:pos+2])[0]
- return name, str(value), pos+2
- else:
- name = attributeTable[i]
- if name is None:
- raise ValueError, 'bad attribute code'
- value, pos = decodeString(data, pos)
- return name, value, pos
- except Exception, e:
- raise ProtocolError, 'decodeAttribute: '+str(e)
-
-def encodeString(s):
- """Encode a string."""
- return struct.pack('>H', len(s)) + s + '\x00'
-
-def encodeResponseHeader(name, value):
- """Encode a response header/value pair."""
- lname = name.lower()
- if lname in responseHeaderTable:
- # Use table
- i = responseHeaderTable.index(lname)
- out = '\xa0' + chr(i)
- else:
- out = encodeString(name)
- out += encodeString(value)
- return out
-
-class Packet(object):
- """An AJP message packet."""
- def __init__(self):
- self.data = ''
- # Don't set this on write, it will be calculated automatically.
- self.length = 0
-
- def _recvall(sock, length):
- """
- Attempts to receive length bytes from a socket, blocking if necessary.
- (Socket may be blocking or non-blocking.)
- """
- dataList = []
- recvLen = 0
- while length:
- try:
- data = sock.recv(length)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if not data: # EOF
- break
- dataList.append(data)
- dataLen = len(data)
- recvLen += dataLen
- length -= dataLen
- return ''.join(dataList), recvLen
- _recvall = staticmethod(_recvall)
-
- def read(self, sock):
- """Attempt to read a packet from the server."""
- try:
- header, length = self._recvall(sock, 4)
- except socket.error:
- # Treat any sort of socket errors as EOF (close Connection).
- raise EOFError
-
- if length < 4:
- raise EOFError
-
- if header[:2] != SERVER_PREFIX:
- raise ProtocolError, 'invalid header'
-
- self.length = struct.unpack('>H', header[2:4])[0]
- if self.length:
- try:
- self.data, length = self._recvall(sock, self.length)
- except socket.error:
- raise EOFError
-
- if length < self.length:
- raise EOFError
-
- def _sendall(sock, data):
- """
- Writes data to a socket and does not return until all the data is sent.
- """
- length = len(data)
- while length:
- try:
- sent = sock.send(data)
- except socket.error, e:
- if e[0] == errno.EPIPE:
- return # Don't bother raising an exception. Just ignore.
- elif e[0] == errno.EAGAIN:
- select.select([], [sock], [])
- continue
- else:
- raise
- data = data[sent:]
- length -= sent
- _sendall = staticmethod(_sendall)
-
- def write(self, sock):
- """Send a packet to the server."""
- self.length = len(self.data)
- self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length))
- if self.length:
- self._sendall(sock, self.data)
-
-class InputStream(object):
- """
- File-like object that represents the request body (if any). Supports
- the bare mininum methods required by the WSGI spec. Thanks to
- StringIO for ideas.
- """
- def __init__(self, conn):
- self._conn = conn
-
- # See WSGIServer.
- self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
-
- self._buf = ''
- self._bufList = []
- self._pos = 0 # Current read position.
- self._avail = 0 # Number of bytes currently available.
- self._length = 0 # Set to Content-Length in request.
-
- self.logger = logging.getLogger(LoggerName)
-
- def bytesAvailForAdd(self):
- return self._length - self._avail
-
- def _shrinkBuffer(self):
- """Gets rid of already read data (since we can't rewind)."""
- if self._pos >= self._shrinkThreshold:
- self._buf = self._buf[self._pos:]
- self._avail -= self._pos
- self._length -= self._pos
- self._pos = 0
-
- assert self._avail >= 0 and self._length >= 0
-
- def _waitForData(self):
- toAdd = min(self.bytesAvailForAdd(), 0xffff)
- assert toAdd > 0
- pkt = Packet()
- pkt.data = PKTTYPE_GET_BODY + \
- struct.pack('>H', toAdd)
- self._conn.writePacket(pkt)
- self._conn.processInput()
-
- def read(self, n=-1):
- if self._pos == self._length:
- return ''
- while True:
- if n < 0 or (self._avail - self._pos) < n:
- # Not enough data available.
- if not self.bytesAvailForAdd():
- # And there's no more coming.
- newPos = self._avail
- break
- else:
- # Ask for more data and wait.
- self._waitForData()
- continue
- else:
- newPos = self._pos + n
- break
- # Merge buffer list, if necessary.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readline(self, length=None):
- if self._pos == self._length:
- return ''
- while True:
- # Unfortunately, we need to merge the buffer list early.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- # Find newline.
- i = self._buf.find('\n', self._pos)
- if i < 0:
- # Not found?
- if not self.bytesAvailForAdd():
- # No more data coming.
- newPos = self._avail
- break
- else:
- # Wait for more to come.
- self._waitForData()
- continue
- else:
- newPos = i + 1
- break
- if length is not None:
- if self._pos + length < newPos:
- newPos = self._pos + length
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readlines(self, sizehint=0):
- total = 0
- lines = []
- line = self.readline()
- while line:
- lines.append(line)
- total += len(line)
- if 0 < sizehint <= total:
- break
- line = self.readline()
- return lines
-
- def __iter__(self):
- return self
-
- def next(self):
- r = self.readline()
- if not r:
- raise StopIteration
- return r
-
- def setDataLength(self, length):
- """
- Once Content-Length is known, Request calls this method to set it.
- """
- self._length = length
-
- def addData(self, data):
- """
- Adds data from the server to this InputStream. Note that we never ask
- the server for data beyond the Content-Length, so the server should
- never send us an EOF (empty string argument).
- """
- if not data:
- raise ProtocolError, 'short data'
- self._bufList.append(data)
- length = len(data)
- self._avail += length
- if self._avail > self._length:
- raise ProtocolError, 'too much data'
-
-class Request(object):
- """
- A Request object. A more fitting name would probably be Transaction, but
- it's named Request to mirror my FastCGI driver. :) This object
- encapsulates all the data about the HTTP request and allows the handler
- to send a response.
-
- The only attributes/methods that the handler should concern itself
- with are: environ, input, startResponse(), and write().
- """
- # Do not ever change the following value.
- _maxWrite = 8192 - 4 - 3 # 8k - pkt header - send body header
-
- def __init__(self, conn):
- self._conn = conn
-
- self.environ = {
- 'SCRIPT_NAME': conn.server.scriptName
- }
- self.input = InputStream(conn)
-
- self._headersSent = False
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.info('%s %s',
- self.environ['REQUEST_METHOD'],
- self.environ['REQUEST_URI'])
-
- start = datetime.datetime.now()
-
- try:
- self._conn.server.handler(self)
- except:
- self.logger.exception('Exception caught from handler')
- if not self._headersSent:
- self._conn.server.error(self)
-
- end = datetime.datetime.now()
-
- # Notify server of end of response (reuse flag is set to true).
- pkt = Packet()
- pkt.data = PKTTYPE_END_RESPONSE + '\x01'
- self._conn.writePacket(pkt)
-
- handlerTime = end - start
- self.logger.debug('%s %s done (%.3f secs)',
- self.environ['REQUEST_METHOD'],
- self.environ['REQUEST_URI'],
- handlerTime.seconds +
- handlerTime.microseconds / 1000000.0)
-
- # The following methods are called from the Connection to set up this
- # Request.
-
- def setMethod(self, value):
- self.environ['REQUEST_METHOD'] = value
-
- def setProtocol(self, value):
- self.environ['SERVER_PROTOCOL'] = value
-
- def setRequestURI(self, value):
- self.environ['REQUEST_URI'] = value
-
- scriptName = self._conn.server.scriptName
- if not value.startswith(scriptName):
- self.logger.warning('scriptName does not match request URI')
-
- self.environ['PATH_INFO'] = value[len(scriptName):]
-
- def setRemoteAddr(self, value):
- self.environ['REMOTE_ADDR'] = value
-
- def setRemoteHost(self, value):
- self.environ['REMOTE_HOST'] = value
-
- def setServerName(self, value):
- self.environ['SERVER_NAME'] = value
+from ajp_base import BaseAJPServer, Connection
+from preforkserver import PreforkServer
- def setServerPort(self, value):
- self.environ['SERVER_PORT'] = str(value)
-
- def setIsSSL(self, value):
- if value:
- self.environ['HTTPS'] = 'on'
-
- def addHeader(self, name, value):
- name = name.replace('-', '_').upper()
- if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
- self.environ[name] = value
- if name == 'CONTENT_LENGTH':
- length = int(value)
- self.input.setDataLength(length)
- else:
- self.environ['HTTP_'+name] = value
-
- def addAttribute(self, name, value):
- self.environ[name] = value
-
- # The only two methods that should be called from the handler.
-
- def startResponse(self, statusCode, statusMsg, headers):
- """
- Begin the HTTP response. This must only be called once and it
- must be called before any calls to write().
-
- statusCode is the integer status code (e.g. 200). statusMsg
- is the associated reason message (e.g.'OK'). headers is a list
- of 2-tuples - header name/value pairs. (Both header name and value
- must be strings.)
- """
- assert not self._headersSent, 'Headers already sent!'
-
- pkt = Packet()
- pkt.data = PKTTYPE_SEND_HEADERS + \
- struct.pack('>H', statusCode) + \
- encodeString(statusMsg) + \
- struct.pack('>H', len(headers)) + \
- ''.join([encodeResponseHeader(name, value)
- for name,value in headers])
-
- self._conn.writePacket(pkt)
-
- self._headersSent = True
-
- def write(self, data):
- """
- Write data (which comprises the response body). Note that due to
- restrictions on AJP packet size, we limit our writes to 8185 bytes
- each packet.
- """
- assert self._headersSent, 'Headers must be sent first!'
-
- bytesLeft = len(data)
- while bytesLeft:
- toWrite = min(bytesLeft, self._maxWrite)
-
- pkt = Packet()
- pkt.data = PKTTYPE_SEND_BODY + \
- struct.pack('>H', toWrite) + \
- data[:toWrite]
- self._conn.writePacket(pkt)
-
- data = data[toWrite:]
- bytesLeft -= toWrite
-
-class Connection(object):
- """
- A single Connection with the server. Requests are not multiplexed over the
- same connection, so at any given time, the Connection is either
- waiting for a request, or processing a single request.
- """
- def __init__(self, sock, addr, server):
- self.server = server
- self._sock = sock
- self._addr = addr
-
- self._request = None
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.debug('Connection starting up (%s:%d)',
- self._addr[0], self._addr[1])
-
- # Main loop. Errors will cause the loop to be exited and
- # the socket to be closed.
- while True:
- try:
- self.processInput()
- except ProtocolError, e:
- self.logger.error("Protocol error '%s'", str(e))
- break
- except (EOFError, KeyboardInterrupt):
- break
- except:
- self.logger.exception('Exception caught in Connection')
- break
-
- self.logger.debug('Connection shutting down (%s:%d)',
- self._addr[0], self._addr[1])
-
- self._sock.close()
-
- def processInput(self):
- """Wait for and process a single packet."""
- pkt = Packet()
- select.select([self._sock], [], [])
- pkt.read(self._sock)
-
- # Body chunks have no packet type code.
- if self._request is not None:
- self._processBody(pkt)
- return
-
- if not pkt.length:
- raise ProtocolError, 'unexpected empty packet'
-
- pkttype = pkt.data[0]
- if pkttype == PKTTYPE_FWD_REQ:
- self._forwardRequest(pkt)
- elif pkttype == PKTTYPE_SHUTDOWN:
- self._shutdown(pkt)
- elif pkttype == PKTTYPE_PING:
- self._ping(pkt)
- elif pkttype == PKTTYPE_CPING:
- self._cping(pkt)
- else:
- raise ProtocolError, 'unknown packet type'
-
- def _forwardRequest(self, pkt):
- """
- Creates a Request object, fills it in from the packet, then runs it.
- """
- assert self._request is None
-
- req = self.server.requestClass(self)
- i = ord(pkt.data[1])
- method = methodTable[i]
- if method is None:
- raise ValueError, 'bad method field'
- req.setMethod(method)
- value, pos = decodeString(pkt.data, 2)
- req.setProtocol(value)
- value, pos = decodeString(pkt.data, pos)
- req.setRequestURI(value)
- value, pos = decodeString(pkt.data, pos)
- req.setRemoteAddr(value)
- value, pos = decodeString(pkt.data, pos)
- req.setRemoteHost(value)
- value, pos = decodeString(pkt.data, pos)
- req.setServerName(value)
- value = struct.unpack('>H', pkt.data[pos:pos+2])[0]
- req.setServerPort(value)
- i = ord(pkt.data[pos+2])
- req.setIsSSL(i != 0)
-
- # Request headers.
- numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0]
- pos += 5
- for i in range(numHeaders):
- name, value, pos = decodeRequestHeader(pkt.data, pos)
- req.addHeader(name, value)
-
- # Attributes.
- while True:
- name, value, pos = decodeAttribute(pkt.data, pos)
- if name is None:
- break
- req.addAttribute(name, value)
-
- self._request = req
-
- # Read first body chunk, if needed.
- if req.input.bytesAvailForAdd():
- self.processInput()
-
- # Run Request.
- req.run()
-
- self._request = None
-
- def _shutdown(self, pkt):
- """Not sure what to do with this yet."""
- self.logger.info('Received shutdown request from server')
-
- def _ping(self, pkt):
- """I have no idea what this packet means."""
- self.logger.debug('Received ping')
-
- def _cping(self, pkt):
- """Respond to a PING (CPING) packet."""
- self.logger.debug('Received PING, sending PONG')
- pkt = Packet()
- pkt.data = PKTTYPE_CPONG
- self.writePacket(pkt)
-
- def _processBody(self, pkt):
- """
- Handles a body chunk from the server by appending it to the
- InputStream.
- """
- if pkt.length:
- length = struct.unpack('>H', pkt.data[:2])[0]
- self._request.input.addData(pkt.data[2:2+length])
- else:
- # Shouldn't really ever get here.
- self._request.input.addData('')
-
- def writePacket(self, pkt):
- """Sends a Packet to the server."""
- pkt.write(self._sock)
+__all__ = ['WSGIServer']
-class WSGIServer(prefork.PreforkServer):
+class WSGIServer(BaseAJPServer, PreforkServer):
"""
AJP1.3/WSGI server. Runs your WSGI application as a persistant program
that understands AJP1.3. Opens up a TCP socket, binds it, and then
@@ -806,15 +100,6 @@ class WSGIServer(prefork.PreforkServer):
Of course you will need an AJP1.3 connector for your webserver (e.g.
mod_jk) - see <http://jakarta.apache.org/tomcat/connectors-doc/>.
"""
- # What Request class to use.
- requestClass = Request
-
- # Limits the size of the InputStream's string buffer to this size + 8k.
- # Since the InputStream is not seekable, we throw away already-read
- # data once this certain amount has been read. (The 8k is there because
- # it is the maximum size of new data added per chunk.)
- inputStreamShrinkThreshold = 102400 - 8192
-
def __init__(self, application, scriptName='', environ=None,
bindAddress=('localhost', 8009), allowedServers=None,
loggingLevel=logging.INFO, **kw):
@@ -838,44 +123,17 @@ class WSGIServer(prefork.PreforkServer):
loggingLevel sets the logging level of the module-level logger.
"""
- if kw.has_key('jobClass'):
- del kw['jobClass']
- if kw.has_key('jobArgs'):
- del kw['jobArgs']
- super(WSGIServer, self).__init__(jobClass=Connection,
- jobArgs=(self,), **kw)
-
- if environ is None:
- environ = {}
-
- self.application = application
- self.scriptName = scriptName
- self.environ = environ
-
- self._bindAddress = bindAddress
- self._allowedServers = allowedServers
-
- self.logger = logging.getLogger(LoggerName)
- self.logger.setLevel(loggingLevel)
-
- def _setupSocket(self):
- """Creates and binds the socket for communication with the server."""
- sock = socket.socket()
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(self._bindAddress)
- sock.listen(socket.SOMAXCONN)
- return sock
-
- def _cleanupSocket(self, sock):
- """Closes the main socket."""
- sock.close()
-
- def _isClientAllowed(self, addr):
- ret = self._allowedServers is None or addr[0] in self._allowedServers
- if not ret:
- self.logger.warning('Server connection from %s disallowed',
- addr[0])
- return ret
+ BaseAJPServer.__init__(self, application,
+ scriptName=scriptName,
+ environ=environ,
+ multithreaded=False,
+ bindAddress=bindAddress,
+ allowedServers=allowedServers,
+ loggingLevel=loggingLevel)
+ for key in ('multithreaded', 'jobClass', 'jobArgs'):
+ if kw.has_key(key):
+ del kw[key]
+ PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw)
def run(self):
"""
@@ -891,107 +149,15 @@ class WSGIServer(prefork.PreforkServer):
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
- ret = super(WSGIServer, self).run(sock)
+ ret = PreforkServer.run(self, sock)
self._cleanupSocket(sock)
- self.logger.info('%s shutting down', self.__class__.__name__)
+ self.logger.info('%s shutting down%s', self.__class__.__name__,
+ self._hupReceived and ' (reload requested)' or '')
return ret
- def handler(self, request):
- """
- WSGI handler. Sets up WSGI environment, calls the application,
- and sends the application's response.
- """
- environ = request.environ
- environ.update(self.environ)
-
- environ['wsgi.version'] = (1,0)
- environ['wsgi.input'] = request.input
- environ['wsgi.errors'] = sys.stderr
- environ['wsgi.multithread'] = False
- environ['wsgi.multiprocess'] = True
- environ['wsgi.run_once'] = False
-
- if environ.get('HTTPS', 'off') in ('on', '1'):
- environ['wsgi.url_scheme'] = 'https'
- else:
- environ['wsgi.url_scheme'] = 'http'
-
- headers_set = []
- headers_sent = []
- result = None
-
- def write(data):
- assert type(data) is str, 'write() argument must be string'
- assert headers_set, 'write() before start_response()'
-
- if not headers_sent:
- status, responseHeaders = headers_sent[:] = headers_set
- statusCode = int(status[:3])
- statusMsg = status[4:]
- found = False
- for header,value in responseHeaders:
- if header.lower() == 'content-length':
- found = True
- break
- if not found and result is not None:
- try:
- if len(result) == 1:
- responseHeaders.append(('Content-Length',
- str(len(data))))
- except:
- pass
- request.startResponse(statusCode, statusMsg, responseHeaders)
-
- request.write(data)
-
- def start_response(status, response_headers, exc_info=None):
- if exc_info:
- try:
- if headers_sent:
- # Re-raise if too late
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # avoid dangling circular ref
- else:
- assert not headers_set, 'Headers already set!'
-
- assert type(status) is str, 'Status must be a string'
- assert len(status) >= 4, 'Status must be at least 4 characters'
- assert int(status[:3]), 'Status must begin with 3-digit code'
- assert status[3] == ' ', 'Status must have a space after code'
- assert type(response_headers) is list, 'Headers must be a list'
- if __debug__:
- for name,val in response_headers:
- assert type(name) is str, 'Header names must be strings'
- assert type(val) is str, 'Header values must be strings'
-
- headers_set[:] = [status, response_headers]
- return write
-
- result = self.application(environ, start_response)
-
- try:
- for data in result:
- if data:
- write(data)
- if not headers_sent:
- write('') # in case body was empty
- finally:
- if hasattr(result, 'close'):
- result.close()
-
- def error(self, request):
- """
- Override to provide custom error handling. Ideally, however,
- all errors should be caught at the application level.
- """
- request.startResponse(200, 'OK', [('Content-Type', 'text/html')])
- import cgitb
- request.write(cgitb.html(sys.exc_info()))
-
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
diff --git a/flup/server/fcgi.py b/flup/server/fcgi.py
index 2a536be..df830b9 100644
--- a/flup/server/fcgi.py
+++ b/flup/server/fcgi.py
@@ -39,7 +39,7 @@ Example usage:
from fcgi import WSGIServer
WSGIServer(app).run()
-See the documentation for WSGIServer/Server for more information.
+See the documentation for WSGIServer for more information.
On most platforms, fcgi will fallback to regular CGI behavior if run in a
non-FastCGI context. If you want to force CGI behavior, set the environment
@@ -49,876 +49,25 @@ variable FCGI_FORCE_CGI to "Y" or "y".
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
-import sys
import os
-import signal
-import struct
-import cStringIO as StringIO
-import select
-import socket
-import errno
-import traceback
-try:
- import thread
- import threading
- thread_available = True
-except ImportError:
- import dummy_thread as thread
- import dummy_threading as threading
- thread_available = False
+from fcgi_base import BaseFCGIServer
+from threadedserver import ThreadedServer
__all__ = ['WSGIServer']
-# Constants from the spec.
-FCGI_LISTENSOCK_FILENO = 0
-
-FCGI_HEADER_LEN = 8
-
-FCGI_VERSION_1 = 1
-
-FCGI_BEGIN_REQUEST = 1
-FCGI_ABORT_REQUEST = 2
-FCGI_END_REQUEST = 3
-FCGI_PARAMS = 4
-FCGI_STDIN = 5
-FCGI_STDOUT = 6
-FCGI_STDERR = 7
-FCGI_DATA = 8
-FCGI_GET_VALUES = 9
-FCGI_GET_VALUES_RESULT = 10
-FCGI_UNKNOWN_TYPE = 11
-FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
-
-FCGI_NULL_REQUEST_ID = 0
-
-FCGI_KEEP_CONN = 1
-
-FCGI_RESPONDER = 1
-FCGI_AUTHORIZER = 2
-FCGI_FILTER = 3
-
-FCGI_REQUEST_COMPLETE = 0
-FCGI_CANT_MPX_CONN = 1
-FCGI_OVERLOADED = 2
-FCGI_UNKNOWN_ROLE = 3
-
-FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
-FCGI_MAX_REQS = 'FCGI_MAX_REQS'
-FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
-
-FCGI_Header = '!BBHHBx'
-FCGI_BeginRequestBody = '!HB5x'
-FCGI_EndRequestBody = '!LB3x'
-FCGI_UnknownTypeBody = '!B7x'
-
-FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
-FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
-
-if __debug__:
- import time
-
- # Set non-zero to write debug output to a file.
- DEBUG = 0
- DEBUGLOG = '/tmp/fcgi.log'
-
- def _debug(level, msg):
- if DEBUG < level:
- return
-
- try:
- f = open(DEBUGLOG, 'a')
- f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
- f.close()
- except:
- pass
-
-class InputStream(object):
- """
- File-like object representing FastCGI input streams (FCGI_STDIN and
- FCGI_DATA). Supports the minimum methods required by WSGI spec.
- """
- def __init__(self, conn):
- self._conn = conn
-
- # See Server.
- self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
-
- self._buf = ''
- self._bufList = []
- self._pos = 0 # Current read position.
- self._avail = 0 # Number of bytes currently available.
-
- self._eof = False # True when server has sent EOF notification.
-
- def _shrinkBuffer(self):
- """Gets rid of already read data (since we can't rewind)."""
- if self._pos >= self._shrinkThreshold:
- self._buf = self._buf[self._pos:]
- self._avail -= self._pos
- self._pos = 0
-
- assert self._avail >= 0
-
- def _waitForData(self):
- """Waits for more data to become available."""
- self._conn.process_input()
-
- def read(self, n=-1):
- if self._pos == self._avail and self._eof:
- return ''
- while True:
- if n < 0 or (self._avail - self._pos) < n:
- # Not enough data available.
- if self._eof:
- # And there's no more coming.
- newPos = self._avail
- break
- else:
- # Wait for more data.
- self._waitForData()
- continue
- else:
- newPos = self._pos + n
- break
- # Merge buffer list, if necessary.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readline(self, length=None):
- if self._pos == self._avail and self._eof:
- return ''
- while True:
- # Unfortunately, we need to merge the buffer list early.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- # Find newline.
- i = self._buf.find('\n', self._pos)
- if i < 0:
- # Not found?
- if self._eof:
- # No more data coming.
- newPos = self._avail
- break
- else:
- # Wait for more to come.
- self._waitForData()
- continue
- else:
- newPos = i + 1
- break
- if length is not None:
- if self._pos + length < newPos:
- newPos = self._pos + length
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readlines(self, sizehint=0):
- total = 0
- lines = []
- line = self.readline()
- while line:
- lines.append(line)
- total += len(line)
- if 0 < sizehint <= total:
- break
- line = self.readline()
- return lines
-
- def __iter__(self):
- return self
-
- def next(self):
- r = self.readline()
- if not r:
- raise StopIteration
- return r
-
- def add_data(self, data):
- if not data:
- self._eof = True
- else:
- self._bufList.append(data)
- self._avail += len(data)
-
-class MultiplexedInputStream(InputStream):
- """
- A version of InputStream meant to be used with MultiplexedConnections.
- Assumes the MultiplexedConnection (the producer) and the Request
- (the consumer) are running in different threads.
- """
- def __init__(self, conn):
- super(MultiplexedInputStream, self).__init__(conn)
-
- # Arbitrates access to this InputStream (it's used simultaneously
- # by a Request and its owning Connection object).
- lock = threading.RLock()
-
- # Notifies Request thread that there is new data available.
- self._lock = threading.Condition(lock)
-
- def _waitForData(self):
- # Wait for notification from add_data().
- self._lock.wait()
-
- def read(self, n=-1):
- self._lock.acquire()
- try:
- return super(MultiplexedInputStream, self).read(n)
- finally:
- self._lock.release()
-
- def readline(self, length=None):
- self._lock.acquire()
- try:
- return super(MultiplexedInputStream, self).readline(length)
- finally:
- self._lock.release()
-
- def add_data(self, data):
- self._lock.acquire()
- try:
- super(MultiplexedInputStream, self).add_data(data)
- self._lock.notify()
- finally:
- self._lock.release()
-
-class OutputStream(object):
- """
- FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
- write() or writelines() immediately result in Records being sent back
- to the server. Buffering should be done in a higher level!
- """
- def __init__(self, conn, req, type, buffered=False):
- self._conn = conn
- self._req = req
- self._type = type
- self._buffered = buffered
- self._bufList = [] # Used if buffered is True
- self.dataWritten = False
- self.closed = False
-
- def _write(self, data):
- length = len(data)
- while length:
- toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN)
-
- rec = Record(self._type, self._req.requestId)
- rec.contentLength = toWrite
- rec.contentData = data[:toWrite]
- self._conn.writeRecord(rec)
-
- data = data[toWrite:]
- length -= toWrite
-
- def write(self, data):
- assert not self.closed
-
- if not data:
- return
-
- self.dataWritten = True
-
- if self._buffered:
- self._bufList.append(data)
- else:
- self._write(data)
-
- def writelines(self, lines):
- assert not self.closed
-
- for line in lines:
- self.write(line)
-
- def flush(self):
- # Only need to flush if this OutputStream is actually buffered.
- if self._buffered:
- data = ''.join(self._bufList)
- self._bufList = []
- self._write(data)
-
- # Though available, the following should NOT be called by WSGI apps.
- def close(self):
- """Sends end-of-stream notification, if necessary."""
- if not self.closed and self.dataWritten:
- self.flush()
- rec = Record(self._type, self._req.requestId)
- self._conn.writeRecord(rec)
- self.closed = True
-
-class TeeOutputStream(object):
- """
- Simple wrapper around two or more output file-like objects that copies
- written data to all streams.
- """
- def __init__(self, streamList):
- self._streamList = streamList
-
- def write(self, data):
- for f in self._streamList:
- f.write(data)
-
- def writelines(self, lines):
- for line in lines:
- self.write(line)
-
- def flush(self):
- for f in self._streamList:
- f.flush()
-
-class StdoutWrapper(object):
- """
- Wrapper for sys.stdout so we know if data has actually been written.
- """
- def __init__(self, stdout):
- self._file = stdout
- self.dataWritten = False
-
- def write(self, data):
- if data:
- self.dataWritten = True
- self._file.write(data)
-
- def writelines(self, lines):
- for line in lines:
- self.write(line)
-
- def __getattr__(self, name):
- return getattr(self._file, name)
-
-def decode_pair(s, pos=0):
- """
- Decodes a name/value pair.
-
- The number of bytes decoded as well as the name/value pair
- are returned.
- """
- nameLength = ord(s[pos])
- if nameLength & 128:
- nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
- pos += 4
- else:
- pos += 1
-
- valueLength = ord(s[pos])
- if valueLength & 128:
- valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
- pos += 4
- else:
- pos += 1
-
- name = s[pos:pos+nameLength]
- pos += nameLength
- value = s[pos:pos+valueLength]
- pos += valueLength
-
- return (pos, (name, value))
-
-def encode_pair(name, value):
- """
- Encodes a name/value pair.
-
- The encoded string is returned.
- """
- nameLength = len(name)
- if nameLength < 128:
- s = chr(nameLength)
- else:
- s = struct.pack('!L', nameLength | 0x80000000L)
-
- valueLength = len(value)
- if valueLength < 128:
- s += chr(valueLength)
- else:
- s += struct.pack('!L', valueLength | 0x80000000L)
-
- return s + name + value
-
-class Record(object):
- """
- A FastCGI Record.
-
- Used for encoding/decoding records.
- """
- def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
- self.version = FCGI_VERSION_1
- self.type = type
- self.requestId = requestId
- self.contentLength = 0
- self.paddingLength = 0
- self.contentData = ''
-
- def _recvall(sock, length):
- """
- Attempts to receive length bytes from a socket, blocking if necessary.
- (Socket may be blocking or non-blocking.)
- """
- dataList = []
- recvLen = 0
- while length:
- try:
- data = sock.recv(length)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if not data: # EOF
- break
- dataList.append(data)
- dataLen = len(data)
- recvLen += dataLen
- length -= dataLen
- return ''.join(dataList), recvLen
- _recvall = staticmethod(_recvall)
-
- def read(self, sock):
- """Read and decode a Record from a socket."""
- try:
- header, length = self._recvall(sock, FCGI_HEADER_LEN)
- except:
- raise EOFError
-
- if length < FCGI_HEADER_LEN:
- raise EOFError
-
- self.version, self.type, self.requestId, self.contentLength, \
- self.paddingLength = struct.unpack(FCGI_Header, header)
-
- if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
- 'contentLength = %d' %
- (sock.fileno(), self.type, self.requestId,
- self.contentLength))
-
- if self.contentLength:
- try:
- self.contentData, length = self._recvall(sock,
- self.contentLength)
- except:
- raise EOFError
-
- if length < self.contentLength:
- raise EOFError
-
- if self.paddingLength:
- try:
- self._recvall(sock, self.paddingLength)
- except:
- raise EOFError
-
- def _sendall(sock, data):
- """
- Writes data to a socket and does not return until all the data is sent.
- """
- length = len(data)
- while length:
- try:
- sent = sock.send(data)
- except socket.error, e:
- if e[0] == errno.EPIPE:
- return # Don't bother raising an exception. Just ignore.
- elif e[0] == errno.EAGAIN:
- select.select([], [sock], [])
- continue
- else:
- raise
- data = data[sent:]
- length -= sent
- _sendall = staticmethod(_sendall)
-
- def write(self, sock):
- """Encode and write a Record to a socket."""
- self.paddingLength = -self.contentLength & 7
-
- if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
- 'contentLength = %d' %
- (sock.fileno(), self.type, self.requestId,
- self.contentLength))
-
- header = struct.pack(FCGI_Header, self.version, self.type,
- self.requestId, self.contentLength,
- self.paddingLength)
- self._sendall(sock, header)
- if self.contentLength:
- self._sendall(sock, self.contentData)
- if self.paddingLength:
- self._sendall(sock, '\x00'*self.paddingLength)
-
-class Request(object):
- """
- Represents a single FastCGI request.
-
- These objects are passed to your handler and is the main interface
- between your handler and the fcgi module. The methods should not
- be called by your handler. However, server, params, stdin, stdout,
- stderr, and data are free for your handler's use.
- """
- def __init__(self, conn, inputStreamClass):
- self._conn = conn
-
- self.server = conn.server
- self.params = {}
- self.stdin = inputStreamClass(conn)
- self.stdout = OutputStream(conn, self, FCGI_STDOUT)
- self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
- self.data = inputStreamClass(conn)
-
- def run(self):
- """Runs the handler, flushes the streams, and ends the request."""
- try:
- protocolStatus, appStatus = self.server.handler(self)
- except:
- traceback.print_exc(file=self.stderr)
- self.stderr.flush()
- if not self.stdout.dataWritten:
- self.server.error(self)
-
- protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
-
- if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
- (protocolStatus, appStatus))
-
- self._flush()
- self._end(appStatus, protocolStatus)
-
- def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
- self._conn.end_request(self, appStatus, protocolStatus)
-
- def _flush(self):
- self.stdout.close()
- self.stderr.close()
-
-class CGIRequest(Request):
- """A normal CGI request disguised as a FastCGI request."""
- def __init__(self, server):
- # These are normally filled in by Connection.
- self.requestId = 1
- self.role = FCGI_RESPONDER
- self.flags = 0
- self.aborted = False
-
- self.server = server
- self.params = dict(os.environ)
- self.stdin = sys.stdin
- self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
- self.stderr = sys.stderr
- self.data = StringIO.StringIO()
-
- def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
- sys.exit(appStatus)
-
- def _flush(self):
- # Not buffered, do nothing.
- pass
-
-class Connection(object):
- """
- A Connection with the web server.
-
- Each Connection is associated with a single socket (which is
- connected to the web server) and is responsible for handling all
- the FastCGI message processing for that socket.
- """
- _multiplexed = False
- _inputStreamClass = InputStream
-
- def __init__(self, sock, addr, server):
- self._sock = sock
- self._addr = addr
- self.server = server
-
- # Active Requests for this Connection, mapped by request ID.
- self._requests = {}
-
- def _cleanupSocket(self):
- """Close the Connection's socket."""
- self._sock.close()
-
- def run(self):
- """Begin processing data from the socket."""
- self._keepGoing = True
- while self._keepGoing:
- try:
- self.process_input()
- except EOFError:
- break
- except (select.error, socket.error), e:
- if e[0] == errno.EBADF: # Socket was closed by Request.
- break
- raise
-
- self._cleanupSocket()
-
- def process_input(self):
- """Attempt to read a single Record from the socket and process it."""
- # Currently, any children Request threads notify this Connection
- # that it is no longer needed by closing the Connection's socket.
- # We need to put a timeout on select, otherwise we might get
- # stuck in it indefinitely... (I don't like this solution.)
- while self._keepGoing:
- try:
- r, w, e = select.select([self._sock], [], [], 1.0)
- except ValueError:
- # Sigh. ValueError gets thrown sometimes when passing select
- # a closed socket.
- raise EOFError
- if r: break
- if not self._keepGoing:
- return
- rec = Record()
- rec.read(self._sock)
-
- if rec.type == FCGI_GET_VALUES:
- self._do_get_values(rec)
- elif rec.type == FCGI_BEGIN_REQUEST:
- self._do_begin_request(rec)
- elif rec.type == FCGI_ABORT_REQUEST:
- self._do_abort_request(rec)
- elif rec.type == FCGI_PARAMS:
- self._do_params(rec)
- elif rec.type == FCGI_STDIN:
- self._do_stdin(rec)
- elif rec.type == FCGI_DATA:
- self._do_data(rec)
- elif rec.requestId == FCGI_NULL_REQUEST_ID:
- self._do_unknown_type(rec)
- else:
- # Need to complain about this.
- pass
-
- def writeRecord(self, rec):
- """
- Write a Record to the socket.
- """
- rec.write(self._sock)
-
- def end_request(self, req, appStatus=0L,
- protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
- """
- End a Request.
-
- Called by Request objects. An FCGI_END_REQUEST Record is
- sent to the web server. If the web server no longer requires
- the connection, the socket is closed, thereby ending this
- Connection (run() returns).
- """
- rec = Record(FCGI_END_REQUEST, req.requestId)
- rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
- protocolStatus)
- rec.contentLength = FCGI_EndRequestBody_LEN
- self.writeRecord(rec)
-
- if remove:
- del self._requests[req.requestId]
-
- if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)
-
- if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
- self._sock.close()
- self._keepGoing = False
-
- def _do_get_values(self, inrec):
- """Handle an FCGI_GET_VALUES request from the web server."""
- outrec = Record(FCGI_GET_VALUES_RESULT)
-
- pos = 0
- while pos < inrec.contentLength:
- pos, (name, value) = decode_pair(inrec.contentData, pos)
- cap = self.server.capability.get(name)
- if cap is not None:
- outrec.contentData += encode_pair(name, str(cap))
-
- outrec.contentLength = len(outrec.contentData)
- self.writeRecord(rec)
-
- def _do_begin_request(self, inrec):
- """Handle an FCGI_BEGIN_REQUEST from the web server."""
- role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
-
- req = self.server.request_class(self, self._inputStreamClass)
- req.requestId, req.role, req.flags = inrec.requestId, role, flags
- req.aborted = False
-
- if not self._multiplexed and self._requests:
- # Can't multiplex requests.
- self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False)
- else:
- self._requests[inrec.requestId] = req
-
- def _do_abort_request(self, inrec):
- """
- Handle an FCGI_ABORT_REQUEST from the web server.
-
- We just mark a flag in the associated Request.
- """
- req = self._requests.get(inrec.requestId)
- if req is not None:
- req.aborted = True
-
- def _start_request(self, req):
- """Run the request."""
- # Not multiplexed, so run it inline.
- req.run()
-
- def _do_params(self, inrec):
- """
- Handle an FCGI_PARAMS Record.
-
- If the last FCGI_PARAMS Record is received, start the request.
- """
- req = self._requests.get(inrec.requestId)
- if req is not None:
- if inrec.contentLength:
- pos = 0
- while pos < inrec.contentLength:
- pos, (name, value) = decode_pair(inrec.contentData, pos)
- req.params[name] = value
- else:
- self._start_request(req)
-
- def _do_stdin(self, inrec):
- """Handle the FCGI_STDIN stream."""
- req = self._requests.get(inrec.requestId)
- if req is not None:
- req.stdin.add_data(inrec.contentData)
-
- def _do_data(self, inrec):
- """Handle the FCGI_DATA stream."""
- req = self._requests.get(inrec.requestId)
- if req is not None:
- req.data.add_data(inrec.contentData)
-
- def _do_unknown_type(self, inrec):
- """Handle an unknown request type. Respond accordingly."""
- outrec = Record(FCGI_UNKNOWN_TYPE)
- outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
- outrec.contentLength = FCGI_UnknownTypeBody_LEN
- self.writeRecord(rec)
-
-class MultiplexedConnection(Connection):
- """
- A version of Connection capable of handling multiple requests
- simultaneously.
- """
- _multiplexed = True
- _inputStreamClass = MultiplexedInputStream
-
- def __init__(self, sock, addr, server):
- super(MultiplexedConnection, self).__init__(sock, addr, server)
-
- # Used to arbitrate access to self._requests.
- lock = threading.RLock()
-
- # Notification is posted everytime a request completes, allowing us
- # to quit cleanly.
- self._lock = threading.Condition(lock)
-
- def _cleanupSocket(self):
- # Wait for any outstanding requests before closing the socket.
- self._lock.acquire()
- while self._requests:
- self._lock.wait()
- self._lock.release()
-
- super(MultiplexedConnection, self)._cleanupSocket()
-
- def writeRecord(self, rec):
- # Must use locking to prevent intermingling of Records from different
- # threads.
- self._lock.acquire()
- try:
- # Probably faster than calling super. ;)
- rec.write(self._sock)
- finally:
- self._lock.release()
-
- def end_request(self, req, appStatus=0L,
- protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self).end_request(req, appStatus,
- protocolStatus,
- remove)
- self._lock.notify()
- finally:
- self._lock.release()
-
- def _do_begin_request(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_begin_request(inrec)
- finally:
- self._lock.release()
-
- def _do_abort_request(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_abort_request(inrec)
- finally:
- self._lock.release()
-
- def _start_request(self, req):
- thread.start_new_thread(req.run, ())
-
- def _do_params(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_params(inrec)
- finally:
- self._lock.release()
-
- def _do_stdin(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_stdin(inrec)
- finally:
- self._lock.release()
-
- def _do_data(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_data(inrec)
- finally:
- self._lock.release()
-
-class Server(object):
+class WSGIServer(BaseFCGIServer, ThreadedServer):
"""
- The FastCGI server.
-
- Waits for connections from the web server, processing each
- request.
-
- If run in a normal CGI context, it will instead instantiate a
- CGIRequest and run the handler through there.
+ FastCGI server that supports the Web Server Gateway Interface. See
+ <http://www.python.org/peps/pep-0333.html>.
"""
- request_class = Request
- cgirequest_class = CGIRequest
-
- # Limits the size of the InputStream's string buffer to this size + the
- # server's maximum Record size. Since the InputStream is not seekable,
- # we throw away already-read data once this certain amount has been read.
- inputStreamShrinkThreshold = 102400 - 8192
-
- def __init__(self, handler=None, maxwrite=8192, bindAddress=None,
- multiplexed=False):
+ def __init__(self, application, environ=None,
+ multithreaded=True,
+ bindAddress=None, multiplexed=False, **kw):
"""
- handler, if present, must reference a function or method that
- takes one argument: a Request object. If handler is not
- specified at creation time, Server *must* be subclassed.
- (The handler method below is abstract.)
-
- maxwrite is the maximum number of bytes (per Record) to write
- to the server. I've noticed mod_fastcgi has a relatively small
- receive buffer (8K or so).
+ environ, if present, must be a dictionary-like object. Its
+ contents will be copied into application's environ. Useful
+ for passing application-specific variables.
bindAddress, if present, must either be a string or a 2-tuple. If
present, run() will open its own listening socket. You would use
@@ -928,354 +77,40 @@ class Server(object):
socket will be opened. If a tuple, the first element, a string,
is the interface name/IP to bind to, and the second element (an int)
is the port number.
-
- Set multiplexed to True if you want to handle multiple requests
- per connection. Some FastCGI backends (namely mod_fastcgi) don't
- multiplex requests at all, so by default this is off (which saves
- on thread creation/locking overhead). If threads aren't available,
- this keyword is ignored; it's not possible to multiplex requests
- at all.
"""
- if handler is not None:
- self.handler = handler
- self.maxwrite = maxwrite
- if thread_available:
- try:
- import resource
- # Attempt to glean the maximum number of connections
- # from the OS.
- maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
- except ImportError:
- maxConns = 100 # Just some made up number.
- maxReqs = maxConns
- if multiplexed:
- self._connectionClass = MultiplexedConnection
- maxReqs *= 5 # Another made up number.
- else:
- self._connectionClass = Connection
- self.capability = {
- FCGI_MAX_CONNS: maxConns,
- FCGI_MAX_REQS: maxReqs,
- FCGI_MPXS_CONNS: multiplexed and 1 or 0
- }
- else:
- self._connectionClass = Connection
- self.capability = {
- # If threads aren't available, these are pretty much correct.
- FCGI_MAX_CONNS: 1,
- FCGI_MAX_REQS: 1,
- FCGI_MPXS_CONNS: 0
- }
- self._bindAddress = bindAddress
-
- def _setupSocket(self):
- if self._bindAddress is None: # Run as a normal FastCGI?
- isFCGI = True
+ BaseFCGIServer.__init__(self, application,
+ environ=environ,
+ multithreaded=multithreaded,
+ bindAddress=bindAddress,
+ multiplexed=multiplexed)
+ for key in ('jobClass', 'jobArgs'):
+ if kw.has_key(key):
+ del kw[key]
+ ThreadedServer.__init__(self, jobClass=self._connectionClass,
+ jobArgs=(self,), **kw)
+
+ def _isClientAllowed(self, addr):
+ return self._web_server_addrs is None or \
+ (len(addr) == 2 and addr[0] in self._web_server_addrs)
- sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
- socket.SOCK_STREAM)
- try:
- sock.getpeername()
- except socket.error, e:
- if e[0] == errno.ENOTSOCK:
- # Not a socket, assume CGI context.
- isFCGI = False
- elif e[0] != errno.ENOTCONN:
- raise
-
- # FastCGI/CGI discrimination is broken on Mac OS X.
- # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
- # if you want to run your app as a simple CGI. (You can do
- # this with Apache's mod_env [not loaded by default in OS X
- # client, ha ha] and the SetEnv directive.)
- if not isFCGI or \
- os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'):
- req = self.cgirequest_class(self)
- req.run()
- sys.exit(0)
- else:
- # Run as a server
- if type(self._bindAddress) is str:
- # Unix socket
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- try:
- os.unlink(self._bindAddress)
- except OSError:
- pass
- else:
- # INET socket
- assert type(self._bindAddress) is tuple
- assert len(self._bindAddress) == 2
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- sock.bind(self._bindAddress)
- sock.listen(socket.SOMAXCONN)
-
- return sock
-
- def _cleanupSocket(self, sock):
- """Closes the main socket."""
- sock.close()
-
- def _installSignalHandlers(self):
- self._oldSIGs = [(x,signal.getsignal(x)) for x in
- (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)]
- signal.signal(signal.SIGHUP, self._hupHandler)
- signal.signal(signal.SIGINT, self._intHandler)
- signal.signal(signal.SIGTERM, self._intHandler)
-
- def _restoreSignalHandlers(self):
- for signum,handler in self._oldSIGs:
- signal.signal(signum, handler)
-
- def _hupHandler(self, signum, frame):
- self._hupReceived = True
- self._keepGoing = False
-
- def _intHandler(self, signum, frame):
- self._keepGoing = False
-
- def run(self, timeout=1.0):
+ def run(self):
"""
The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
SIGHUP was received, False otherwise.
"""
- web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
- if web_server_addrs is not None:
- web_server_addrs = map(lambda x: x.strip(),
- web_server_addrs.split(','))
+ self._web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
+ if self._web_server_addrs is not None:
+ self._web_server_addrs = map(lambda x: x.strip(),
+ self._web_server_addrs.split(','))
sock = self._setupSocket()
- self._keepGoing = True
- self._hupReceived = False
-
- # Install signal handlers.
- self._installSignalHandlers()
-
- while self._keepGoing:
- try:
- r, w, e = select.select([sock], [], [], timeout)
- except select.error, e:
- if e[0] == errno.EINTR:
- continue
- raise
-
- if r:
- try:
- clientSock, addr = sock.accept()
- except socket.error, e:
- if e[0] in (errno.EINTR, errno.EAGAIN):
- continue
- raise
-
- if web_server_addrs and \
- (len(addr) != 2 or addr[0] not in web_server_addrs):
- clientSock.close()
- continue
-
- # Instantiate a new Connection and begin processing FastCGI
- # messages (either in a new thread or this thread).
- conn = self._connectionClass(clientSock, addr, self)
- thread.start_new_thread(conn.run, ())
-
- self._mainloopPeriodic()
-
- # Restore signal handlers.
- self._restoreSignalHandlers()
+ ret = ThreadedServer.run(self, sock)
self._cleanupSocket(sock)
- return self._hupReceived
-
- def _mainloopPeriodic(self):
- """
- Called with just about each iteration of the main loop. Meant to
- be overridden.
- """
- pass
-
- def _exit(self, reload=False):
- """
- Protected convenience method for subclasses to force an exit. Not
- really thread-safe, which is why it isn't public.
- """
- if self._keepGoing:
- self._keepGoing = False
- self._hupReceived = reload
-
- def handler(self, req):
- """
- Default handler, which just raises an exception. Unless a handler
- is passed at initialization time, this must be implemented by
- a subclass.
- """
- raise NotImplementedError, self.__class__.__name__ + '.handler'
-
- def error(self, req):
- """
- Called by Request if an exception occurs within the handler. May and
- should be overridden.
- """
- import cgitb
- req.stdout.write('Content-Type: text/html\r\n\r\n' +
- cgitb.html(sys.exc_info()))
-
-class WSGIServer(Server):
- """
- FastCGI server that supports the Web Server Gateway Interface. See
- <http://www.python.org/peps/pep-0333.html>.
- """
- def __init__(self, application, environ=None, multithreaded=True, **kw):
- """
- environ, if present, must be a dictionary-like object. Its
- contents will be copied into application's environ. Useful
- for passing application-specific variables.
-
- Set multithreaded to False if your application is not MT-safe.
- """
- if kw.has_key('handler'):
- del kw['handler'] # Doesn't make sense to let this through
- super(WSGIServer, self).__init__(**kw)
-
- if environ is None:
- environ = {}
-
- self.application = application
- self.environ = environ
- self.multithreaded = multithreaded
-
- # Used to force single-threadedness
- self._app_lock = thread.allocate_lock()
-
- def handler(self, req):
- """Special handler for WSGI."""
- if req.role != FCGI_RESPONDER:
- return FCGI_UNKNOWN_ROLE, 0
-
- # Mostly taken from example CGI gateway.
- environ = req.params
- environ.update(self.environ)
-
- environ['wsgi.version'] = (1,0)
- environ['wsgi.input'] = req.stdin
- if self._bindAddress is None:
- stderr = req.stderr
- else:
- stderr = TeeOutputStream((sys.stderr, req.stderr))
- environ['wsgi.errors'] = stderr
- environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
- thread_available and self.multithreaded
- # Rationale for the following: If started by the web server
- # (self._bindAddress is None) in either FastCGI or CGI mode, the
- # possibility of being spawned multiple times simultaneously is quite
- # real. And, if started as an external server, multiple copies may be
- # spawned for load-balancing/redundancy. (Though I don't think
- # mod_fastcgi supports this?)
- environ['wsgi.multiprocess'] = True
- environ['wsgi.run_once'] = isinstance(req, CGIRequest)
-
- if environ.get('HTTPS', 'off') in ('on', '1'):
- environ['wsgi.url_scheme'] = 'https'
- else:
- environ['wsgi.url_scheme'] = 'http'
-
- self._sanitizeEnv(environ)
-
- headers_set = []
- headers_sent = []
- result = None
-
- def write(data):
- assert type(data) is str, 'write() argument must be string'
- assert headers_set, 'write() before start_response()'
-
- if not headers_sent:
- status, responseHeaders = headers_sent[:] = headers_set
- found = False
- for header,value in responseHeaders:
- if header.lower() == 'content-length':
- found = True
- break
- if not found and result is not None:
- try:
- if len(result) == 1:
- responseHeaders.append(('Content-Length',
- str(len(data))))
- except:
- pass
- s = 'Status: %s\r\n' % status
- for header in responseHeaders:
- s += '%s: %s\r\n' % header
- s += '\r\n'
- req.stdout.write(s)
-
- req.stdout.write(data)
- req.stdout.flush()
-
- def start_response(status, response_headers, exc_info=None):
- if exc_info:
- try:
- if headers_sent:
- # Re-raise if too late
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # avoid dangling circular ref
- else:
- assert not headers_set, 'Headers already set!'
-
- assert type(status) is str, 'Status must be a string'
- assert len(status) >= 4, 'Status must be at least 4 characters'
- assert int(status[:3]), 'Status must begin with 3-digit code'
- assert status[3] == ' ', 'Status must have a space after code'
- assert type(response_headers) is list, 'Headers must be a list'
- if __debug__:
- for name,val in response_headers:
- assert type(name) is str, 'Header names must be strings'
- assert type(val) is str, 'Header values must be strings'
-
- headers_set[:] = [status, response_headers]
- return write
-
- if not self.multithreaded:
- self._app_lock.acquire()
- try:
- result = self.application(environ, start_response)
- try:
- for data in result:
- if data:
- write(data)
- if not headers_sent:
- write('') # in case body was empty
- finally:
- if hasattr(result, 'close'):
- result.close()
- finally:
- if not self.multithreaded:
- self._app_lock.release()
-
- return FCGI_REQUEST_COMPLETE, 0
-
- def _sanitizeEnv(self, environ):
- """Ensure certain values are present, if required by WSGI."""
- if not environ.has_key('SCRIPT_NAME'):
- environ['SCRIPT_NAME'] = ''
- if not environ.has_key('PATH_INFO'):
- environ['PATH_INFO'] = ''
+ return ret
- # If any of these are missing, it probably signifies a broken
- # server...
- for name,default in [('REQUEST_METHOD', 'GET'),
- ('SERVER_NAME', 'localhost'),
- ('SERVER_PORT', '80'),
- ('SERVER_PROTOCOL', 'HTTP/1.0')]:
- if not environ.has_key(name):
- environ['wsgi.errors'].write('%s: missing FastCGI param %s '
- 'required by WSGI!\n' %
- (self.__class__.__name__, name))
- environ[name] = default
-
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
diff --git a/flup/server/fcgi_base.py b/flup/server/fcgi_base.py
new file mode 100644
index 0000000..33e13d3
--- /dev/null
+++ b/flup/server/fcgi_base.py
@@ -0,0 +1,1128 @@
+# Copyright (c) 2002, 2003, 2005 Allan Saddi <allan@saddi.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+# $Id$
+
+__author__ = 'Allan Saddi <allan@saddi.com>'
+__version__ = '$Revision$'
+
+import sys
+import os
+import signal
+import struct
+import cStringIO as StringIO
+import select
+import socket
+import errno
+import traceback
+
+try:
+ import thread
+ import threading
+ thread_available = True
+except ImportError:
+ import dummy_thread as thread
+ import dummy_threading as threading
+ thread_available = False
+
+__all__ = ['BaseFCGIServer']
+
+# Constants from the spec.
+FCGI_LISTENSOCK_FILENO = 0
+
+FCGI_HEADER_LEN = 8
+
+FCGI_VERSION_1 = 1
+
+FCGI_BEGIN_REQUEST = 1
+FCGI_ABORT_REQUEST = 2
+FCGI_END_REQUEST = 3
+FCGI_PARAMS = 4
+FCGI_STDIN = 5
+FCGI_STDOUT = 6
+FCGI_STDERR = 7
+FCGI_DATA = 8
+FCGI_GET_VALUES = 9
+FCGI_GET_VALUES_RESULT = 10
+FCGI_UNKNOWN_TYPE = 11
+FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
+
+FCGI_NULL_REQUEST_ID = 0
+
+FCGI_KEEP_CONN = 1
+
+FCGI_RESPONDER = 1
+FCGI_AUTHORIZER = 2
+FCGI_FILTER = 3
+
+FCGI_REQUEST_COMPLETE = 0
+FCGI_CANT_MPX_CONN = 1
+FCGI_OVERLOADED = 2
+FCGI_UNKNOWN_ROLE = 3
+
+FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
+FCGI_MAX_REQS = 'FCGI_MAX_REQS'
+FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
+
+FCGI_Header = '!BBHHBx'
+FCGI_BeginRequestBody = '!HB5x'
+FCGI_EndRequestBody = '!LB3x'
+FCGI_UnknownTypeBody = '!B7x'
+
+FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
+FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
+
+if __debug__:
+ import time
+
+ # Set non-zero to write debug output to a file.
+ DEBUG = 0
+ DEBUGLOG = '/tmp/fcgi.log'
+
+ def _debug(level, msg):
+ if DEBUG < level:
+ return
+
+ try:
+ f = open(DEBUGLOG, 'a')
+ f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
+ f.close()
+ except:
+ pass
+
+class InputStream(object):
+ """
+ File-like object representing FastCGI input streams (FCGI_STDIN and
+ FCGI_DATA). Supports the minimum methods required by WSGI spec.
+ """
+ def __init__(self, conn):
+ self._conn = conn
+
+ # See Server.
+ self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
+
+ self._buf = ''
+ self._bufList = []
+ self._pos = 0 # Current read position.
+ self._avail = 0 # Number of bytes currently available.
+
+ self._eof = False # True when server has sent EOF notification.
+
+ def _shrinkBuffer(self):
+ """Gets rid of already read data (since we can't rewind)."""
+ if self._pos >= self._shrinkThreshold:
+ self._buf = self._buf[self._pos:]
+ self._avail -= self._pos
+ self._pos = 0
+
+ assert self._avail >= 0
+
+ def _waitForData(self):
+ """Waits for more data to become available."""
+ self._conn.process_input()
+
+ def read(self, n=-1):
+ if self._pos == self._avail and self._eof:
+ return ''
+ while True:
+ if n < 0 or (self._avail - self._pos) < n:
+ # Not enough data available.
+ if self._eof:
+ # And there's no more coming.
+ newPos = self._avail
+ break
+ else:
+ # Wait for more data.
+ self._waitForData()
+ continue
+ else:
+ newPos = self._pos + n
+ break
+ # Merge buffer list, if necessary.
+ if self._bufList:
+ self._buf += ''.join(self._bufList)
+ self._bufList = []
+ r = self._buf[self._pos:newPos]
+ self._pos = newPos
+ self._shrinkBuffer()
+ return r
+
+ def readline(self, length=None):
+ if self._pos == self._avail and self._eof:
+ return ''
+ while True:
+ # Unfortunately, we need to merge the buffer list early.
+ if self._bufList:
+ self._buf += ''.join(self._bufList)
+ self._bufList = []
+ # Find newline.
+ i = self._buf.find('\n', self._pos)
+ if i < 0:
+ # Not found?
+ if self._eof:
+ # No more data coming.
+ newPos = self._avail
+ break
+ else:
+ # Wait for more to come.
+ self._waitForData()
+ continue
+ else:
+ newPos = i + 1
+ break
+ if length is not None:
+ if self._pos + length < newPos:
+ newPos = self._pos + length
+ r = self._buf[self._pos:newPos]
+ self._pos = newPos
+ self._shrinkBuffer()
+ return r
+
+ def readlines(self, sizehint=0):
+ total = 0
+ lines = []
+ line = self.readline()
+ while line:
+ lines.append(line)
+ total += len(line)
+ if 0 < sizehint <= total:
+ break
+ line = self.readline()
+ return lines
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ r = self.readline()
+ if not r:
+ raise StopIteration
+ return r
+
+ def add_data(self, data):
+ if not data:
+ self._eof = True
+ else:
+ self._bufList.append(data)
+ self._avail += len(data)
+
+class MultiplexedInputStream(InputStream):
+ """
+ A version of InputStream meant to be used with MultiplexedConnections.
+ Assumes the MultiplexedConnection (the producer) and the Request
+ (the consumer) are running in different threads.
+ """
+ def __init__(self, conn):
+ super(MultiplexedInputStream, self).__init__(conn)
+
+ # Arbitrates access to this InputStream (it's used simultaneously
+ # by a Request and its owning Connection object).
+ lock = threading.RLock()
+
+ # Notifies Request thread that there is new data available.
+ self._lock = threading.Condition(lock)
+
+ def _waitForData(self):
+ # Wait for notification from add_data().
+ self._lock.wait()
+
+ def read(self, n=-1):
+ self._lock.acquire()
+ try:
+ return super(MultiplexedInputStream, self).read(n)
+ finally:
+ self._lock.release()
+
+ def readline(self, length=None):
+ self._lock.acquire()
+ try:
+ return super(MultiplexedInputStream, self).readline(length)
+ finally:
+ self._lock.release()
+
+ def add_data(self, data):
+ self._lock.acquire()
+ try:
+ super(MultiplexedInputStream, self).add_data(data)
+ self._lock.notify()
+ finally:
+ self._lock.release()
+
+class OutputStream(object):
+ """
+ FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
+ write() or writelines() immediately result in Records being sent back
+ to the server. Buffering should be done in a higher level!
+ """
+ def __init__(self, conn, req, type, buffered=False):
+ self._conn = conn
+ self._req = req
+ self._type = type
+ self._buffered = buffered
+ self._bufList = [] # Used if buffered is True
+ self.dataWritten = False
+ self.closed = False
+
+ def _write(self, data):
+ length = len(data)
+ while length:
+ toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN)
+
+ rec = Record(self._type, self._req.requestId)
+ rec.contentLength = toWrite
+ rec.contentData = data[:toWrite]
+ self._conn.writeRecord(rec)
+
+ data = data[toWrite:]
+ length -= toWrite
+
+ def write(self, data):
+ assert not self.closed
+
+ if not data:
+ return
+
+ self.dataWritten = True
+
+ if self._buffered:
+ self._bufList.append(data)
+ else:
+ self._write(data)
+
+ def writelines(self, lines):
+ assert not self.closed
+
+ for line in lines:
+ self.write(line)
+
+ def flush(self):
+ # Only need to flush if this OutputStream is actually buffered.
+ if self._buffered:
+ data = ''.join(self._bufList)
+ self._bufList = []
+ self._write(data)
+
+ # Though available, the following should NOT be called by WSGI apps.
+ def close(self):
+ """Sends end-of-stream notification, if necessary."""
+ if not self.closed and self.dataWritten:
+ self.flush()
+ rec = Record(self._type, self._req.requestId)
+ self._conn.writeRecord(rec)
+ self.closed = True
+
+class TeeOutputStream(object):
+ """
+ Simple wrapper around two or more output file-like objects that copies
+ written data to all streams.
+ """
+ def __init__(self, streamList):
+ self._streamList = streamList
+
+ def write(self, data):
+ for f in self._streamList:
+ f.write(data)
+
+ def writelines(self, lines):
+ for line in lines:
+ self.write(line)
+
+ def flush(self):
+ for f in self._streamList:
+ f.flush()
+
+class StdoutWrapper(object):
+ """
+ Wrapper for sys.stdout so we know if data has actually been written.
+ """
+ def __init__(self, stdout):
+ self._file = stdout
+ self.dataWritten = False
+
+ def write(self, data):
+ if data:
+ self.dataWritten = True
+ self._file.write(data)
+
+ def writelines(self, lines):
+ for line in lines:
+ self.write(line)
+
+ def __getattr__(self, name):
+ return getattr(self._file, name)
+
+def decode_pair(s, pos=0):
+ """
+ Decodes a name/value pair.
+
+ The number of bytes decoded as well as the name/value pair
+ are returned.
+ """
+ nameLength = ord(s[pos])
+ if nameLength & 128:
+ nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
+ pos += 4
+ else:
+ pos += 1
+
+ valueLength = ord(s[pos])
+ if valueLength & 128:
+ valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
+ pos += 4
+ else:
+ pos += 1
+
+ name = s[pos:pos+nameLength]
+ pos += nameLength
+ value = s[pos:pos+valueLength]
+ pos += valueLength
+
+ return (pos, (name, value))
+
+def encode_pair(name, value):
+ """
+ Encodes a name/value pair.
+
+ The encoded string is returned.
+ """
+ nameLength = len(name)
+ if nameLength < 128:
+ s = chr(nameLength)
+ else:
+ s = struct.pack('!L', nameLength | 0x80000000L)
+
+ valueLength = len(value)
+ if valueLength < 128:
+ s += chr(valueLength)
+ else:
+ s += struct.pack('!L', valueLength | 0x80000000L)
+
+ return s + name + value
+
+class Record(object):
+ """
+ A FastCGI Record.
+
+ Used for encoding/decoding records.
+ """
+ def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
+ self.version = FCGI_VERSION_1
+ self.type = type
+ self.requestId = requestId
+ self.contentLength = 0
+ self.paddingLength = 0
+ self.contentData = ''
+
+ def _recvall(sock, length):
+ """
+ Attempts to receive length bytes from a socket, blocking if necessary.
+ (Socket may be blocking or non-blocking.)
+ """
+ dataList = []
+ recvLen = 0
+ while length:
+ try:
+ data = sock.recv(length)
+ except socket.error, e:
+ if e[0] == errno.EAGAIN:
+ select.select([sock], [], [])
+ continue
+ else:
+ raise
+ if not data: # EOF
+ break
+ dataList.append(data)
+ dataLen = len(data)
+ recvLen += dataLen
+ length -= dataLen
+ return ''.join(dataList), recvLen
+ _recvall = staticmethod(_recvall)
+
+ def read(self, sock):
+ """Read and decode a Record from a socket."""
+ try:
+ header, length = self._recvall(sock, FCGI_HEADER_LEN)
+ except:
+ raise EOFError
+
+ if length < FCGI_HEADER_LEN:
+ raise EOFError
+
+ self.version, self.type, self.requestId, self.contentLength, \
+ self.paddingLength = struct.unpack(FCGI_Header, header)
+
+ if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
+ 'contentLength = %d' %
+ (sock.fileno(), self.type, self.requestId,
+ self.contentLength))
+
+ if self.contentLength:
+ try:
+ self.contentData, length = self._recvall(sock,
+ self.contentLength)
+ except:
+ raise EOFError
+
+ if length < self.contentLength:
+ raise EOFError
+
+ if self.paddingLength:
+ try:
+ self._recvall(sock, self.paddingLength)
+ except:
+ raise EOFError
+
+ def _sendall(sock, data):
+ """
+ Writes data to a socket and does not return until all the data is sent.
+ """
+ length = len(data)
+ while length:
+ try:
+ sent = sock.send(data)
+ except socket.error, e:
+ if e[0] == errno.EPIPE:
+ return # Don't bother raising an exception. Just ignore.
+ elif e[0] == errno.EAGAIN:
+ select.select([], [sock], [])
+ continue
+ else:
+ raise
+ data = data[sent:]
+ length -= sent
+ _sendall = staticmethod(_sendall)
+
+ def write(self, sock):
+ """Encode and write a Record to a socket."""
+ self.paddingLength = -self.contentLength & 7
+
+ if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
+ 'contentLength = %d' %
+ (sock.fileno(), self.type, self.requestId,
+ self.contentLength))
+
+ header = struct.pack(FCGI_Header, self.version, self.type,
+ self.requestId, self.contentLength,
+ self.paddingLength)
+ self._sendall(sock, header)
+ if self.contentLength:
+ self._sendall(sock, self.contentData)
+ if self.paddingLength:
+ self._sendall(sock, '\x00'*self.paddingLength)
+
+class Request(object):
+ """
+ Represents a single FastCGI request.
+
+ These objects are passed to your handler and is the main interface
+ between your handler and the fcgi module. The methods should not
+ be called by your handler. However, server, params, stdin, stdout,
+ stderr, and data are free for your handler's use.
+ """
+ def __init__(self, conn, inputStreamClass):
+ self._conn = conn
+
+ self.server = conn.server
+ self.params = {}
+ self.stdin = inputStreamClass(conn)
+ self.stdout = OutputStream(conn, self, FCGI_STDOUT)
+ self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
+ self.data = inputStreamClass(conn)
+
+ def run(self):
+ """Runs the handler, flushes the streams, and ends the request."""
+ try:
+ protocolStatus, appStatus = self.server.handler(self)
+ except:
+ traceback.print_exc(file=self.stderr)
+ self.stderr.flush()
+ if not self.stdout.dataWritten:
+ self.server.error(self)
+
+ protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
+
+ if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
+ (protocolStatus, appStatus))
+
+ self._flush()
+ self._end(appStatus, protocolStatus)
+
+ def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
+ self._conn.end_request(self, appStatus, protocolStatus)
+
+ def _flush(self):
+ self.stdout.close()
+ self.stderr.close()
+
+class CGIRequest(Request):
+    """A normal CGI request disguised as a FastCGI request."""
+    def __init__(self, server):
+        # These are normally filled in by Connection.
+        self.requestId = 1
+        self.role = FCGI_RESPONDER
+        self.flags = 0
+        self.aborted = False
+
+        self.server = server
+        # Plain CGI: environment and standard streams come from the process
+        # itself rather than from FastCGI records.
+        self.params = dict(os.environ)
+        self.stdin = sys.stdin
+        self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
+        self.stderr = sys.stderr
+        self.data = StringIO.StringIO()  # no FCGI_DATA stream in CGI mode
+
+    def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
+        # A CGI process serves exactly one request, so just exit.
+        sys.exit(appStatus)
+
+    def _flush(self):
+        # Not buffered, do nothing.
+        pass
+
+class Connection(object):
+ """
+ A Connection with the web server.
+
+ Each Connection is associated with a single socket (which is
+ connected to the web server) and is responsible for handling all
+ the FastCGI message processing for that socket.
+ """
+ _multiplexed = False
+ _inputStreamClass = InputStream
+
+ def __init__(self, sock, addr, server):
+ self._sock = sock
+ self._addr = addr
+ self.server = server
+
+ # Active Requests for this Connection, mapped by request ID.
+ self._requests = {}
+
+ def _cleanupSocket(self):
+ """Close the Connection's socket."""
+ self._sock.close()
+
+ def run(self):
+ """Begin processing data from the socket."""
+ self._keepGoing = True
+ while self._keepGoing:
+ try:
+ self.process_input()
+ except (EOFError, KeyboardInterrupt):
+ break
+ except (select.error, socket.error), e:
+ if e[0] == errno.EBADF: # Socket was closed by Request.
+ break
+ raise
+
+ self._cleanupSocket()
+
+ def process_input(self):
+ """Attempt to read a single Record from the socket and process it."""
+ # Currently, any children Request threads notify this Connection
+ # that it is no longer needed by closing the Connection's socket.
+ # We need to put a timeout on select, otherwise we might get
+ # stuck in it indefinitely... (I don't like this solution.)
+ while self._keepGoing:
+ try:
+ r, w, e = select.select([self._sock], [], [], 1.0)
+ except ValueError:
+ # Sigh. ValueError gets thrown sometimes when passing select
+ # a closed socket.
+ raise EOFError
+ if r: break
+ if not self._keepGoing:
+ return
+ rec = Record()
+ rec.read(self._sock)
+
+ if rec.type == FCGI_GET_VALUES:
+ self._do_get_values(rec)
+ elif rec.type == FCGI_BEGIN_REQUEST:
+ self._do_begin_request(rec)
+ elif rec.type == FCGI_ABORT_REQUEST:
+ self._do_abort_request(rec)
+ elif rec.type == FCGI_PARAMS:
+ self._do_params(rec)
+ elif rec.type == FCGI_STDIN:
+ self._do_stdin(rec)
+ elif rec.type == FCGI_DATA:
+ self._do_data(rec)
+ elif rec.requestId == FCGI_NULL_REQUEST_ID:
+ self._do_unknown_type(rec)
+ else:
+ # Need to complain about this.
+ pass
+
+ def writeRecord(self, rec):
+ """
+ Write a Record to the socket.
+ """
+ rec.write(self._sock)
+
+ def end_request(self, req, appStatus=0L,
+ protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
+ """
+ End a Request.
+
+ Called by Request objects. An FCGI_END_REQUEST Record is
+ sent to the web server. If the web server no longer requires
+ the connection, the socket is closed, thereby ending this
+ Connection (run() returns).
+ """
+ rec = Record(FCGI_END_REQUEST, req.requestId)
+ rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
+ protocolStatus)
+ rec.contentLength = FCGI_EndRequestBody_LEN
+ self.writeRecord(rec)
+
+ if remove:
+ del self._requests[req.requestId]
+
+ if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)
+
+ if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
+ self._sock.close()
+ self._keepGoing = False
+
+ def _do_get_values(self, inrec):
+ """Handle an FCGI_GET_VALUES request from the web server."""
+ outrec = Record(FCGI_GET_VALUES_RESULT)
+
+ pos = 0
+ while pos < inrec.contentLength:
+ pos, (name, value) = decode_pair(inrec.contentData, pos)
+ cap = self.server.capability.get(name)
+ if cap is not None:
+ outrec.contentData += encode_pair(name, str(cap))
+
+ outrec.contentLength = len(outrec.contentData)
+ self.writeRecord(rec)
+
+ def _do_begin_request(self, inrec):
+ """Handle an FCGI_BEGIN_REQUEST from the web server."""
+ role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
+
+ req = self.server.request_class(self, self._inputStreamClass)
+ req.requestId, req.role, req.flags = inrec.requestId, role, flags
+ req.aborted = False
+
+ if not self._multiplexed and self._requests:
+ # Can't multiplex requests.
+ self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False)
+ else:
+ self._requests[inrec.requestId] = req
+
+ def _do_abort_request(self, inrec):
+ """
+ Handle an FCGI_ABORT_REQUEST from the web server.
+
+ We just mark a flag in the associated Request.
+ """
+ req = self._requests.get(inrec.requestId)
+ if req is not None:
+ req.aborted = True
+
+ def _start_request(self, req):
+ """Run the request."""
+ # Not multiplexed, so run it inline.
+ req.run()
+
+ def _do_params(self, inrec):
+ """
+ Handle an FCGI_PARAMS Record.
+
+ If the last FCGI_PARAMS Record is received, start the request.
+ """
+ req = self._requests.get(inrec.requestId)
+ if req is not None:
+ if inrec.contentLength:
+ pos = 0
+ while pos < inrec.contentLength:
+ pos, (name, value) = decode_pair(inrec.contentData, pos)
+ req.params[name] = value
+ else:
+ self._start_request(req)
+
+ def _do_stdin(self, inrec):
+ """Handle the FCGI_STDIN stream."""
+ req = self._requests.get(inrec.requestId)
+ if req is not None:
+ req.stdin.add_data(inrec.contentData)
+
+ def _do_data(self, inrec):
+ """Handle the FCGI_DATA stream."""
+ req = self._requests.get(inrec.requestId)
+ if req is not None:
+ req.data.add_data(inrec.contentData)
+
+ def _do_unknown_type(self, inrec):
+ """Handle an unknown request type. Respond accordingly."""
+ outrec = Record(FCGI_UNKNOWN_TYPE)
+ outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
+ outrec.contentLength = FCGI_UnknownTypeBody_LEN
+ self.writeRecord(rec)
+
+class MultiplexedConnection(Connection):
+    """
+    A version of Connection capable of handling multiple requests
+    simultaneously.
+    """
+    _multiplexed = True
+    _inputStreamClass = MultiplexedInputStream
+
+    def __init__(self, sock, addr, server):
+        super(MultiplexedConnection, self).__init__(sock, addr, server)
+
+        # Used to arbitrate access to self._requests.
+        lock = threading.RLock()
+
+        # Notification is posted everytime a request completes, allowing us
+        # to quit cleanly.
+        self._lock = threading.Condition(lock)
+
+    def _cleanupSocket(self):
+        # Wait for any outstanding requests before closing the socket.
+        self._lock.acquire()
+        while self._requests:
+            self._lock.wait()
+        self._lock.release()
+
+        super(MultiplexedConnection, self)._cleanupSocket()
+
+    def writeRecord(self, rec):
+        """Write a Record to the socket (thread-safe)."""
+        # Must use locking to prevent intermingling of Records from different
+        # threads.
+        self._lock.acquire()
+        try:
+            # Probably faster than calling super. ;)
+            rec.write(self._sock)
+        finally:
+            self._lock.release()
+
+    def end_request(self, req, appStatus=0L,
+                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
+        """End a Request and wake any thread waiting in _cleanupSocket()."""
+        self._lock.acquire()
+        try:
+            super(MultiplexedConnection, self).end_request(req, appStatus,
+                                                           protocolStatus,
+                                                           remove)
+            self._lock.notify()
+        finally:
+            self._lock.release()
+
+    def _do_begin_request(self, inrec):
+        # Serialize mutation of self._requests.
+        self._lock.acquire()
+        try:
+            super(MultiplexedConnection, self)._do_begin_request(inrec)
+        finally:
+            self._lock.release()
+
+    def _do_abort_request(self, inrec):
+        self._lock.acquire()
+        try:
+            super(MultiplexedConnection, self)._do_abort_request(inrec)
+        finally:
+            self._lock.release()
+
+    def _start_request(self, req):
+        # Multiplexed: each request runs in its own thread.
+        thread.start_new_thread(req.run, ())
+
+    def _do_params(self, inrec):
+        self._lock.acquire()
+        try:
+            super(MultiplexedConnection, self)._do_params(inrec)
+        finally:
+            self._lock.release()
+
+    def _do_stdin(self, inrec):
+        self._lock.acquire()
+        try:
+            super(MultiplexedConnection, self)._do_stdin(inrec)
+        finally:
+            self._lock.release()
+
+    def _do_data(self, inrec):
+        self._lock.acquire()
+        try:
+            super(MultiplexedConnection, self)._do_data(inrec)
+        finally:
+            self._lock.release()
+
+class BaseFCGIServer(object):
+    """
+    Common FastCGI/WSGI server machinery: socket setup, capability
+    negotiation, and the WSGI handler shared by the threaded and
+    forking server front-ends.
+    """
+    request_class = Request
+    cgirequest_class = CGIRequest
+
+    # The maximum number of bytes (per Record) to write to the server.
+    # I've noticed mod_fastcgi has a relatively small receive buffer (8K or
+    # so).
+    maxwrite = 8192
+
+    # Limits the size of the InputStream's string buffer to this size + the
+    # server's maximum Record size. Since the InputStream is not seekable,
+    # we throw away already-read data once this certain amount has been read.
+    inputStreamShrinkThreshold = 102400 - 8192
+
+    def __init__(self, application, environ=None, multithreaded=True,
+                 bindAddress=None, multiplexed=False):
+        """
+        bindAddress, if present, must either be a string or a 2-tuple. If
+        present, run() will open its own listening socket. You would use
+        this if you wanted to run your application as an 'external' FastCGI
+        app. (i.e. the webserver would no longer be responsible for starting
+        your app) If a string, it will be interpreted as a filename and a UNIX
+        socket will be opened. If a tuple, the first element, a string,
+        is the interface name/IP to bind to, and the second element (an int)
+        is the port number.
+
+        Set multiplexed to True if you want to handle multiple requests
+        per connection. Some FastCGI backends (namely mod_fastcgi) don't
+        multiplex requests at all, so by default this is off (which saves
+        on thread creation/locking overhead). If threads aren't available,
+        this keyword is ignored; it's not possible to multiplex requests
+        at all.
+        """
+        if environ is None:
+            environ = {}
+
+        self.application = application
+        self.environ = environ
+        self.multithreaded = multithreaded
+
+        self._bindAddress = bindAddress
+
+        # Used to force single-threadedness
+        self._appLock = thread.allocate_lock()
+
+        # NOTE(review): thread_available and the thread module presumably
+        # come from this module's import section (not visible here) --
+        # confirm they are defined before relying on this branch.
+        if thread_available:
+            try:
+                import resource
+                # Attempt to glean the maximum number of connections
+                # from the OS.
+                maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
+            except ImportError:
+                maxConns = 100 # Just some made up number.
+            maxReqs = maxConns
+            if multiplexed:
+                self._connectionClass = MultiplexedConnection
+                maxReqs *= 5 # Another made up number.
+            else:
+                self._connectionClass = Connection
+            self.capability = {
+                FCGI_MAX_CONNS: maxConns,
+                FCGI_MAX_REQS: maxReqs,
+                FCGI_MPXS_CONNS: multiplexed and 1 or 0
+                }
+        else:
+            self._connectionClass = Connection
+            self.capability = {
+                # If threads aren't available, these are pretty much correct.
+                FCGI_MAX_CONNS: 1,
+                FCGI_MAX_REQS: 1,
+                FCGI_MPXS_CONNS: 0
+                }
+
+    def _setupSocket(self):
+        """
+        Return the listening socket: either the one inherited from the
+        web server (fd 0, per the FastCGI spec), or a freshly bound
+        UNIX/INET socket when bindAddress was given. May not return at
+        all: in CGI context the request is served inline and we exit.
+        """
+        if self._bindAddress is None: # Run as a normal FastCGI?
+            isFCGI = True
+
+            sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
+                                 socket.SOCK_STREAM)
+            try:
+                sock.getpeername()
+            except socket.error, e:
+                if e[0] == errno.ENOTSOCK:
+                    # Not a socket, assume CGI context.
+                    isFCGI = False
+                elif e[0] != errno.ENOTCONN:
+                    raise
+
+            # FastCGI/CGI discrimination is broken on Mac OS X.
+            # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
+            # if you want to run your app as a simple CGI. (You can do
+            # this with Apache's mod_env [not loaded by default in OS X
+            # client, ha ha] and the SetEnv directive.)
+            if not isFCGI or \
+               os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'):
+                req = self.cgirequest_class(self)
+                req.run()
+                sys.exit(0)
+        else:
+            # Run as a server
+            if type(self._bindAddress) is str:
+                # Unix socket
+                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+                try:
+                    # Remove a stale socket file from a previous run.
+                    os.unlink(self._bindAddress)
+                except OSError:
+                    pass
+            else:
+                # INET socket
+                assert type(self._bindAddress) is tuple
+                assert len(self._bindAddress) == 2
+                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+            sock.bind(self._bindAddress)
+            sock.listen(socket.SOMAXCONN)
+
+        return sock
+
+    def _cleanupSocket(self, sock):
+        """Closes the main socket."""
+        sock.close()
+
+    def handler(self, req):
+        """Special handler for WSGI."""
+        if req.role != FCGI_RESPONDER:
+            return FCGI_UNKNOWN_ROLE, 0
+
+        # Mostly taken from example CGI gateway.
+        environ = req.params
+        environ.update(self.environ)
+
+        environ['wsgi.version'] = (1,0)
+        environ['wsgi.input'] = req.stdin
+        if self._bindAddress is None:
+            stderr = req.stderr
+        else:
+            # External server: mirror errors to the console as well.
+            stderr = TeeOutputStream((sys.stderr, req.stderr))
+        environ['wsgi.errors'] = stderr
+        environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
+                                      thread_available and self.multithreaded
+        # Rationale for the following: If started by the web server
+        # (self._bindAddress is None) in either FastCGI or CGI mode, the
+        # possibility of being spawned multiple times simultaneously is quite
+        # real. And, if started as an external server, multiple copies may be
+        # spawned for load-balancing/redundancy. (Though I don't think
+        # mod_fastcgi supports this?)
+        environ['wsgi.multiprocess'] = True
+        environ['wsgi.run_once'] = isinstance(req, CGIRequest)
+
+        if environ.get('HTTPS', 'off') in ('on', '1'):
+            environ['wsgi.url_scheme'] = 'https'
+        else:
+            environ['wsgi.url_scheme'] = 'http'
+
+        self._sanitizeEnv(environ)
+
+        headers_set = []
+        headers_sent = []
+        result = None
+
+        def write(data):
+            assert type(data) is str, 'write() argument must be string'
+            assert headers_set, 'write() before start_response()'
+
+            if not headers_sent:
+                status, responseHeaders = headers_sent[:] = headers_set
+                found = False
+                for header,value in responseHeaders:
+                    if header.lower() == 'content-length':
+                        found = True
+                        break
+                if not found and result is not None:
+                    try:
+                        # Single-chunk response: this write is the whole
+                        # body, so its length is the Content-Length.
+                        if len(result) == 1:
+                            responseHeaders.append(('Content-Length',
+                                                    str(len(data))))
+                    except:
+                        pass
+                s = 'Status: %s\r\n' % status
+                for header in responseHeaders:
+                    s += '%s: %s\r\n' % header
+                s += '\r\n'
+                req.stdout.write(s)
+
+            req.stdout.write(data)
+            req.stdout.flush()
+
+        def start_response(status, response_headers, exc_info=None):
+            if exc_info:
+                try:
+                    if headers_sent:
+                        # Re-raise if too late
+                        raise exc_info[0], exc_info[1], exc_info[2]
+                finally:
+                    exc_info = None # avoid dangling circular ref
+            else:
+                assert not headers_set, 'Headers already set!'
+
+            assert type(status) is str, 'Status must be a string'
+            assert len(status) >= 4, 'Status must be at least 4 characters'
+            assert int(status[:3]), 'Status must begin with 3-digit code'
+            assert status[3] == ' ', 'Status must have a space after code'
+            assert type(response_headers) is list, 'Headers must be a list'
+            if __debug__:
+                for name,val in response_headers:
+                    assert type(name) is str, 'Header names must be strings'
+                    assert type(val) is str, 'Header values must be strings'
+
+            headers_set[:] = [status, response_headers]
+            return write
+
+        if not self.multithreaded:
+            self._appLock.acquire()
+        try:
+            result = self.application(environ, start_response)
+            try:
+                for data in result:
+                    if data:
+                        write(data)
+                if not headers_sent:
+                    write('') # in case body was empty
+            finally:
+                if hasattr(result, 'close'):
+                    result.close()
+        finally:
+            if not self.multithreaded:
+                self._appLock.release()
+
+        return FCGI_REQUEST_COMPLETE, 0
+
+    def _sanitizeEnv(self, environ):
+        """Ensure certain values are present, if required by WSGI."""
+        if not environ.has_key('SCRIPT_NAME'):
+            environ['SCRIPT_NAME'] = ''
+        if not environ.has_key('PATH_INFO'):
+            environ['PATH_INFO'] = ''
+
+        # If any of these are missing, it probably signifies a broken
+        # server...
+        for name,default in [('REQUEST_METHOD', 'GET'),
+                             ('SERVER_NAME', 'localhost'),
+                             ('SERVER_PORT', '80'),
+                             ('SERVER_PROTOCOL', 'HTTP/1.0')]:
+            if not environ.has_key(name):
+                environ['wsgi.errors'].write('%s: missing FastCGI param %s '
+                                             'required by WSGI!\n' %
+                                             (self.__class__.__name__, name))
+                environ[name] = default
+
+    def error(self, req):
+        """
+        Called by Request if an exception occurs within the handler. May and
+        should be overridden.
+        """
+        import cgitb
+        req.stdout.write('Content-Type: text/html\r\n\r\n' +
+                         cgitb.html(sys.exc_info()))
diff --git a/flup/server/fcgi_fork.py b/flup/server/fcgi_fork.py
index 2d3f1b2..ccbba6b 100644
--- a/flup/server/fcgi_fork.py
+++ b/flup/server/fcgi_fork.py
@@ -49,863 +49,26 @@ variable FCGI_FORCE_CGI to "Y" or "y".
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
-import sys
import os
-import signal
-import struct
-import cStringIO as StringIO
-import select
-import socket
-import errno
-import traceback
-import prefork
-__all__ = ['WSGIServer']
-
-# Constants from the spec.
-FCGI_LISTENSOCK_FILENO = 0
-
-FCGI_HEADER_LEN = 8
-
-FCGI_VERSION_1 = 1
-
-FCGI_BEGIN_REQUEST = 1
-FCGI_ABORT_REQUEST = 2
-FCGI_END_REQUEST = 3
-FCGI_PARAMS = 4
-FCGI_STDIN = 5
-FCGI_STDOUT = 6
-FCGI_STDERR = 7
-FCGI_DATA = 8
-FCGI_GET_VALUES = 9
-FCGI_GET_VALUES_RESULT = 10
-FCGI_UNKNOWN_TYPE = 11
-FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
-
-FCGI_NULL_REQUEST_ID = 0
-
-FCGI_KEEP_CONN = 1
-
-FCGI_RESPONDER = 1
-FCGI_AUTHORIZER = 2
-FCGI_FILTER = 3
-
-FCGI_REQUEST_COMPLETE = 0
-FCGI_CANT_MPX_CONN = 1
-FCGI_OVERLOADED = 2
-FCGI_UNKNOWN_ROLE = 3
-
-FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
-FCGI_MAX_REQS = 'FCGI_MAX_REQS'
-FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
-
-FCGI_Header = '!BBHHBx'
-FCGI_BeginRequestBody = '!HB5x'
-FCGI_EndRequestBody = '!LB3x'
-FCGI_UnknownTypeBody = '!B7x'
-
-FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
-FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
-
-if __debug__:
- import time
-
- # Set non-zero to write debug output to a file.
- DEBUG = 0
- DEBUGLOG = '/tmp/fcgi.log'
-
- def _debug(level, msg):
- if DEBUG < level:
- return
-
- try:
- f = open(DEBUGLOG, 'a')
- f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
- f.close()
- except:
- pass
-
-class InputStream(object):
- """
- File-like object representing FastCGI input streams (FCGI_STDIN and
- FCGI_DATA). Supports the minimum methods required by WSGI spec.
- """
- def __init__(self, conn):
- self._conn = conn
-
- # See Server.
- self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
-
- self._buf = ''
- self._bufList = []
- self._pos = 0 # Current read position.
- self._avail = 0 # Number of bytes currently available.
-
- self._eof = False # True when server has sent EOF notification.
-
- def _shrinkBuffer(self):
- """Gets rid of already read data (since we can't rewind)."""
- if self._pos >= self._shrinkThreshold:
- self._buf = self._buf[self._pos:]
- self._avail -= self._pos
- self._pos = 0
-
- assert self._avail >= 0
-
- def _waitForData(self):
- """Waits for more data to become available."""
- self._conn.process_input()
-
- def read(self, n=-1):
- if self._pos == self._avail and self._eof:
- return ''
- while True:
- if n < 0 or (self._avail - self._pos) < n:
- # Not enough data available.
- if self._eof:
- # And there's no more coming.
- newPos = self._avail
- break
- else:
- # Wait for more data.
- self._waitForData()
- continue
- else:
- newPos = self._pos + n
- break
- # Merge buffer list, if necessary.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readline(self, length=None):
- if self._pos == self._avail and self._eof:
- return ''
- while True:
- # Unfortunately, we need to merge the buffer list early.
- if self._bufList:
- self._buf += ''.join(self._bufList)
- self._bufList = []
- # Find newline.
- i = self._buf.find('\n', self._pos)
- if i < 0:
- # Not found?
- if self._eof:
- # No more data coming.
- newPos = self._avail
- break
- else:
- # Wait for more to come.
- self._waitForData()
- continue
- else:
- newPos = i + 1
- break
- if length is not None:
- if self._pos + length < newPos:
- newPos = self._pos + length
- r = self._buf[self._pos:newPos]
- self._pos = newPos
- self._shrinkBuffer()
- return r
-
- def readlines(self, sizehint=0):
- total = 0
- lines = []
- line = self.readline()
- while line:
- lines.append(line)
- total += len(line)
- if 0 < sizehint <= total:
- break
- line = self.readline()
- return lines
-
- def __iter__(self):
- return self
-
- def next(self):
- r = self.readline()
- if not r:
- raise StopIteration
- return r
-
- def add_data(self, data):
- if not data:
- self._eof = True
- else:
- self._bufList.append(data)
- self._avail += len(data)
-
-class MultiplexedInputStream(InputStream):
- """
- A version of InputStream meant to be used with MultiplexedConnections.
- Assumes the MultiplexedConnection (the producer) and the Request
- (the consumer) are running in different threads.
- """
- def __init__(self, conn):
- super(MultiplexedInputStream, self).__init__(conn)
-
- # Arbitrates access to this InputStream (it's used simultaneously
- # by a Request and its owning Connection object).
- lock = threading.RLock()
-
- # Notifies Request thread that there is new data available.
- self._lock = threading.Condition(lock)
-
- def _waitForData(self):
- # Wait for notification from add_data().
- self._lock.wait()
-
- def read(self, n=-1):
- self._lock.acquire()
- try:
- return super(MultiplexedInputStream, self).read(n)
- finally:
- self._lock.release()
-
- def readline(self, length=None):
- self._lock.acquire()
- try:
- return super(MultiplexedInputStream, self).readline(length)
- finally:
- self._lock.release()
-
- def add_data(self, data):
- self._lock.acquire()
- try:
- super(MultiplexedInputStream, self).add_data(data)
- self._lock.notify()
- finally:
- self._lock.release()
-
-class OutputStream(object):
- """
- FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
- write() or writelines() immediately result in Records being sent back
- to the server. Buffering should be done in a higher level!
- """
- def __init__(self, conn, req, type, buffered=False):
- self._conn = conn
- self._req = req
- self._type = type
- self._buffered = buffered
- self._bufList = [] # Used if buffered is True
- self.dataWritten = False
- self.closed = False
-
- def _write(self, data):
- length = len(data)
- while length:
- toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN)
-
- rec = Record(self._type, self._req.requestId)
- rec.contentLength = toWrite
- rec.contentData = data[:toWrite]
- self._conn.writeRecord(rec)
-
- data = data[toWrite:]
- length -= toWrite
-
- def write(self, data):
- assert not self.closed
-
- if not data:
- return
-
- self.dataWritten = True
-
- if self._buffered:
- self._bufList.append(data)
- else:
- self._write(data)
-
- def writelines(self, lines):
- assert not self.closed
-
- for line in lines:
- self.write(line)
-
- def flush(self):
- # Only need to flush if this OutputStream is actually buffered.
- if self._buffered:
- data = ''.join(self._bufList)
- self._bufList = []
- self._write(data)
-
- # Though available, the following should NOT be called by WSGI apps.
- def close(self):
- """Sends end-of-stream notification, if necessary."""
- if not self.closed and self.dataWritten:
- self.flush()
- rec = Record(self._type, self._req.requestId)
- self._conn.writeRecord(rec)
- self.closed = True
-
-class TeeOutputStream(object):
- """
- Simple wrapper around two or more output file-like objects that copies
- written data to all streams.
- """
- def __init__(self, streamList):
- self._streamList = streamList
-
- def write(self, data):
- for f in self._streamList:
- f.write(data)
-
- def writelines(self, lines):
- for line in lines:
- self.write(line)
-
- def flush(self):
- for f in self._streamList:
- f.flush()
-
-class StdoutWrapper(object):
- """
- Wrapper for sys.stdout so we know if data has actually been written.
- """
- def __init__(self, stdout):
- self._file = stdout
- self.dataWritten = False
-
- def write(self, data):
- if data:
- self.dataWritten = True
- self._file.write(data)
-
- def writelines(self, lines):
- for line in lines:
- self.write(line)
-
- def __getattr__(self, name):
- return getattr(self._file, name)
-
-def decode_pair(s, pos=0):
- """
- Decodes a name/value pair.
-
- The number of bytes decoded as well as the name/value pair
- are returned.
- """
- nameLength = ord(s[pos])
- if nameLength & 128:
- nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
- pos += 4
- else:
- pos += 1
-
- valueLength = ord(s[pos])
- if valueLength & 128:
- valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
- pos += 4
- else:
- pos += 1
-
- name = s[pos:pos+nameLength]
- pos += nameLength
- value = s[pos:pos+valueLength]
- pos += valueLength
-
- return (pos, (name, value))
-
-def encode_pair(name, value):
- """
- Encodes a name/value pair.
-
- The encoded string is returned.
- """
- nameLength = len(name)
- if nameLength < 128:
- s = chr(nameLength)
- else:
- s = struct.pack('!L', nameLength | 0x80000000L)
-
- valueLength = len(value)
- if valueLength < 128:
- s += chr(valueLength)
- else:
- s += struct.pack('!L', valueLength | 0x80000000L)
-
- return s + name + value
-
-class Record(object):
- """
- A FastCGI Record.
-
- Used for encoding/decoding records.
- """
- def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
- self.version = FCGI_VERSION_1
- self.type = type
- self.requestId = requestId
- self.contentLength = 0
- self.paddingLength = 0
- self.contentData = ''
-
- def _recvall(sock, length):
- """
- Attempts to receive length bytes from a socket, blocking if necessary.
- (Socket may be blocking or non-blocking.)
- """
- dataList = []
- recvLen = 0
- while length:
- try:
- data = sock.recv(length)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if not data: # EOF
- break
- dataList.append(data)
- dataLen = len(data)
- recvLen += dataLen
- length -= dataLen
- return ''.join(dataList), recvLen
- _recvall = staticmethod(_recvall)
-
- def read(self, sock):
- """Read and decode a Record from a socket."""
- try:
- header, length = self._recvall(sock, FCGI_HEADER_LEN)
- except:
- raise EOFError
-
- if length < FCGI_HEADER_LEN:
- raise EOFError
-
- self.version, self.type, self.requestId, self.contentLength, \
- self.paddingLength = struct.unpack(FCGI_Header, header)
-
- if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
- 'contentLength = %d' %
- (sock.fileno(), self.type, self.requestId,
- self.contentLength))
-
- if self.contentLength:
- try:
- self.contentData, length = self._recvall(sock,
- self.contentLength)
- except:
- raise EOFError
-
- if length < self.contentLength:
- raise EOFError
-
- if self.paddingLength:
- try:
- self._recvall(sock, self.paddingLength)
- except:
- raise EOFError
-
- def _sendall(sock, data):
- """
- Writes data to a socket and does not return until all the data is sent.
- """
- length = len(data)
- while length:
- try:
- sent = sock.send(data)
- except socket.error, e:
- if e[0] == errno.EPIPE:
- return # Don't bother raising an exception. Just ignore.
- elif e[0] == errno.EAGAIN:
- select.select([], [sock], [])
- continue
- else:
- raise
- data = data[sent:]
- length -= sent
- _sendall = staticmethod(_sendall)
-
- def write(self, sock):
- """Encode and write a Record to a socket."""
- self.paddingLength = -self.contentLength & 7
-
- if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
- 'contentLength = %d' %
- (sock.fileno(), self.type, self.requestId,
- self.contentLength))
-
- header = struct.pack(FCGI_Header, self.version, self.type,
- self.requestId, self.contentLength,
- self.paddingLength)
- self._sendall(sock, header)
- if self.contentLength:
- self._sendall(sock, self.contentData)
- if self.paddingLength:
- self._sendall(sock, '\x00'*self.paddingLength)
-
-class Request(object):
- """
- Represents a single FastCGI request.
+from fcgi_base import BaseFCGIServer
+from fcgi_base import FCGI_MAX_CONNS, FCGI_MAX_REQS, FCGI_MPXS_CONNS
+from preforkserver import PreforkServer
- These objects are passed to your handler and is the main interface
- between your handler and the fcgi module. The methods should not
- be called by your handler. However, server, params, stdin, stdout,
- stderr, and data are free for your handler's use.
- """
- def __init__(self, conn, inputStreamClass):
- self._conn = conn
-
- self.server = conn.server
- self.params = {}
- self.stdin = inputStreamClass(conn)
- self.stdout = OutputStream(conn, self, FCGI_STDOUT)
- self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
- self.data = inputStreamClass(conn)
-
- def run(self):
- """Runs the handler, flushes the streams, and ends the request."""
- try:
- protocolStatus, appStatus = self.server.handler(self)
- except:
- traceback.print_exc(file=self.stderr)
- self.stderr.flush()
- if not self.stdout.dataWritten:
- self.server.error(self)
-
- protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
-
- if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
- (protocolStatus, appStatus))
-
- self._flush()
- self._end(appStatus, protocolStatus)
-
- def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
- self._conn.end_request(self, appStatus, protocolStatus)
-
- def _flush(self):
- self.stdout.close()
- self.stderr.close()
-
-class CGIRequest(Request):
- """A normal CGI request disguised as a FastCGI request."""
- def __init__(self, server):
- # These are normally filled in by Connection.
- self.requestId = 1
- self.role = FCGI_RESPONDER
- self.flags = 0
- self.aborted = False
-
- self.server = server
- self.params = dict(os.environ)
- self.stdin = sys.stdin
- self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
- self.stderr = sys.stderr
- self.data = StringIO.StringIO()
-
- def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
- sys.exit(appStatus)
-
- def _flush(self):
- # Not buffered, do nothing.
- pass
-
-class Connection(object):
- """
- A Connection with the web server.
-
- Each Connection is associated with a single socket (which is
- connected to the web server) and is responsible for handling all
- the FastCGI message processing for that socket.
- """
- _multiplexed = False
- _inputStreamClass = InputStream
-
- def __init__(self, sock, addr, server):
- self._sock = sock
- self._addr = addr
- self.server = server
-
- # Active Requests for this Connection, mapped by request ID.
- self._requests = {}
-
- def _cleanupSocket(self):
- """Close the Connection's socket."""
- self._sock.close()
-
- def run(self):
- """Begin processing data from the socket."""
- self._keepGoing = True
- while self._keepGoing:
- try:
- self.process_input()
- except (EOFError, KeyboardInterrupt):
- break
- except (select.error, socket.error), e:
- if e[0] == errno.EBADF: # Socket was closed by Request.
- break
- raise
-
- self._cleanupSocket()
-
- def process_input(self):
- """Attempt to read a single Record from the socket and process it."""
- # Currently, any children Request threads notify this Connection
- # that it is no longer needed by closing the Connection's socket.
- # We need to put a timeout on select, otherwise we might get
- # stuck in it indefinitely... (I don't like this solution.)
- while self._keepGoing:
- try:
- r, w, e = select.select([self._sock], [], [], 1.0)
- except ValueError:
- # Sigh. ValueError gets thrown sometimes when passing select
- # a closed socket.
- raise EOFError
- if r: break
- if not self._keepGoing:
- return
- rec = Record()
- rec.read(self._sock)
-
- if rec.type == FCGI_GET_VALUES:
- self._do_get_values(rec)
- elif rec.type == FCGI_BEGIN_REQUEST:
- self._do_begin_request(rec)
- elif rec.type == FCGI_ABORT_REQUEST:
- self._do_abort_request(rec)
- elif rec.type == FCGI_PARAMS:
- self._do_params(rec)
- elif rec.type == FCGI_STDIN:
- self._do_stdin(rec)
- elif rec.type == FCGI_DATA:
- self._do_data(rec)
- elif rec.requestId == FCGI_NULL_REQUEST_ID:
- self._do_unknown_type(rec)
- else:
- # Need to complain about this.
- pass
-
- def writeRecord(self, rec):
- """
- Write a Record to the socket.
- """
- rec.write(self._sock)
-
- def end_request(self, req, appStatus=0L,
- protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
- """
- End a Request.
-
- Called by Request objects. An FCGI_END_REQUEST Record is
- sent to the web server. If the web server no longer requires
- the connection, the socket is closed, thereby ending this
- Connection (run() returns).
- """
- rec = Record(FCGI_END_REQUEST, req.requestId)
- rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
- protocolStatus)
- rec.contentLength = FCGI_EndRequestBody_LEN
- self.writeRecord(rec)
-
- if remove:
- del self._requests[req.requestId]
-
- if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)
-
- if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
- self._sock.close()
- self._keepGoing = False
-
- def _do_get_values(self, inrec):
- """Handle an FCGI_GET_VALUES request from the web server."""
- outrec = Record(FCGI_GET_VALUES_RESULT)
-
- pos = 0
- while pos < inrec.contentLength:
- pos, (name, value) = decode_pair(inrec.contentData, pos)
- cap = self.server.capability.get(name)
- if cap is not None:
- outrec.contentData += encode_pair(name, str(cap))
-
- outrec.contentLength = len(outrec.contentData)
- self.writeRecord(rec)
-
- def _do_begin_request(self, inrec):
- """Handle an FCGI_BEGIN_REQUEST from the web server."""
- role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
-
- req = self.server.request_class(self, self._inputStreamClass)
- req.requestId, req.role, req.flags = inrec.requestId, role, flags
- req.aborted = False
-
- if not self._multiplexed and self._requests:
- # Can't multiplex requests.
- self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False)
- else:
- self._requests[inrec.requestId] = req
-
- def _do_abort_request(self, inrec):
- """
- Handle an FCGI_ABORT_REQUEST from the web server.
-
- We just mark a flag in the associated Request.
- """
- req = self._requests.get(inrec.requestId)
- if req is not None:
- req.aborted = True
-
- def _start_request(self, req):
- """Run the request."""
- # Not multiplexed, so run it inline.
- req.run()
-
- def _do_params(self, inrec):
- """
- Handle an FCGI_PARAMS Record.
-
- If the last FCGI_PARAMS Record is received, start the request.
- """
- req = self._requests.get(inrec.requestId)
- if req is not None:
- if inrec.contentLength:
- pos = 0
- while pos < inrec.contentLength:
- pos, (name, value) = decode_pair(inrec.contentData, pos)
- req.params[name] = value
- else:
- self._start_request(req)
-
- def _do_stdin(self, inrec):
- """Handle the FCGI_STDIN stream."""
- req = self._requests.get(inrec.requestId)
- if req is not None:
- req.stdin.add_data(inrec.contentData)
-
- def _do_data(self, inrec):
- """Handle the FCGI_DATA stream."""
- req = self._requests.get(inrec.requestId)
- if req is not None:
- req.data.add_data(inrec.contentData)
-
- def _do_unknown_type(self, inrec):
- """Handle an unknown request type. Respond accordingly."""
- outrec = Record(FCGI_UNKNOWN_TYPE)
- outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
- outrec.contentLength = FCGI_UnknownTypeBody_LEN
- self.writeRecord(rec)
-
-class MultiplexedConnection(Connection):
- """
- A version of Connection capable of handling multiple requests
- simultaneously.
- """
- _multiplexed = True
- _inputStreamClass = MultiplexedInputStream
-
- def __init__(self, sock, addr, server):
- super(MultiplexedConnection, self).__init__(sock, addr, server)
-
- # Used to arbitrate access to self._requests.
- lock = threading.RLock()
-
- # Notification is posted everytime a request completes, allowing us
- # to quit cleanly.
- self._lock = threading.Condition(lock)
-
- def _cleanupSocket(self):
- # Wait for any outstanding requests before closing the socket.
- self._lock.acquire()
- while self._requests:
- self._lock.wait()
- self._lock.release()
-
- super(MultiplexedConnection, self)._cleanupSocket()
-
- def writeRecord(self, rec):
- # Must use locking to prevent intermingling of Records from different
- # threads.
- self._lock.acquire()
- try:
- # Probably faster than calling super. ;)
- rec.write(self._sock)
- finally:
- self._lock.release()
-
- def end_request(self, req, appStatus=0L,
- protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self).end_request(req, appStatus,
- protocolStatus,
- remove)
- self._lock.notify()
- finally:
- self._lock.release()
-
- def _do_begin_request(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_begin_request(inrec)
- finally:
- self._lock.release()
-
- def _do_abort_request(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_abort_request(inrec)
- finally:
- self._lock.release()
-
- def _start_request(self, req):
- thread.start_new_thread(req.run, ())
-
- def _do_params(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_params(inrec)
- finally:
- self._lock.release()
-
- def _do_stdin(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_stdin(inrec)
- finally:
- self._lock.release()
+__all__ = ['WSGIServer']
- def _do_data(self, inrec):
- self._lock.acquire()
- try:
- super(MultiplexedConnection, self)._do_data(inrec)
- finally:
- self._lock.release()
-
-class WSGIServer(prefork.PreforkServer):
+class WSGIServer(BaseFCGIServer, PreforkServer):
"""
FastCGI server that supports the Web Server Gateway Interface. See
<http://www.python.org/peps/pep-0333.html>.
"""
- request_class = Request
- cgirequest_class = CGIRequest
-
- # Limits the size of the InputStream's string buffer to this size + the
- # server's maximum Record size. Since the InputStream is not seekable,
- # we throw away already-read data once this certain amount has been read.
- inputStreamShrinkThreshold = 102400 - 8192
-
def __init__(self, application, environ=None,
- maxwrite=8192, bindAddress=None, **kw):
+ bindAddress=None, multiplexed=False, **kw):
"""
environ, if present, must be a dictionary-like object. Its
contents will be copied into application's environ. Useful
for passing application-specific variables.
- maxwrite is the maximum number of bytes (per Record) to write
- to the server. I've noticed mod_fastcgi has a relatively small
- receive buffer (8K or so).
-
bindAddress, if present, must either be a string or a 2-tuple. If
present, run() will open its own listening socket. You would use
this if you wanted to run your application as an 'external' FastCGI
@@ -915,25 +78,24 @@ class WSGIServer(prefork.PreforkServer):
is the interface name/IP to bind to, and the second element (an int)
is the port number.
"""
- if kw.has_key('jobClass'):
- del kw['jobClass']
- if kw.has_key('jobArgs'):
- del kw['jobArgs']
- super(WSGIServer, self).__init__(jobClass=Connection,
- jobArgs=(self,), **kw)
-
- if environ is None:
- environ = {}
-
- self.application = application
- self.environ = environ
+ BaseFCGIServer.__init__(self, application,
+ environ=environ,
+ multithreaded=False,
+ bindAddress=bindAddress,
+ multiplexed=multiplexed)
+ for key in ('multithreaded', 'jobClass', 'jobArgs'):
+ if kw.has_key(key):
+ del kw[key]
+ PreforkServer.__init__(self, jobClass=self._connectionClass,
+ jobArgs=(self,), **kw)
- self.maxwrite = maxwrite
try:
import resource
# Attempt to glean the maximum number of connections
# from the OS.
- maxConns = resource.getrlimit(resource.RLIMIT_NPROC)[0]
+ maxProcs = resource.getrlimit(resource.RLIMIT_NPROC)[0]
+ maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
+ maxConns = min(maxConns, maxProcs)
except ImportError:
maxConns = 100 # Just some made up number.
maxReqs = maxConns
@@ -942,57 +104,6 @@ class WSGIServer(prefork.PreforkServer):
FCGI_MAX_REQS: maxReqs,
FCGI_MPXS_CONNS: 0
}
- self._bindAddress = bindAddress
-
- def _setupSocket(self):
- if self._bindAddress is None: # Run as a normal FastCGI?
- isFCGI = True
-
- sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
- socket.SOCK_STREAM)
- try:
- sock.getpeername()
- except socket.error, e:
- if e[0] == errno.ENOTSOCK:
- # Not a socket, assume CGI context.
- isFCGI = False
- elif e[0] != errno.ENOTCONN:
- raise
-
- # FastCGI/CGI discrimination is broken on Mac OS X.
- # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
- # if you want to run your app as a simple CGI. (You can do
- # this with Apache's mod_env [not loaded by default in OS X
- # client, ha ha] and the SetEnv directive.)
- if not isFCGI or \
- os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'):
- req = self.cgirequest_class(self)
- req.run()
- sys.exit(0)
- else:
- # Run as a server
- if type(self._bindAddress) is str:
- # Unix socket
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- try:
- os.unlink(self._bindAddress)
- except OSError:
- pass
- else:
- # INET socket
- assert type(self._bindAddress) is tuple
- assert len(self._bindAddress) == 2
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- sock.bind(self._bindAddress)
- sock.listen(socket.SOMAXCONN)
-
- return sock
-
- def _cleanupSocket(self, sock):
- """Closes the main socket."""
- sock.close()
def _isClientAllowed(self, addr):
return self._web_server_addrs is None or \
@@ -1010,135 +121,12 @@ class WSGIServer(prefork.PreforkServer):
sock = self._setupSocket()
- ret = super(WSGIServer, self).run(sock)
+ ret = PreforkServer.run(self, sock)
self._cleanupSocket(sock)
return ret
- def handler(self, req):
- """Special handler for WSGI."""
- if req.role != FCGI_RESPONDER:
- return FCGI_UNKNOWN_ROLE, 0
-
- # Mostly taken from example CGI gateway.
- environ = req.params
- environ.update(self.environ)
-
- environ['wsgi.version'] = (1,0)
- environ['wsgi.input'] = req.stdin
- if self._bindAddress is None:
- stderr = req.stderr
- else:
- stderr = TeeOutputStream((sys.stderr, req.stderr))
- environ['wsgi.errors'] = stderr
- environ['wsgi.multithread'] = False
- environ['wsgi.multiprocess'] = True
- environ['wsgi.run_once'] = isinstance(req, CGIRequest)
-
- if environ.get('HTTPS', 'off') in ('on', '1'):
- environ['wsgi.url_scheme'] = 'https'
- else:
- environ['wsgi.url_scheme'] = 'http'
-
- self._sanitizeEnv(environ)
-
- headers_set = []
- headers_sent = []
- result = None
-
- def write(data):
- assert type(data) is str, 'write() argument must be string'
- assert headers_set, 'write() before start_response()'
-
- if not headers_sent:
- status, responseHeaders = headers_sent[:] = headers_set
- found = False
- for header,value in responseHeaders:
- if header.lower() == 'content-length':
- found = True
- break
- if not found and result is not None:
- try:
- if len(result) == 1:
- responseHeaders.append(('Content-Length',
- str(len(data))))
- except:
- pass
- s = 'Status: %s\r\n' % status
- for header in responseHeaders:
- s += '%s: %s\r\n' % header
- s += '\r\n'
- req.stdout.write(s)
-
- req.stdout.write(data)
- req.stdout.flush()
-
- def start_response(status, response_headers, exc_info=None):
- if exc_info:
- try:
- if headers_sent:
- # Re-raise if too late
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # avoid dangling circular ref
- else:
- assert not headers_set, 'Headers already set!'
-
- assert type(status) is str, 'Status must be a string'
- assert len(status) >= 4, 'Status must be at least 4 characters'
- assert int(status[:3]), 'Status must begin with 3-digit code'
- assert status[3] == ' ', 'Status must have a space after code'
- assert type(response_headers) is list, 'Headers must be a list'
- if __debug__:
- for name,val in response_headers:
- assert type(name) is str, 'Header names must be strings'
- assert type(val) is str, 'Header values must be strings'
-
- headers_set[:] = [status, response_headers]
- return write
-
- result = self.application(environ, start_response)
- try:
- for data in result:
- if data:
- write(data)
- if not headers_sent:
- write('') # in case body was empty
- finally:
- if hasattr(result, 'close'):
- result.close()
-
- return FCGI_REQUEST_COMPLETE, 0
-
- def _sanitizeEnv(self, environ):
- """Ensure certain values are present, if required by WSGI."""
- if not environ.has_key('SCRIPT_NAME'):
- environ['SCRIPT_NAME'] = ''
- if not environ.has_key('PATH_INFO'):
- environ['PATH_INFO'] = ''
-
- # If any of these are missing, it probably signifies a broken
- # server...
- for name,default in [('REQUEST_METHOD', 'GET'),
- ('SERVER_NAME', 'localhost'),
- ('SERVER_PORT', '80'),
- ('SERVER_PROTOCOL', 'HTTP/1.0')]:
- if not environ.has_key(name):
- environ['wsgi.errors'].write('%s: missing FastCGI param %s '
- 'required by WSGI!\n' %
- (self.__class__.__name__, name))
- environ[name] = default
-
- def error(self, req):
- """
- Called by Request if an exception occurs within the handler. May and
- should be overridden.
- """
- import cgitb
- req.stdout.write('Content-Type: text/html\r\n\r\n' +
- cgitb.html(sys.exc_info()))
-
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
diff --git a/flup/server/prefork.py b/flup/server/preforkserver.py
index 191a651..9e16527 100644
--- a/flup/server/prefork.py
+++ b/flup/server/preforkserver.py
@@ -333,11 +333,10 @@ class PreforkServer(object):
def _installSignalHandlers(self):
"""Installs signal handlers."""
self._oldSIGs = [(x,signal.getsignal(x)) for x in
- (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT,
- signal.SIGTERM, signal.SIGCHLD)]
+ (signal.SIGHUP, signal.SIGINT, signal.SIGTERM,
+ signal.SIGCHLD)]
signal.signal(signal.SIGHUP, self._hupHandler)
signal.signal(signal.SIGINT, self._intHandler)
- signal.signal(signal.SIGQUIT, self._intHandler)
signal.signal(signal.SIGTERM, self._intHandler)
def _restoreSignalHandlers(self):
diff --git a/flup/server/scgi.py b/flup/server/scgi.py
index c4bb20a..f8d0ed5 100644
--- a/flup/server/scgi.py
+++ b/flup/server/scgi.py
@@ -63,327 +63,15 @@ Example wrapper script:
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
-import sys
import logging
import socket
-import select
-import errno
-import cStringIO as StringIO
-import signal
-import datetime
-# Threads are required. If you want a non-threaded (forking) version, look at
-# SWAP <http://www.idyll.org/~t/www-tools/wsgi/>.
-import thread
-import threading
+from scgi_base import BaseSCGIServer, Connection
+from threadedserver import ThreadedServer
__all__ = ['WSGIServer']
-# The main classes use this name for logging.
-LoggerName = 'scgi-wsgi'
-
-# Set up module-level logger.
-console = logging.StreamHandler()
-console.setLevel(logging.DEBUG)
-console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
- '%Y-%m-%d %H:%M:%S'))
-logging.getLogger(LoggerName).addHandler(console)
-del console
-
-class ProtocolError(Exception):
- """
- Exception raised when the server does something unexpected or
- sends garbled data. Usually leads to a Connection closing.
- """
- pass
-
-def recvall(sock, length):
- """
- Attempts to receive length bytes from a socket, blocking if necessary.
- (Socket may be blocking or non-blocking.)
- """
- dataList = []
- recvLen = 0
- while length:
- try:
- data = sock.recv(length)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if not data: # EOF
- break
- dataList.append(data)
- dataLen = len(data)
- recvLen += dataLen
- length -= dataLen
- return ''.join(dataList), recvLen
-
-def readNetstring(sock):
- """
- Attempt to read a netstring from a socket.
- """
- # First attempt to read the length.
- size = ''
- while True:
- try:
- c = sock.recv(1)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if c == ':':
- break
- if not c:
- raise EOFError
- size += c
-
- # Try to decode the length.
- try:
- size = int(size)
- if size < 0:
- raise ValueError
- except ValueError:
- raise ProtocolError, 'invalid netstring length'
-
- # Now read the string.
- s, length = recvall(sock, size)
-
- if length < size:
- raise EOFError
-
- # Lastly, the trailer.
- trailer, length = recvall(sock, 1)
-
- if length < 1:
- raise EOFError
-
- if trailer != ',':
- raise ProtocolError, 'invalid netstring trailer'
-
- return s
-
-class StdoutWrapper(object):
- """
- Wrapper for sys.stdout so we know if data has actually been written.
- """
- def __init__(self, stdout):
- self._file = stdout
- self.dataWritten = False
-
- def write(self, data):
- if data:
- self.dataWritten = True
- self._file.write(data)
-
- def writelines(self, lines):
- for line in lines:
- self.write(line)
-
- def __getattr__(self, name):
- return getattr(self._file, name)
-
-class Request(object):
- """
- Encapsulates data related to a single request.
-
- Public attributes:
- environ - Environment variables from web server.
- stdin - File-like object representing the request body.
- stdout - File-like object for writing the response.
- """
- def __init__(self, conn, environ, input, output):
- self._conn = conn
- self.environ = environ
- self.stdin = input
- self.stdout = StdoutWrapper(output)
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.info('%s %s%s',
- self.environ['REQUEST_METHOD'],
- self.environ.get('SCRIPT_NAME', ''),
- self.environ.get('PATH_INFO', ''))
-
- start = datetime.datetime.now()
-
- try:
- self._conn.server.handler(self)
- except:
- self.logger.exception('Exception caught from handler')
- if not self.stdout.dataWritten:
- self._conn.server.error(self)
-
- end = datetime.datetime.now()
-
- handlerTime = end - start
- self.logger.debug('%s %s%s done (%.3f secs)',
- self.environ['REQUEST_METHOD'],
- self.environ.get('SCRIPT_NAME', ''),
- self.environ.get('PATH_INFO', ''),
- handlerTime.seconds +
- handlerTime.microseconds / 1000000.0)
-
-class Connection(object):
- """
- Represents a single client (web server) connection. A single request
- is handled, after which the socket is closed.
- """
- def __init__(self, sock, addr, server):
- self._sock = sock
- self._addr = addr
- self.server = server
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.debug('Connection starting up (%s:%d)',
- self._addr[0], self._addr[1])
-
- try:
- self.processInput()
- except EOFError:
- pass
- except ProtocolError, e:
- self.logger.error("Protocol error '%s'", str(e))
- except:
- self.logger.exception('Exception caught in Connection')
-
- self.logger.debug('Connection shutting down (%s:%d)',
- self._addr[0], self._addr[1])
-
- # All done!
- self._sock.close()
-
- def processInput(self):
- # Read headers
- headers = readNetstring(self._sock)
- headers = headers.split('\x00')[:-1]
- if len(headers) % 2 != 0:
- raise ProtocolError, 'invalid headers'
- environ = {}
- for i in range(len(headers) / 2):
- environ[headers[2*i]] = headers[2*i+1]
-
- clen = environ.get('CONTENT_LENGTH')
- if clen is None:
- raise ProtocolError, 'missing CONTENT_LENGTH'
- try:
- clen = int(clen)
- if clen < 0:
- raise ValueError
- except ValueError:
- raise ProtocolError, 'invalid CONTENT_LENGTH'
-
- self._sock.setblocking(1)
- if clen:
- input = self._sock.makefile('r')
- else:
- # Empty input.
- input = StringIO.StringIO()
-
- # stdout
- output = self._sock.makefile('w')
-
- # Allocate Request
- req = Request(self, environ, input, output)
-
- # Run it.
- req.run()
-
- output.close()
- input.close()
-
-class ThreadPool(object):
- """
- Thread pool that maintains the number of idle threads between
- minSpare and maxSpare inclusive. By default, there is no limit on
- the number of threads that can be started, but this can be controlled
- by maxThreads.
- """
- def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxint):
- self._minSpare = minSpare
- self._maxSpare = maxSpare
- self._maxThreads = max(minSpare, maxThreads)
-
- self._lock = threading.Condition()
- self._workQueue = []
- self._idleCount = self._workerCount = maxSpare
-
- # Start the minimum number of worker threads.
- for i in range(maxSpare):
- thread.start_new_thread(self._worker, ())
-
- def addJob(self, job, allowQueuing=True):
- """
- Adds a job to the work queue. The job object should have a run()
- method. If allowQueuing is True (the default), the job will be
- added to the work queue regardless if there are any idle threads
- ready. (The only way for there to be no idle threads is if maxThreads
- is some reasonable, finite limit.)
-
- Otherwise, if allowQueuing is False, and there are no more idle
- threads, the job will not be queued.
-
- Returns True if the job was queued, False otherwise.
- """
- self._lock.acquire()
- try:
- # Maintain minimum number of spares.
- while self._idleCount < self._minSpare and \
- self._workerCount < self._maxThreads:
- self._workerCount += 1
- self._idleCount += 1
- thread.start_new_thread(self._worker, ())
-
- # Hand off the job.
- if self._idleCount or allowQueuing:
- self._workQueue.append(job)
- self._lock.notify()
- return True
- else:
- return False
- finally:
- self._lock.release()
-
- def _worker(self):
- """
- Worker thread routine. Waits for a job, executes it, repeat.
- """
- self._lock.acquire()
- while True:
- while not self._workQueue:
- self._lock.wait()
-
- # We have a job to do...
- job = self._workQueue.pop(0)
-
- assert self._idleCount > 0
- self._idleCount -= 1
-
- self._lock.release()
-
- job.run()
-
- self._lock.acquire()
-
- if self._idleCount == self._maxSpare:
- break # NB: lock still held
- self._idleCount += 1
- assert self._idleCount <= self._maxSpare
-
- # Die off...
- assert self._workerCount > self._maxSpare
- self._workerCount -= 1
-
- self._lock.release()
-
-class WSGIServer(object):
+class WSGIServer(BaseSCGIServer, ThreadedServer):
"""
SCGI/WSGI server. For information about SCGI (Simple Common Gateway
Interface), see <http://www.mems-exchange.org/software/scgi/>.
@@ -399,9 +87,6 @@ class WSGIServer(object):
of preforking to be quite superior. So if your application really doesn't
mind running in multiple processes, go use SWAP. ;)
"""
- # What Request class to use.
- requestClass = Request
-
def __init__(self, application, environ=None,
multithreaded=True,
bindAddress=('localhost', 4000), allowedServers=None,
@@ -410,8 +95,6 @@ class WSGIServer(object):
environ, which must be a dictionary, can contain any additional
environment variables you want to pass to your application.
- Set multithreaded to False if your application is not thread-safe.
-
bindAddress is the address to bind to, which must be a tuple of
length 2. The first element is a string, which is the host name
or IPv4 address of a local interface. The 2nd element is the port
@@ -422,66 +105,24 @@ class WSGIServer(object):
connections from anywhere.
loggingLevel sets the logging level of the module-level logger.
-
- Any additional keyword arguments are passed to the underlying
- ThreadPool.
"""
- if environ is None:
- environ = {}
-
- self.application = application
- self.environ = environ
- self.multithreaded = multithreaded
- self._bindAddress = bindAddress
- self._allowedServers = allowedServers
-
- # Used to force single-threadedness.
- self._appLock = thread.allocate_lock()
-
- self._threadPool = ThreadPool(**kw)
-
- self.logger = logging.getLogger(LoggerName)
- self.logger.setLevel(loggingLevel)
-
- def _setupSocket(self):
- """Creates and binds the socket for communication with the server."""
- sock = socket.socket()
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(self._bindAddress)
- sock.listen(socket.SOMAXCONN)
- return sock
-
- def _cleanupSocket(self, sock):
- """Closes the main socket."""
- sock.close()
-
- def _isServerAllowed(self, addr):
- return self._allowedServers is None or \
- addr[0] in self._allowedServers
-
- def _installSignalHandlers(self):
- self._oldSIGs = [(x,signal.getsignal(x)) for x in
- (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)]
- signal.signal(signal.SIGHUP, self._hupHandler)
- signal.signal(signal.SIGINT, self._intHandler)
- signal.signal(signal.SIGTERM, self._intHandler)
-
- def _restoreSignalHandlers(self):
- for signum,handler in self._oldSIGs:
- signal.signal(signum, handler)
-
- def _hupHandler(self, signum, frame):
- self._hupReceived = True
- self._keepGoing = False
-
- def _intHandler(self, signum, frame):
- self._keepGoing = False
+ BaseSCGIServer.__init__(self, application,
+ environ=environ,
+ multithreaded=multithreaded,
+ bindAddress=bindAddress,
+ allowedServers=allowedServers,
+ loggingLevel=loggingLevel)
+ for key in ('jobClass', 'jobArgs'):
+ if kw.has_key(key):
+ del kw[key]
+ ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,),
+ **kw)
- def run(self, timeout=1.0):
+ def run(self):
"""
Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT,
- SIGTERM cause it to cleanup and return. (If a SIGHUP is caught, this
- method returns True. Returns False otherwise.)
+ SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP
+ is caught, this method returns True. Returns False otherwise.)
"""
self.logger.info('%s starting up', self.__class__.__name__)
@@ -491,182 +132,14 @@ class WSGIServer(object):
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
- self._keepGoing = True
- self._hupReceived = False
-
- # Install signal handlers.
- self._installSignalHandlers()
-
- while self._keepGoing:
- try:
- r, w, e = select.select([sock], [], [], timeout)
- except select.error, e:
- if e[0] == errno.EINTR:
- continue
- raise
-
- if r:
- try:
- clientSock, addr = sock.accept()
- except socket.error, e:
- if e[0] in (errno.EINTR, errno.EAGAIN):
- continue
- raise
-
- if not self._isServerAllowed(addr):
- self.logger.warning('Server connection from %s disallowed',
- addr[0])
- clientSock.close()
- continue
-
- # Hand off to Connection.
- conn = Connection(clientSock, addr, self)
- if not self._threadPool.addJob(conn, allowQueuing=False):
- # No thread left, immediately close the socket to hopefully
- # indicate to the web server that we're at our limit...
- # and to prevent having too many opened (and useless)
- # files.
- clientSock.close()
-
- self._mainloopPeriodic()
-
- # Restore old signal handlers.
- self._restoreSignalHandlers()
+ ret = ThreadedServer.run(self, sock)
self._cleanupSocket(sock)
self.logger.info('%s shutting down%s', self.__class__.__name__,
self._hupReceived and ' (reload requested)' or '')
- return self._hupReceived
-
- def _mainloopPeriodic(self):
- """
- Called with just about each iteration of the main loop. Meant to
- be overridden.
- """
- pass
-
- def _exit(self, reload=False):
- """
- Protected convenience method for subclasses to force an exit. Not
- really thread-safe, which is why it isn't public.
- """
- if self._keepGoing:
- self._keepGoing = False
- self._hupReceived = reload
-
- def handler(self, request):
- """
- WSGI handler. Sets up WSGI environment, calls the application,
- and sends the application's response.
- """
- environ = request.environ
- environ.update(self.environ)
-
- environ['wsgi.version'] = (1,0)
- environ['wsgi.input'] = request.stdin
- environ['wsgi.errors'] = sys.stderr
- environ['wsgi.multithread'] = self.multithreaded
- # AFAIK, the current mod_scgi does not do load-balancing/fail-over.
- # So a single application deployment will only run in one process
- # at a time, on this server.
- environ['wsgi.multiprocess'] = False
- environ['wsgi.run_once'] = False
-
- if environ.get('HTTPS', 'off') in ('on', '1'):
- environ['wsgi.url_scheme'] = 'https'
- else:
- environ['wsgi.url_scheme'] = 'http'
-
- headers_set = []
- headers_sent = []
- result = None
-
- def write(data):
- assert type(data) is str, 'write() argument must be string'
- assert headers_set, 'write() before start_response()'
-
- if not headers_sent:
- status, responseHeaders = headers_sent[:] = headers_set
- found = False
- for header,value in responseHeaders:
- if header.lower() == 'content-length':
- found = True
- break
- if not found and result is not None:
- try:
- if len(result) == 1:
- responseHeaders.append(('Content-Length',
- str(len(data))))
- except:
- pass
- s = 'Status: %s\r\n' % status
- for header in responseHeaders:
- s += '%s: %s\r\n' % header
- s += '\r\n'
- try:
- request.stdout.write(s)
- except socket.error, e:
- if e[0] != errno.EPIPE:
- raise
-
- try:
- request.stdout.write(data)
- request.stdout.flush()
- except socket.error, e:
- if e[0] != errno.EPIPE:
- raise
-
- def start_response(status, response_headers, exc_info=None):
- if exc_info:
- try:
- if headers_sent:
- # Re-raise if too late
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # avoid dangling circular ref
- else:
- assert not headers_set, 'Headers already set!'
-
- assert type(status) is str, 'Status must be a string'
- assert len(status) >= 4, 'Status must be at least 4 characters'
- assert int(status[:3]), 'Status must begin with 3-digit code'
- assert status[3] == ' ', 'Status must have a space after code'
- assert type(response_headers) is list, 'Headers must be a list'
- if __debug__:
- for name,val in response_headers:
- assert type(name) is str, 'Header names must be strings'
- assert type(val) is str, 'Header values must be strings'
-
- headers_set[:] = [status, response_headers]
- return write
-
- if not self.multithreaded:
- self._appLock.acquire()
- try:
- result = self.application(environ, start_response)
- try:
- for data in result:
- if data:
- write(data)
- if not headers_sent:
- write('') # in case body was empty
- finally:
- if hasattr(result, 'close'):
- result.close()
- finally:
- if not self.multithreaded:
- self._appLock.release()
-
- def error(self, request):
- """
- Override to provide custom error handling. Ideally, however,
- all errors should be caught at the application level.
- """
- import cgitb
- request.stdout.write('Content-Type: text/html\r\n\r\n' +
- cgitb.html(sys.exc_info()))
+ return ret
if __name__ == '__main__':
def test_app(environ, start_response):
diff --git a/flup/server/scgi_base.py b/flup/server/scgi_base.py
new file mode 100644
index 0000000..52648e9
--- /dev/null
+++ b/flup/server/scgi_base.py
@@ -0,0 +1,435 @@
+# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+# $Id$
+
+__author__ = 'Allan Saddi <allan@saddi.com>'
+__version__ = '$Revision$'
+
+import sys
+import logging
+import socket
+import select
+import errno
+import cStringIO as StringIO
+import signal
+import datetime
+
+# Threads are required. If you want a non-threaded (forking) version, look at
+# SWAP <http://www.idyll.org/~t/www-tools/wsgi/>.
+import thread
+import threading
+
+__all__ = ['BaseSCGIServer']
+
+# The main classes use this name for logging.
+LoggerName = 'scgi-wsgi'
+
+# Set up module-level logger.
+console = logging.StreamHandler()
+console.setLevel(logging.DEBUG)
+console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
+ '%Y-%m-%d %H:%M:%S'))
+logging.getLogger(LoggerName).addHandler(console)
+del console
+
+class ProtocolError(Exception):
+ """
+ Exception raised when the server does something unexpected or
+ sends garbled data. Usually leads to a Connection closing.
+ """
+ pass
+
+def recvall(sock, length):
+ """
+ Attempts to receive length bytes from a socket, blocking if necessary.
+ (Socket may be blocking or non-blocking.)
+ """
+ dataList = []
+ recvLen = 0
+ while length:
+ try:
+ data = sock.recv(length)
+ except socket.error, e:
+ if e[0] == errno.EAGAIN:
+ select.select([sock], [], [])
+ continue
+ else:
+ raise
+ if not data: # EOF
+ break
+ dataList.append(data)
+ dataLen = len(data)
+ recvLen += dataLen
+ length -= dataLen
+ return ''.join(dataList), recvLen
+
+def readNetstring(sock):
+ """
+ Attempt to read a netstring from a socket.
+ """
+ # First attempt to read the length.
+ size = ''
+ while True:
+ try:
+ c = sock.recv(1)
+ except socket.error, e:
+ if e[0] == errno.EAGAIN:
+ select.select([sock], [], [])
+ continue
+ else:
+ raise
+ if c == ':':
+ break
+ if not c:
+ raise EOFError
+ size += c
+
+ # Try to decode the length.
+ try:
+ size = int(size)
+ if size < 0:
+ raise ValueError
+ except ValueError:
+ raise ProtocolError, 'invalid netstring length'
+
+ # Now read the string.
+ s, length = recvall(sock, size)
+
+ if length < size:
+ raise EOFError
+
+ # Lastly, the trailer.
+ trailer, length = recvall(sock, 1)
+
+ if length < 1:
+ raise EOFError
+
+ if trailer != ',':
+ raise ProtocolError, 'invalid netstring trailer'
+
+ return s
+
+class StdoutWrapper(object):
+ """
+ Wrapper for sys.stdout so we know if data has actually been written.
+ """
+ def __init__(self, stdout):
+ self._file = stdout
+ self.dataWritten = False
+
+ def write(self, data):
+ if data:
+ self.dataWritten = True
+ self._file.write(data)
+
+ def writelines(self, lines):
+ for line in lines:
+ self.write(line)
+
+ def __getattr__(self, name):
+ return getattr(self._file, name)
+
+class Request(object):
+ """
+ Encapsulates data related to a single request.
+
+ Public attributes:
+ environ - Environment variables from web server.
+ stdin - File-like object representing the request body.
+ stdout - File-like object for writing the response.
+ """
+ def __init__(self, conn, environ, input, output):
+ self._conn = conn
+ self.environ = environ
+ self.stdin = input
+ self.stdout = StdoutWrapper(output)
+
+ self.logger = logging.getLogger(LoggerName)
+
+ def run(self):
+ self.logger.info('%s %s%s',
+ self.environ['REQUEST_METHOD'],
+ self.environ.get('SCRIPT_NAME', ''),
+ self.environ.get('PATH_INFO', ''))
+
+ start = datetime.datetime.now()
+
+ try:
+ self._conn.server.handler(self)
+ except:
+ self.logger.exception('Exception caught from handler')
+ if not self.stdout.dataWritten:
+ self._conn.server.error(self)
+
+ end = datetime.datetime.now()
+
+ handlerTime = end - start
+ self.logger.debug('%s %s%s done (%.3f secs)',
+ self.environ['REQUEST_METHOD'],
+ self.environ.get('SCRIPT_NAME', ''),
+ self.environ.get('PATH_INFO', ''),
+ handlerTime.seconds +
+ handlerTime.microseconds / 1000000.0)
+
+class Connection(object):
+ """
+ Represents a single client (web server) connection. A single request
+ is handled, after which the socket is closed.
+ """
+ def __init__(self, sock, addr, server):
+ self._sock = sock
+ self._addr = addr
+ self.server = server
+
+ self.logger = logging.getLogger(LoggerName)
+
+ def run(self):
+ self.logger.debug('Connection starting up (%s:%d)',
+ self._addr[0], self._addr[1])
+
+ try:
+ self.processInput()
+ except (EOFError, KeyboardInterrupt):
+ pass
+ except ProtocolError, e:
+ self.logger.error("Protocol error '%s'", str(e))
+ except:
+ self.logger.exception('Exception caught in Connection')
+
+ self.logger.debug('Connection shutting down (%s:%d)',
+ self._addr[0], self._addr[1])
+
+ # All done!
+ self._sock.close()
+
+ def processInput(self):
+ # Read headers
+ headers = readNetstring(self._sock)
+ headers = headers.split('\x00')[:-1]
+ if len(headers) % 2 != 0:
+ raise ProtocolError, 'invalid headers'
+ environ = {}
+ for i in range(len(headers) / 2):
+ environ[headers[2*i]] = headers[2*i+1]
+
+ clen = environ.get('CONTENT_LENGTH')
+ if clen is None:
+ raise ProtocolError, 'missing CONTENT_LENGTH'
+ try:
+ clen = int(clen)
+ if clen < 0:
+ raise ValueError
+ except ValueError:
+ raise ProtocolError, 'invalid CONTENT_LENGTH'
+
+ self._sock.setblocking(1)
+ if clen:
+ input = self._sock.makefile('r')
+ else:
+ # Empty input.
+ input = StringIO.StringIO()
+
+ # stdout
+ output = self._sock.makefile('w')
+
+ # Allocate Request
+ req = Request(self, environ, input, output)
+
+ # Run it.
+ req.run()
+
+ output.close()
+ input.close()
+
+class BaseSCGIServer(object):
+ # What Request class to use.
+ requestClass = Request
+
+ def __init__(self, application, environ=None,
+ multithreaded=True,
+ bindAddress=('localhost', 4000), allowedServers=None,
+ loggingLevel=logging.INFO):
+ """
+ environ, which must be a dictionary, can contain any additional
+ environment variables you want to pass to your application.
+
+ Set multithreaded to False if your application is not thread-safe.
+
+ bindAddress is the address to bind to, which must be a tuple of
+ length 2. The first element is a string, which is the host name
+ or IPv4 address of a local interface. The 2nd element is the port
+ number.
+
+ allowedServers must be None or a list of strings representing the
+ IPv4 addresses of servers allowed to connect. None means accept
+ connections from anywhere.
+
+ loggingLevel sets the logging level of the module-level logger.
+ """
+ if environ is None:
+ environ = {}
+
+ self.application = application
+ self.environ = environ
+ self.multithreaded = multithreaded
+ self._bindAddress = bindAddress
+ self._allowedServers = allowedServers
+
+ # Used to force single-threadedness.
+ self._appLock = thread.allocate_lock()
+
+ self.logger = logging.getLogger(LoggerName)
+ self.logger.setLevel(loggingLevel)
+
+ def _setupSocket(self):
+ """Creates and binds the socket for communication with the server."""
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(self._bindAddress)
+ sock.listen(socket.SOMAXCONN)
+ return sock
+
+ def _cleanupSocket(self, sock):
+ """Closes the main socket."""
+ sock.close()
+
+ def _isClientAllowed(self, addr):
+ ret = self._allowedServers is None or addr[0] in self._allowedServers
+ if not ret:
+ self.logger.warning('Server connection from %s disallowed',
+ addr[0])
+ return ret
+
+ def handler(self, request):
+ """
+ WSGI handler. Sets up WSGI environment, calls the application,
+ and sends the application's response.
+ """
+ environ = request.environ
+ environ.update(self.environ)
+
+ environ['wsgi.version'] = (1,0)
+ environ['wsgi.input'] = request.stdin
+ environ['wsgi.errors'] = sys.stderr
+ environ['wsgi.multithread'] = self.multithreaded
+ # AFAIK, the current mod_scgi does not do load-balancing/fail-over.
+ # So a single application deployment will only run in one process
+ # at a time, on this server.
+ environ['wsgi.multiprocess'] = False
+ environ['wsgi.run_once'] = False
+
+ if environ.get('HTTPS', 'off') in ('on', '1'):
+ environ['wsgi.url_scheme'] = 'https'
+ else:
+ environ['wsgi.url_scheme'] = 'http'
+
+ headers_set = []
+ headers_sent = []
+ result = None
+
+ def write(data):
+ assert type(data) is str, 'write() argument must be string'
+ assert headers_set, 'write() before start_response()'
+
+ if not headers_sent:
+ status, responseHeaders = headers_sent[:] = headers_set
+ found = False
+ for header,value in responseHeaders:
+ if header.lower() == 'content-length':
+ found = True
+ break
+ if not found and result is not None:
+ try:
+ if len(result) == 1:
+ responseHeaders.append(('Content-Length',
+ str(len(data))))
+ except:
+ pass
+ s = 'Status: %s\r\n' % status
+ for header in responseHeaders:
+ s += '%s: %s\r\n' % header
+ s += '\r\n'
+ try:
+ request.stdout.write(s)
+ except socket.error, e:
+ if e[0] != errno.EPIPE:
+ raise
+
+ try:
+ request.stdout.write(data)
+ request.stdout.flush()
+ except socket.error, e:
+ if e[0] != errno.EPIPE:
+ raise
+
+ def start_response(status, response_headers, exc_info=None):
+ if exc_info:
+ try:
+ if headers_sent:
+ # Re-raise if too late
+ raise exc_info[0], exc_info[1], exc_info[2]
+ finally:
+ exc_info = None # avoid dangling circular ref
+ else:
+ assert not headers_set, 'Headers already set!'
+
+ assert type(status) is str, 'Status must be a string'
+ assert len(status) >= 4, 'Status must be at least 4 characters'
+ assert int(status[:3]), 'Status must begin with 3-digit code'
+ assert status[3] == ' ', 'Status must have a space after code'
+ assert type(response_headers) is list, 'Headers must be a list'
+ if __debug__:
+ for name,val in response_headers:
+ assert type(name) is str, 'Header names must be strings'
+ assert type(val) is str, 'Header values must be strings'
+
+ headers_set[:] = [status, response_headers]
+ return write
+
+ if not self.multithreaded:
+ self._appLock.acquire()
+ try:
+ result = self.application(environ, start_response)
+ try:
+ for data in result:
+ if data:
+ write(data)
+ if not headers_sent:
+ write('') # in case body was empty
+ finally:
+ if hasattr(result, 'close'):
+ result.close()
+ finally:
+ if not self.multithreaded:
+ self._appLock.release()
+
+ def error(self, request):
+ """
+ Override to provide custom error handling. Ideally, however,
+ all errors should be caught at the application level.
+ """
+ import cgitb
+ request.stdout.write('Content-Type: text/html\r\n\r\n' +
+ cgitb.html(sys.exc_info()))
diff --git a/flup/server/scgi_fork.py b/flup/server/scgi_fork.py
index 05a527c..52dd481 100644
--- a/flup/server/scgi_fork.py
+++ b/flup/server/scgi_fork.py
@@ -63,239 +63,15 @@ Example wrapper script:
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
-import sys
import logging
import socket
-import select
-import errno
-import cStringIO as StringIO
-import signal
-import datetime
-import prefork
-__all__ = ['WSGIServer']
-
-# The main classes use this name for logging.
-LoggerName = 'scgi-wsgi'
-
-# Set up module-level logger.
-console = logging.StreamHandler()
-console.setLevel(logging.DEBUG)
-console.setFormatter(logging.Formatter('%(asctime)s : %(message)s',
- '%Y-%m-%d %H:%M:%S'))
-logging.getLogger(LoggerName).addHandler(console)
-del console
-
-class ProtocolError(Exception):
- """
- Exception raised when the server does something unexpected or
- sends garbled data. Usually leads to a Connection closing.
- """
- pass
-
-def recvall(sock, length):
- """
- Attempts to receive length bytes from a socket, blocking if necessary.
- (Socket may be blocking or non-blocking.)
- """
- dataList = []
- recvLen = 0
- while length:
- try:
- data = sock.recv(length)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if not data: # EOF
- break
- dataList.append(data)
- dataLen = len(data)
- recvLen += dataLen
- length -= dataLen
- return ''.join(dataList), recvLen
-
-def readNetstring(sock):
- """
- Attempt to read a netstring from a socket.
- """
- # First attempt to read the length.
- size = ''
- while True:
- try:
- c = sock.recv(1)
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- select.select([sock], [], [])
- continue
- else:
- raise
- if c == ':':
- break
- if not c:
- raise EOFError
- size += c
-
- # Try to decode the length.
- try:
- size = int(size)
- if size < 0:
- raise ValueError
- except ValueError:
- raise ProtocolError, 'invalid netstring length'
-
- # Now read the string.
- s, length = recvall(sock, size)
-
- if length < size:
- raise EOFError
-
- # Lastly, the trailer.
- trailer, length = recvall(sock, 1)
-
- if length < 1:
- raise EOFError
-
- if trailer != ',':
- raise ProtocolError, 'invalid netstring trailer'
+from scgi_base import BaseSCGIServer, Connection
+from preforkserver import PreforkServer
- return s
-
-class StdoutWrapper(object):
- """
- Wrapper for sys.stdout so we know if data has actually been written.
- """
- def __init__(self, stdout):
- self._file = stdout
- self.dataWritten = False
-
- def write(self, data):
- if data:
- self.dataWritten = True
- self._file.write(data)
-
- def writelines(self, lines):
- for line in lines:
- self.write(line)
-
- def __getattr__(self, name):
- return getattr(self._file, name)
-
-class Request(object):
- """
- Encapsulates data related to a single request.
-
- Public attributes:
- environ - Environment variables from web server.
- stdin - File-like object representing the request body.
- stdout - File-like object for writing the response.
- """
- def __init__(self, conn, environ, input, output):
- self._conn = conn
- self.environ = environ
- self.stdin = input
- self.stdout = StdoutWrapper(output)
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.info('%s %s%s',
- self.environ['REQUEST_METHOD'],
- self.environ.get('SCRIPT_NAME', ''),
- self.environ.get('PATH_INFO', ''))
-
- start = datetime.datetime.now()
-
- try:
- self._conn.server.handler(self)
- except:
- self.logger.exception('Exception caught from handler')
- if not self.stdout.dataWritten:
- self._conn.server.error(self)
-
- end = datetime.datetime.now()
-
- handlerTime = end - start
- self.logger.debug('%s %s%s done (%.3f secs)',
- self.environ['REQUEST_METHOD'],
- self.environ.get('SCRIPT_NAME', ''),
- self.environ.get('PATH_INFO', ''),
- handlerTime.seconds +
- handlerTime.microseconds / 1000000.0)
-
-class Connection(object):
- """
- Represents a single client (web server) connection. A single request
- is handled, after which the socket is closed.
- """
- def __init__(self, sock, addr, server):
- self._sock = sock
- self._addr = addr
- self.server = server
-
- self.logger = logging.getLogger(LoggerName)
-
- def run(self):
- self.logger.debug('Connection starting up (%s:%d)',
- self._addr[0], self._addr[1])
-
- try:
- self.processInput()
- except EOFError:
- pass
- except ProtocolError, e:
- self.logger.error("Protocol error '%s'", str(e))
- except:
- self.logger.exception('Exception caught in Connection')
-
- self.logger.debug('Connection shutting down (%s:%d)',
- self._addr[0], self._addr[1])
-
- # All done!
- self._sock.close()
-
- def processInput(self):
- # Read headers
- headers = readNetstring(self._sock)
- headers = headers.split('\x00')[:-1]
- if len(headers) % 2 != 0:
- raise ProtocolError, 'invalid headers'
- environ = {}
- for i in range(len(headers) / 2):
- environ[headers[2*i]] = headers[2*i+1]
-
- clen = environ.get('CONTENT_LENGTH')
- if clen is None:
- raise ProtocolError, 'missing CONTENT_LENGTH'
- try:
- clen = int(clen)
- if clen < 0:
- raise ValueError
- except ValueError:
- raise ProtocolError, 'invalid CONTENT_LENGTH'
-
- self._sock.setblocking(1)
- if clen:
- input = self._sock.makefile('r')
- else:
- # Empty input.
- input = StringIO.StringIO()
-
- # stdout
- output = self._sock.makefile('w')
-
- # Allocate Request
- req = Request(self, environ, input, output)
-
- # Run it.
- req.run()
-
- output.close()
- input.close()
+__all__ = ['WSGIServer']
-class WSGIServer(prefork.PreforkServer):
+class WSGIServer(BaseSCGIServer, PreforkServer):
"""
SCGI/WSGI server. For information about SCGI (Simple Common Gateway
Interface), see <http://www.mems-exchange.org/software/scgi/>.
@@ -311,9 +87,6 @@ class WSGIServer(prefork.PreforkServer):
of preforking to be quite superior. So if your application really doesn't
mind running in multiple processes, go use SWAP. ;)
"""
- # What Request class to use.
- requestClass = Request
-
def __init__(self, application, environ=None,
bindAddress=('localhost', 4000), allowedServers=None,
loggingLevel=logging.INFO, **kw):
@@ -331,46 +104,17 @@ class WSGIServer(prefork.PreforkServer):
connections from anywhere.
loggingLevel sets the logging level of the module-level logger.
-
- Any additional keyword arguments are passed to the underlying
- ThreadPool.
"""
- if kw.has_key('jobClass'):
- del kw['jobClass']
- if kw.has_key('jobArgs'):
- del kw['jobArgs']
- super(WSGIServer, self).__init__(jobClass=Connection,
- jobArgs=(self,), **kw)
-
- if environ is None:
- environ = {}
-
- self.application = application
- self.environ = environ
- self._bindAddress = bindAddress
- self._allowedServers = allowedServers
-
- self.logger = logging.getLogger(LoggerName)
- self.logger.setLevel(loggingLevel)
-
- def _setupSocket(self):
- """Creates and binds the socket for communication with the server."""
- sock = socket.socket()
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(self._bindAddress)
- sock.listen(socket.SOMAXCONN)
- return sock
-
- def _cleanupSocket(self, sock):
- """Closes the main socket."""
- sock.close()
-
- def _isClientAllowed(self, addr):
- ret = self._allowedServers is None or addr[0] in self._allowedServers
- if not ret:
- self.logger.warning('Server connection from %s disallowed',
- addr[0])
- return ret
+ BaseSCGIServer.__init__(self, application,
+ environ=environ,
+ multithreaded=False,
+ bindAddress=bindAddress,
+ allowedServers=allowedServers,
+ loggingLevel=loggingLevel)
+ for key in ('multithreaded', 'jobClass', 'jobArgs'):
+ if kw.has_key(key):
+ del kw[key]
+ PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw)
def run(self):
"""
@@ -386,117 +130,15 @@ class WSGIServer(prefork.PreforkServer):
self.logger.error('Failed to bind socket (%s), exiting', e[1])
return False
- ret = super(WSGIServer, self).run(sock)
+ ret = PreforkServer.run(self, sock)
self._cleanupSocket(sock)
- self.logger.info('%s shutting down', self.__class__.__name__)
+ self.logger.info('%s shutting down%s', self.__class__.__name__,
+ self._hupReceived and ' (reload requested)' or '')
return ret
- def handler(self, request):
- """
- WSGI handler. Sets up WSGI environment, calls the application,
- and sends the application's response.
- """
- environ = request.environ
- environ.update(self.environ)
-
- environ['wsgi.version'] = (1,0)
- environ['wsgi.input'] = request.stdin
- environ['wsgi.errors'] = sys.stderr
- environ['wsgi.multithread'] = False
- environ['wsgi.multiprocess'] = True
- environ['wsgi.run_once'] = False
-
- if environ.get('HTTPS', 'off') in ('on', '1'):
- environ['wsgi.url_scheme'] = 'https'
- else:
- environ['wsgi.url_scheme'] = 'http'
-
- headers_set = []
- headers_sent = []
- result = None
-
- def write(data):
- assert type(data) is str, 'write() argument must be string'
- assert headers_set, 'write() before start_response()'
-
- if not headers_sent:
- status, responseHeaders = headers_sent[:] = headers_set
- found = False
- for header,value in responseHeaders:
- if header.lower() == 'content-length':
- found = True
- break
- if not found and result is not None:
- try:
- if len(result) == 1:
- responseHeaders.append(('Content-Length',
- str(len(data))))
- except:
- pass
- s = 'Status: %s\r\n' % status
- for header in responseHeaders:
- s += '%s: %s\r\n' % header
- s += '\r\n'
- try:
- request.stdout.write(s)
- except socket.error, e:
- if e[0] != errno.EPIPE:
- raise
-
- try:
- request.stdout.write(data)
- request.stdout.flush()
- except socket.error, e:
- if e[0] != errno.EPIPE:
- raise
-
- def start_response(status, response_headers, exc_info=None):
- if exc_info:
- try:
- if headers_sent:
- # Re-raise if too late
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # avoid dangling circular ref
- else:
- assert not headers_set, 'Headers already set!'
-
- assert type(status) is str, 'Status must be a string'
- assert len(status) >= 4, 'Status must be at least 4 characters'
- assert int(status[:3]), 'Status must begin with 3-digit code'
- assert status[3] == ' ', 'Status must have a space after code'
- assert type(response_headers) is list, 'Headers must be a list'
- if __debug__:
- for name,val in response_headers:
- assert type(name) is str, 'Header names must be strings'
- assert type(val) is str, 'Header values must be strings'
-
- headers_set[:] = [status, response_headers]
- return write
-
- result = self.application(environ, start_response)
- try:
- for data in result:
- if data:
- write(data)
- if not headers_sent:
- write('') # in case body was empty
- finally:
- if hasattr(result, 'close'):
- result.close()
-
- def error(self, request):
- """
- Override to provide custom error handling. Ideally, however,
- all errors should be caught at the application level.
- """
- import cgitb
- request.stdout.write('Content-Type: text/html\r\n\r\n' +
- cgitb.html(sys.exc_info()))
-
if __name__ == '__main__':
def test_app(environ, start_response):
"""Probably not the most efficient example."""
diff --git a/flup/server/threadedserver.py b/flup/server/threadedserver.py
new file mode 100644
index 0000000..a61db19
--- /dev/null
+++ b/flup/server/threadedserver.py
@@ -0,0 +1,151 @@
+# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+# $Id$
+
+__author__ = 'Allan Saddi <allan@saddi.com>'
+__version__ = '$Revision$'
+
+import socket
+import select
+import signal
+import errno
+
+from threadpool import ThreadPool
+
+__all__ = ['ThreadedServer']
+
+class ThreadedServer(object):
+ def __init__(self, jobClass=None, jobArgs=(), **kw):
+ self._jobClass = jobClass
+ self._jobArgs = jobArgs
+
+ self._threadPool = ThreadPool(**kw)
+
+ def run(self, sock, timeout=1.0):
+ """
+ The main loop. Pass a socket that is ready to accept() client
+        connections. Return value will be True or False indicating whether
+ or not the loop was exited due to SIGHUP.
+ """
+ # Set up signal handlers.
+ self._keepGoing = True
+ self._hupReceived = False
+ self._installSignalHandlers()
+
+ # Main loop.
+ while self._keepGoing:
+ try:
+ r, w, e = select.select([sock], [], [], timeout)
+ except select.error, e:
+ if e[0] == errno.EINTR:
+ continue
+ raise
+
+ if r:
+ try:
+ clientSock, addr = sock.accept()
+ except socket.error, e:
+ if e[0] in (errno.EINTR, errno.EAGAIN):
+ continue
+ raise
+
+ if not self._isClientAllowed(addr):
+ clientSock.close()
+ continue
+
+ # Hand off to Connection.
+ conn = self._jobClass(clientSock, addr, *self._jobArgs)
+ if not self._threadPool.addJob(conn, allowQueuing=False):
+ # No thread left, immediately close the socket to hopefully
+ # indicate to the web server that we're at our limit...
+ # and to prevent having too many opened (and useless)
+ # files.
+ clientSock.close()
+
+ self._mainloopPeriodic()
+
+ # Restore signal handlers.
+ self._restoreSignalHandlers()
+
+ # Return bool based on whether or not SIGHUP was received.
+ return self._hupReceived
+
+ def _mainloopPeriodic(self):
+ """
+ Called with just about each iteration of the main loop. Meant to
+ be overridden.
+ """
+ pass
+
+ def _exit(self, reload=False):
+ """
+ Protected convenience method for subclasses to force an exit. Not
+ really thread-safe, which is why it isn't public.
+ """
+ if self._keepGoing:
+ self._keepGoing = False
+ self._hupReceived = reload
+
+ def _isClientAllowed(self, addr):
+ """Override to provide access control."""
+ return True
+
+ # Signal handlers
+
+ def _hupHandler(self, signum, frame):
+ self._hupReceived = True
+ self._keepGoing = False
+
+ def _intHandler(self, signum, frame):
+ self._keepGoing = False
+
+ def _installSignalHandlers(self):
+ self._oldSIGs = [(x,signal.getsignal(x)) for x in
+ (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)]
+ signal.signal(signal.SIGHUP, self._hupHandler)
+ signal.signal(signal.SIGINT, self._intHandler)
+ signal.signal(signal.SIGTERM, self._intHandler)
+
+ def _restoreSignalHandlers(self):
+ for signum,handler in self._oldSIGs:
+ signal.signal(signum, handler)
+
+if __name__ == '__main__':
+ class TestJob(object):
+ def __init__(self, sock, addr):
+ self._sock = sock
+ self._addr = addr
+ def run(self):
+ print "Client connection opened from %s:%d" % self._addr
+ self._sock.send('Hello World!\n')
+ self._sock.setblocking(1)
+ self._sock.recv(1)
+ self._sock.close()
+ print "Client connection closed from %s:%d" % self._addr
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('', 8080))
+ sock.listen(socket.SOMAXCONN)
+ ThreadedServer(maxThreads=10, jobClass=TestJob).run(sock)