diff options
Diffstat (limited to 'paste/httpserver.py')
| -rwxr-xr-x | paste/httpserver.py | 155 |
1 files changed, 146 insertions, 9 deletions
diff --git a/paste/httpserver.py b/paste/httpserver.py index 7d91056..4fcf35e 100755 --- a/paste/httpserver.py +++ b/paste/httpserver.py @@ -17,9 +17,10 @@ if pyOpenSSL is installed, it also provides SSL capabilities. # @@: add support for chunked encoding, this is not a 1.1 server # till this is completed. +import errno, socket, sys, threading, urlparse, Queue from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from SocketServer import ThreadingMixIn -import urlparse, sys, socket +from paste.deploy import converters __all__ = ['WSGIHandlerMixin','WSGIServer','WSGIHandler', 'serve'] __version__ = "0.5" @@ -326,9 +327,118 @@ class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler): except SocketErrors, exce: self.wsgi_connection_drop(exce) -class WSGIServer(ThreadingMixIn, SecureHTTPServer): - daemon_threads = False +class ThreadPool(object): + """ + Generic thread pool with a queue of callables to consume. + """ + SHUTDOWN = object() + + def __init__(self, nworkers, name="ThreadPool"): + """ + Create thread pool with `nworkers` worker threads. + """ + self.nworkers = nworkers + self.name = name + self.queue = Queue.Queue() + self.workers = [] + for i in range(self.nworkers): + worker = threading.Thread(target=self.worker_thread_callback, + name=("%s worker %d" % (self.name, i))) + worker.start() + self.workers.append(worker) + + def worker_thread_callback(self): + """ + Worker thread should call this method to get and process queued + callables. + """ + while True: + runnable = self.queue.get() + if runnable is ThreadPool.SHUTDOWN: + return + else: + runnable() + + def shutdown(self): + """ + Shutdown the queue (after finishing any pending requests). + """ + # Add a shutdown request for every worker + for i in range(self.nworkers): + self.queue.put(ThreadPool.SHUTDOWN) + # Wait for each thread to terminate + for worker in self.workers: + worker.join() + +class ThreadPoolMixIn: + """ + Mix-in class to process requests from a thread pool + """ + def __init__(self, nworkers): + # Create and start the workers + self.running = True + assert nworkers > 0, "ThreadPoolMixin servers must have at least one worker" + self.thread_pool = ThreadPool(nworkers, + "ThreadPoolMixin HTTP server on %s:%d" + % (self.server_name, self.server_port)) + + def process_request(self, request, client_address): + """ + Queue the request to be processed by on of the thread pool threads + """ + # This sets the socket to blocking mode (and no timeout) since it + # may take the thread pool a little while to get back to it. (This + # is the default but since we set a timeout on the parent socket so + # that we can trap interrupts we need to restore this,.) + request.setblocking(1) + # Queue processing of the request + self.thread_pool.queue.put( + lambda: self.process_request_in_thread(request, client_address)) + + def process_request_in_thread(self, request, client_address): + """ + The worker thread should call back here to do the rest of the + request processing. Error handling normaller done in 'handle_request' + must be done here. + """ + try: + self.finish_request(request, client_address) + self.close_request(request) + except: + self.handle_error(request, client_address) + self.close_request(request) + def serve_forever(self): + """ + Overrides `serve_forever` to shut the threadpool down cleanly. + """ + try: + while self.running: + try: + self.handle_request() + except socket.timeout: + # Timeout is expected, gives interrupts a chance to + # propogate, just keep handling + pass + finally: + self.thread_pool.shutdown() + + def server_activate(self): + """ + Overrides server_activate to set timeout on our listener socket. + """ + # We set the timeout here so that we can trap interrupts on windows + self.socket.settimeout(1) + self.socket.listen(self.request_queue_size) + + def server_close(self): + """ + Finish pending requests and shutdown the server. + """ + self.running = False + self.socket.close() + +class WSGIServerBase(SecureHTTPServer): def __init__(self, wsgi_application, server_address, RequestHandlerClass=None, ssl_context=None): SecureHTTPServer.__init__(self, server_address, @@ -343,9 +453,20 @@ class WSGIServer(ThreadingMixIn, SecureHTTPServer): conn.settimeout(self.wsgi_socket_timeout) return (conn, info) +class WSGIServer(ThreadingMixIn, WSGIServerBase): + daemon_threads = False + +class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase): + def __init__(self, wsgi_application, server_address, + RequestHandlerClass=None, ssl_context=None, nworkers=10): + WSGIServerBase.__init__(self, wsgi_application, server_address, + RequestHandlerClass, ssl_context) + ThreadPoolMixIn.__init__(self, nworkers) + def serve(application, host=None, port=None, handler=None, ssl_pem=None, server_version=None, protocol_version=None, start_loop=True, - daemon_threads=None, socket_timeout=None): + daemon_threads=None, socket_timeout=None, use_threadpool=True, + threadpool_workers=10): """ Serves your ``application`` over HTTP(S) via WSGI interface @@ -414,6 +535,17 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None, disconnect, but at a later time it might follow the RFC a bit more closely. + ``use_threadpool`` + + Server requests from a pool of worker threads (``threadpool_workers``) + rather than creating a new thread for each request. This can + substantially reduce latency since there is a high cost associated + with thread creation. + + ``threadpool_workers`` + + Number of worker threads to create when ``use_threadpool`` is true. This + can be a string or an integer value. """ ssl_context = None if ssl_pem: @@ -440,13 +572,19 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None, assert protocol_version in ('HTTP/0.9','HTTP/1.0','HTTP/1.1') handler.protocol_version = protocol_version - server = WSGIServer(application, server_address, handler, ssl_context) - if daemon_threads: - server.daemon_threads = daemon_threads + + if converters.asbool(use_threadpool): + server = WSGIThreadPoolServer(application, server_address, handler, + ssl_context, int(threadpool_workers)) + else: + server = WSGIServer(application, server_address, handler, ssl_context) + if daemon_threads: + server.daemon_threads = daemon_threads + if socket_timeout: server.wsgi_socket_timeout = int(socket_timeout) - if start_loop: + if converters.asbool(start_loop): print "serving on %s:%s" % server.server_address try: server.serve_forever() @@ -472,4 +610,3 @@ if __name__ == '__main__': #serve(dump_environ, ssl_pem="test.pem") serve(dump_environ, server_version="Wombles/1.0", protocol_version="HTTP/1.1", port="8888") - |
