summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-01-19 21:29:33 +0000
committerRafael H. Schloming <rhs@apache.org>2010-01-19 21:29:33 +0000
commit89e6f4eea1d172046c806ca3c01244bb0b9cfeee (patch)
treedac0125945c56e9a8a62406c6d180cf7735014a8 /python
parent07b7a61c7876b92b6ea0d6355d97cae4437df5c2 (diff)
downloadqpid-python-89e6f4eea1d172046c806ca3c01244bb0b9cfeee.tar.gz
fixed bug in destination/receiver correlation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@900967 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/driver.py32
-rw-r--r--python/qpid/messaging.py17
-rw-r--r--python/qpid/tests/messaging.py22
3 files changed, 51 insertions, 20 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 345869cc48..858e2922ea 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -101,6 +101,8 @@ class SessionState:
# XXX: need to periodically exchange completion/known_completion
+ self.destinations = {}
+
def write_query(self, query, handler):
id = self.sent
self.write_cmd(query, lambda: handler(self.results.pop(id)))
@@ -500,6 +502,8 @@ class Driver:
_rcv = self._attachments.get(rcv)
if _rcv is None and not rcv.closing and not rcv.closed:
_rcv = Attachment(rcv)
+ _rcv.destination = str(rcv.id)
+ sst.destinations[_rcv.destination] = _rcv
_rcv.canceled = False
_rcv.draining = False
@@ -525,7 +529,7 @@ class Driver:
def do_link(type, subtype):
if type == "topic":
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
filter = _rcv.options.get("filter")
if _rcv.subject is None and filter is None:
@@ -543,8 +547,8 @@ class Driver:
elif type == "queue":
_rcv._queue = _rcv.name
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit))
rcv.linked = True
self.resolve_declare(sst, _rcv, "receiver", do_link)
@@ -554,12 +558,13 @@ class Driver:
if not _rcv.canceled:
def do_unlink():
del self._attachments[rcv]
+ del sst.destinations[_rcv.destination]
rcv.closed = True
if _rcv.options.get("delete") in ("always", "receiver"):
- sst.write_cmd(MessageCancel(rcv.destination))
+ sst.write_cmd(MessageCancel(_rcv.destination))
self.delete(sst, _rcv.name, do_unlink)
else:
- sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
+ sst.write_cmd(MessageCancel(_rcv.destination), do_unlink)
_rcv.canceled = True
def resolve_declare(self, sst, lnk, dir, action):
@@ -725,7 +730,8 @@ class Driver:
sst.aborting = False
for rcv in ssn.receivers:
- sst.write_cmd(MessageStop(rcv.destination))
+ _rcv = self._attachments[rcv]
+ sst.write_cmd(MessageStop(_rcv.destination))
sst.write_cmd(ExecutionSync(), do_rb)
def grant(self, rcv):
@@ -745,12 +751,12 @@ class Driver:
delta = max(rcv.granted, rcv.received) - rcv.impending
if delta is UNLIMITED:
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
rcv.impending = UNLIMITED
elif delta > 0:
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
rcv.impending += delta
elif delta < 0 and not rcv.draining:
_rcv.draining = True
@@ -758,7 +764,7 @@ class Driver:
rcv.impending = rcv.received
_rcv.draining = False
self.grant(rcv)
- sst.write_cmd(MessageStop(rcv.destination), do_stop)
+ sst.write_cmd(MessageStop(_rcv.destination), do_stop)
if rcv.draining:
_rcv.draining = True
@@ -767,7 +773,7 @@ class Driver:
rcv.granted = rcv.impending
_rcv.draining = False
rcv.draining = False
- sst.write_cmd(MessageFlush(rcv.destination), do_flush)
+ sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
def process_receiver(self, rcv):
@@ -821,7 +827,7 @@ class Driver:
ssn = sst.session
msg = self._decode(xfr)
- rcv = ssn.receivers[int(xfr.destination)]
+ rcv = sst.destinations[xfr.destination].target
msg._receiver = rcv
if rcv.impending is not UNLIMITED:
assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 817f919000..9ec38ad45c 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -432,7 +432,9 @@ class Session:
self.aborting = False
self.aborted = False
+ self.next_sender_id = 0
self.senders = []
+ self.next_receiver_id = 0
self.receivers = []
self.outgoing = []
self.incoming = []
@@ -477,7 +479,8 @@ class Session:
@rtype: Sender
@return: a new Sender for the specified target
"""
- sender = Sender(self, len(self.senders), target, options)
+ sender = Sender(self, self.next_sender_id, target, options)
+ self.next_sender_id += 1
self.senders.append(sender)
self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
@@ -497,7 +500,8 @@ class Session:
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, options)
+ receiver = Receiver(self, self.next_receiver_id, source, options)
+ self.next_receiver_id += 1
self.receivers.append(receiver)
self._wakeup()
return receiver
@@ -630,9 +634,9 @@ class Sender:
Sends outgoing messages.
"""
- def __init__(self, session, index, target, options):
+ def __init__(self, session, id, target, options):
self.session = session
- self.index = index
+ self.id = id
self.target = target
self.options = options
self.capacity = options.get("capacity", UNLIMITED)
@@ -753,10 +757,9 @@ class Receiver(object):
fetched with L{fetch}.
"""
- def __init__(self, session, index, source, options):
+ def __init__(self, session, id, source, options):
self.session = session
- self.index = index
- self.destination = str(self.index)
+ self.id = id
self.source = source
self.options = options
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index f2a270192e..9a8a4c807c 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -541,6 +541,28 @@ class ReceiverTests(Base):
self.ssn.acknowledge()
+ def testDoubleClose(self):
+ m1 = self.content("testDoubleClose", 1)
+ m2 = self.content("testDoubleClose", 2)
+
+ snd = self.ssn.sender("""test-double-close; {
+ create: always,
+ delete: sender,
+ node-properties: {
+ type: topic
+ }
+}
+""")
+ r1 = self.ssn.receiver(snd.target)
+ r2 = self.ssn.receiver(snd.target)
+ snd.send(m1)
+ self.drain(r1, expected=[m1])
+ self.drain(r2, expected=[m1])
+ r1.close()
+ snd.send(m2)
+ self.drain(r2, expected=[m2])
+ r2.close()
+
# XXX: need testClose
class AddressTests(Base):