summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-04-03 15:51:57 +0000
committerKeith Wall <kwall@apache.org>2015-04-03 15:51:57 +0000
commite212206ee08bf84a6ad8bc93ab681f951607e573 (patch)
tree615603392e4a5d8e331ec7fd3b0bd70b22c5ecaa /python
parent9c56a9d95c20e68d20ace10d2623f1f76e0e3bcf (diff)
downloadqpid-python-e212206ee08bf84a6ad8bc93ab681f951607e573.tar.gz
QPID-6474: [Python Client] Prevent client connection thread leaks on the 08..09 code path
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1671094 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/client.py3
-rw-r--r--python/qpid/connection08.py4
-rw-r--r--python/qpid/peer.py43
3 files changed, 42 insertions, 8 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py
index fbec7ccc7d..7ae0106b51 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -126,7 +126,8 @@ class Client:
return self.channel(id)
def close(self):
- self.socket.close()
+ if self.peer:
+ self.peer.stop()
class ClientDelegate(Delegate):
diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py
index ae4ccfc7e0..9565937a6e 100644
--- a/python/qpid/connection08.py
+++ b/python/qpid/connection08.py
@@ -118,6 +118,7 @@ class Connection:
self.FRAME_END = self.spec.constants.byname["frame_end"].id
self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor))
self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor))
+ self.io = io
def flush(self):
self.codec.flush()
@@ -250,6 +251,9 @@ class Connection:
def read_99_0(self):
return self.read_0_10()
+ def close(self):
+ self.io.close();
+
class Frame:
DECODERS = {}
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 851435cd69..a49dc3e661 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -24,7 +24,7 @@ sorts incoming frames to their intended channels, and dispatches
incoming method frames to a delegate.
"""
-import thread, threading, traceback, socket, sys, logging
+import threading, traceback, socket, sys
from connection08 import EOF, Method, Header, Body, Request, Response, VersionError
from message import Message
from queue import Queue, Closed as QueueClosed
@@ -32,6 +32,9 @@ from content import Content
from cStringIO import StringIO
from time import time
from exceptions import Closed
+from logging import getLogger
+
+log = getLogger("qpid.peer")
class Sequence:
@@ -39,7 +42,7 @@ class Sequence:
# we should keep start for wrap around
self._next = start
self.step = step
- self.lock = thread.allocate_lock()
+ self.lock = threading.Lock()
def next(self):
self.lock.acquire()
@@ -58,7 +61,7 @@ class Peer:
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
- self.lock = thread.allocate_lock()
+ self.lock = threading.Lock()
if channel_factory:
self.channel_factory = channel_factory
else:
@@ -77,13 +80,20 @@ class Peer:
return ch
def start(self):
- thread.start_new_thread(self.writer, ())
- thread.start_new_thread(self.reader, ())
- thread.start_new_thread(self.worker, ())
+ self.writer_thread = threading.Thread(target=self.writer)
+ self.writer_thread.daemon = True
+ self.writer_thread.start()
+
+ self.reader_thread = threading.Thread(target=self.reader)
+ self.reader_thread.daemon = True
+ self.reader_thread.start()
+
+ self.worker_thread = threading.Thread(target=self.worker)
+ self.worker_thread.daemon = True
+ self.worker_thread.start()
def fatal(self, message=None):
"""Call when an unexpected exception occurs that will kill a thread."""
- if message: print >> sys.stderr, message
self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
def reader(self):
@@ -119,6 +129,8 @@ class Peer:
self.closed(e)
break
self.conn.flush()
+ except QueueClosed:
+ pass
except:
self.fatal()
@@ -139,6 +151,23 @@ class Peer:
except:
self.fatal()
+ def stop(self):
+ try:
+ self.work.close();
+ self.outgoing.close();
+ self.conn.close();
+ finally:
+ timeout = 1;
+ self.worker_thread.join(timeout);
+ if self.worker_thread.isAlive():
+ log.warn("Worker thread failed to shutdown within timeout")
+ self.reader_thread.join(timeout);
+ if self.reader_thread.isAlive():
+ log.warn("Reader thread failed to shutdown within timeout")
+ self.writer_thread.join(timeout);
+ if self.writer_thread.isAlive():
+ log.warn("Writer thread failed to shutdown within timeout")
+
class Requester:
def __init__(self, writer):