From e212206ee08bf84a6ad8bc93ab681f951607e573 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 3 Apr 2015 15:51:57 +0000 Subject: QPID-6474: [Python Client] Prevent client connection thread leaks on the 08..09 code path git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1671094 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/client.py | 3 ++- python/qpid/connection08.py | 4 ++++ python/qpid/peer.py | 43 ++++++++++++++++++++++++++++++++++++------- 3 files changed, 42 insertions(+), 8 deletions(-) (limited to 'python') diff --git a/python/qpid/client.py b/python/qpid/client.py index fbec7ccc7d..7ae0106b51 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -126,7 +126,8 @@ class Client: return self.channel(id) def close(self): - self.socket.close() + if self.peer: + self.peer.stop() class ClientDelegate(Delegate): diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py index ae4ccfc7e0..9565937a6e 100644 --- a/python/qpid/connection08.py +++ b/python/qpid/connection08.py @@ -118,6 +118,7 @@ class Connection: self.FRAME_END = self.spec.constants.byname["frame_end"].id self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) + self.io = io def flush(self): self.codec.flush() @@ -250,6 +251,9 @@ class Connection: def read_99_0(self): return self.read_0_10() + def close(self): + self.io.close(); + class Frame: DECODERS = {} diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 851435cd69..a49dc3e661 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -24,7 +24,7 @@ sorts incoming frames to their intended channels, and dispatches incoming method frames to a delegate. """ -import thread, threading, traceback, socket, sys, logging +import threading, traceback, socket, sys from connection08 import EOF, Method, Header, Body, Request, Response, VersionError from message import Message from queue import Queue, Closed as QueueClosed @@ -32,6 +32,9 @@ from content import Content from cStringIO import StringIO from time import time from exceptions import Closed +from logging import getLogger + +log = getLogger("qpid.peer") class Sequence: @@ -39,7 +42,7 @@ class Sequence: # we should keep start for wrap around self._next = start self.step = step - self.lock = thread.allocate_lock() + self.lock = threading.Lock() def next(self): self.lock.acquire() @@ -58,7 +61,7 @@ class Peer: self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} - self.lock = thread.allocate_lock() + self.lock = threading.Lock() if channel_factory: self.channel_factory = channel_factory else: @@ -77,13 +80,20 @@ class Peer: return ch def start(self): - thread.start_new_thread(self.writer, ()) - thread.start_new_thread(self.reader, ()) - thread.start_new_thread(self.worker, ()) + self.writer_thread = threading.Thread(target=self.writer) + self.writer_thread.daemon = True + self.writer_thread.start() + + self.reader_thread = threading.Thread(target=self.reader) + self.reader_thread.daemon = True + self.reader_thread.start() + + self.worker_thread = threading.Thread(target=self.worker) + self.worker_thread.daemon = True + self.worker_thread.start() def fatal(self, message=None): """Call when an unexpected exception occurs that will kill a thread.""" - if message: print >> sys.stderr, message self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) def reader(self): @@ -119,6 +129,8 @@ class Peer: self.closed(e) break self.conn.flush() + except QueueClosed: + pass except: self.fatal() @@ -139,6 +151,23 @@ class Peer: except: self.fatal() + def stop(self): + try: + self.work.close(); + self.outgoing.close(); + self.conn.close(); + finally: + timeout = 1; + self.worker_thread.join(timeout); + if self.worker_thread.isAlive(): + log.warn("Worker thread failed to shutdown within timeout") + self.reader_thread.join(timeout); + if self.reader_thread.isAlive(): + log.warn("Reader thread failed to shutdown within timeout") + self.writer_thread.join(timeout); + if self.writer_thread.isAlive(): + log.warn("Writer thread failed to shutdown within timeout") + class Requester: def __init__(self, writer): -- cgit v1.2.1