summaryrefslogtreecommitdiff
path: root/paste/httpserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'paste/httpserver.py')
-rwxr-xr-xpaste/httpserver.py155
1 files changed, 146 insertions, 9 deletions
diff --git a/paste/httpserver.py b/paste/httpserver.py
index 7d91056..4fcf35e 100755
--- a/paste/httpserver.py
+++ b/paste/httpserver.py
@@ -17,9 +17,10 @@ if pyOpenSSL is installed, it also provides SSL capabilities.
# @@: add support for chunked encoding, this is not a 1.1 server
# till this is completed.
+import errno, socket, sys, threading, urlparse, Queue
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
-import urlparse, sys, socket
+from paste.deploy import converters
__all__ = ['WSGIHandlerMixin','WSGIServer','WSGIHandler', 'serve']
__version__ = "0.5"
@@ -326,9 +327,118 @@ class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler):
except SocketErrors, exce:
self.wsgi_connection_drop(exce)
-class WSGIServer(ThreadingMixIn, SecureHTTPServer):
- daemon_threads = False
+class ThreadPool(object):
+ """
+ Generic thread pool with a queue of callables to consume.
+ """
+ SHUTDOWN = object()
+
+ def __init__(self, nworkers, name="ThreadPool"):
+ """
+ Create thread pool with `nworkers` worker threads.
+ """
+ self.nworkers = nworkers
+ self.name = name
+ self.queue = Queue.Queue()
+ self.workers = []
+ for i in range(self.nworkers):
+ worker = threading.Thread(target=self.worker_thread_callback,
+ name=("%s worker %d" % (self.name, i)))
+ worker.start()
+ self.workers.append(worker)
+
+ def worker_thread_callback(self):
+ """
+ Worker thread should call this method to get and process queued
+ callables.
+ """
+ while True:
+ runnable = self.queue.get()
+ if runnable is ThreadPool.SHUTDOWN:
+ return
+ else:
+ runnable()
+
+ def shutdown(self):
+ """
+ Shutdown the queue (after finishing any pending requests).
+ """
+ # Add a shutdown request for every worker
+ for i in range(self.nworkers):
+ self.queue.put(ThreadPool.SHUTDOWN)
+ # Wait for each thread to terminate
+ for worker in self.workers:
+ worker.join()
+
+class ThreadPoolMixIn:
+ """
+ Mix-in class to process requests from a thread pool
+ """
+ def __init__(self, nworkers):
+ # Create and start the workers
+ self.running = True
+ assert nworkers > 0, "ThreadPoolMixin servers must have at least one worker"
+ self.thread_pool = ThreadPool(nworkers,
+ "ThreadPoolMixin HTTP server on %s:%d"
+ % (self.server_name, self.server_port))
+
+ def process_request(self, request, client_address):
+ """
+ Queue the request to be processed by on of the thread pool threads
+ """
+ # This sets the socket to blocking mode (and no timeout) since it
+ # may take the thread pool a little while to get back to it. (This
+ # is the default but since we set a timeout on the parent socket so
+ # that we can trap interrupts we need to restore this,.)
+ request.setblocking(1)
+ # Queue processing of the request
+ self.thread_pool.queue.put(
+ lambda: self.process_request_in_thread(request, client_address))
+
+ def process_request_in_thread(self, request, client_address):
+ """
+ The worker thread should call back here to do the rest of the
+ request processing. Error handling normaller done in 'handle_request'
+ must be done here.
+ """
+ try:
+ self.finish_request(request, client_address)
+ self.close_request(request)
+ except:
+ self.handle_error(request, client_address)
+ self.close_request(request)
+ def serve_forever(self):
+ """
+ Overrides `serve_forever` to shut the threadpool down cleanly.
+ """
+ try:
+ while self.running:
+ try:
+ self.handle_request()
+ except socket.timeout:
+ # Timeout is expected, gives interrupts a chance to
+ # propogate, just keep handling
+ pass
+ finally:
+ self.thread_pool.shutdown()
+
+ def server_activate(self):
+ """
+ Overrides server_activate to set timeout on our listener socket.
+ """
+ # We set the timeout here so that we can trap interrupts on windows
+ self.socket.settimeout(1)
+ self.socket.listen(self.request_queue_size)
+
+ def server_close(self):
+ """
+ Finish pending requests and shutdown the server.
+ """
+ self.running = False
+ self.socket.close()
+
+class WSGIServerBase(SecureHTTPServer):
def __init__(self, wsgi_application, server_address,
RequestHandlerClass=None, ssl_context=None):
SecureHTTPServer.__init__(self, server_address,
@@ -343,9 +453,20 @@ class WSGIServer(ThreadingMixIn, SecureHTTPServer):
conn.settimeout(self.wsgi_socket_timeout)
return (conn, info)
+class WSGIServer(ThreadingMixIn, WSGIServerBase):
+ daemon_threads = False
+
+class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
+ def __init__(self, wsgi_application, server_address,
+ RequestHandlerClass=None, ssl_context=None, nworkers=10):
+ WSGIServerBase.__init__(self, wsgi_application, server_address,
+ RequestHandlerClass, ssl_context)
+ ThreadPoolMixIn.__init__(self, nworkers)
+
def serve(application, host=None, port=None, handler=None, ssl_pem=None,
server_version=None, protocol_version=None, start_loop=True,
- daemon_threads=None, socket_timeout=None):
+ daemon_threads=None, socket_timeout=None, use_threadpool=True,
+ threadpool_workers=10):
"""
Serves your ``application`` over HTTP(S) via WSGI interface
@@ -414,6 +535,17 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
disconnect, but at a later time it might follow the RFC a bit
more closely.
+ ``use_threadpool``
+
+ Server requests from a pool of worker threads (``threadpool_workers``)
+ rather than creating a new thread for each request. This can
+ substantially reduce latency since there is a high cost associated
+ with thread creation.
+
+ ``threadpool_workers``
+
+ Number of worker threads to create when ``use_threadpool`` is true. This
+ can be a string or an integer value.
"""
ssl_context = None
if ssl_pem:
@@ -440,13 +572,19 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
assert protocol_version in ('HTTP/0.9','HTTP/1.0','HTTP/1.1')
handler.protocol_version = protocol_version
- server = WSGIServer(application, server_address, handler, ssl_context)
- if daemon_threads:
- server.daemon_threads = daemon_threads
+
+ if converters.asbool(use_threadpool):
+ server = WSGIThreadPoolServer(application, server_address, handler,
+ ssl_context, int(threadpool_workers))
+ else:
+ server = WSGIServer(application, server_address, handler, ssl_context)
+ if daemon_threads:
+ server.daemon_threads = daemon_threads
+
if socket_timeout:
server.wsgi_socket_timeout = int(socket_timeout)
- if start_loop:
+ if converters.asbool(start_loop):
print "serving on %s:%s" % server.server_address
try:
server.serve_forever()
@@ -472,4 +610,3 @@ if __name__ == '__main__':
#serve(dump_environ, ssl_pem="test.pem")
serve(dump_environ, server_version="Wombles/1.0",
protocol_version="HTTP/1.1", port="8888")
-