diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-01-19 21:29:33 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-01-19 21:29:33 +0000 |
| commit | 89e6f4eea1d172046c806ca3c01244bb0b9cfeee (patch) | |
| tree | dac0125945c56e9a8a62406c6d180cf7735014a8 /python | |
| parent | 07b7a61c7876b92b6ea0d6355d97cae4437df5c2 (diff) | |
| download | qpid-python-89e6f4eea1d172046c806ca3c01244bb0b9cfeee.tar.gz | |
fixed bug in destination/receiver correlation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@900967 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/driver.py | 32 | ||||
| -rw-r--r-- | python/qpid/messaging.py | 17 | ||||
| -rw-r--r-- | python/qpid/tests/messaging.py | 22 |
3 files changed, 51 insertions, 20 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 345869cc48..858e2922ea 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -101,6 +101,8 @@ class SessionState: # XXX: need to periodically exchange completion/known_completion + self.destinations = {} + def write_query(self, query, handler): id = self.sent self.write_cmd(query, lambda: handler(self.results.pop(id))) @@ -500,6 +502,8 @@ class Driver: _rcv = self._attachments.get(rcv) if _rcv is None and not rcv.closing and not rcv.closed: _rcv = Attachment(rcv) + _rcv.destination = str(rcv.id) + sst.destinations[_rcv.destination] = _rcv _rcv.canceled = False _rcv.draining = False @@ -525,7 +529,7 @@ class Driver: def do_link(type, subtype): if type == "topic": - _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) filter = _rcv.options.get("filter") if _rcv.subject is None and filter is None: @@ -543,8 +547,8 @@ class Driver: elif type == "queue": _rcv._queue = _rcv.name - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) + sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit)) rcv.linked = True self.resolve_declare(sst, _rcv, "receiver", do_link) @@ -554,12 +558,13 @@ class Driver: if not _rcv.canceled: def do_unlink(): del self._attachments[rcv] + del sst.destinations[_rcv.destination] rcv.closed = True if _rcv.options.get("delete") in ("always", "receiver"): - sst.write_cmd(MessageCancel(rcv.destination)) + sst.write_cmd(MessageCancel(_rcv.destination)) self.delete(sst, _rcv.name, do_unlink) else: - sst.write_cmd(MessageCancel(rcv.destination), do_unlink) + sst.write_cmd(MessageCancel(_rcv.destination), do_unlink) _rcv.canceled = True def resolve_declare(self, sst, lnk, dir, action): @@ -725,7 +730,8 @@ class Driver: sst.aborting = False for rcv in ssn.receivers: - sst.write_cmd(MessageStop(rcv.destination)) + _rcv = self._attachments[rcv] + sst.write_cmd(MessageStop(_rcv.destination)) sst.write_cmd(ExecutionSync(), do_rb) def grant(self, rcv): @@ -745,12 +751,12 @@ class Driver: delta = max(rcv.granted, rcv.received) - rcv.impending if delta is UNLIMITED: - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) rcv.impending = UNLIMITED elif delta > 0: - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) rcv.impending += delta elif delta < 0 and not rcv.draining: _rcv.draining = True @@ -758,7 +764,7 @@ class Driver: rcv.impending = rcv.received _rcv.draining = False self.grant(rcv) - sst.write_cmd(MessageStop(rcv.destination), do_stop) + sst.write_cmd(MessageStop(_rcv.destination), do_stop) if rcv.draining: _rcv.draining = True @@ -767,7 +773,7 @@ class Driver: rcv.granted = rcv.impending _rcv.draining = False rcv.draining = False - sst.write_cmd(MessageFlush(rcv.destination), do_flush) + sst.write_cmd(MessageFlush(_rcv.destination), do_flush) def process_receiver(self, rcv): @@ -821,7 +827,7 @@ class Driver: ssn = sst.session msg = self._decode(xfr) - rcv = ssn.receivers[int(xfr.destination)] + rcv = sst.destinations[xfr.destination].target msg._receiver = rcv if rcv.impending is not UNLIMITED: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 817f919000..9ec38ad45c 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -432,7 +432,9 @@ class Session: self.aborting = False self.aborted = False + self.next_sender_id = 0 self.senders = [] + self.next_receiver_id = 0 self.receivers = [] self.outgoing = [] self.incoming = [] @@ -477,7 +479,8 @@ class Session: @rtype: Sender @return: a new Sender for the specified target """ - sender = Sender(self, len(self.senders), target, options) + sender = Sender(self, self.next_sender_id, target, options) + self.next_sender_id += 1 self.senders.append(sender) self._wakeup() # XXX: because of the lack of waiting here we can end up getting @@ -497,7 +500,8 @@ class Session: @rtype: Receiver @return: a new Receiver for the specified source """ - receiver = Receiver(self, len(self.receivers), source, options) + receiver = Receiver(self, self.next_receiver_id, source, options) + self.next_receiver_id += 1 self.receivers.append(receiver) self._wakeup() return receiver @@ -630,9 +634,9 @@ class Sender: Sends outgoing messages. """ - def __init__(self, session, index, target, options): + def __init__(self, session, id, target, options): self.session = session - self.index = index + self.id = id self.target = target self.options = options self.capacity = options.get("capacity", UNLIMITED) @@ -753,10 +757,9 @@ class Receiver(object): fetched with L{fetch}. """ - def __init__(self, session, index, source, options): + def __init__(self, session, id, source, options): self.session = session - self.index = index - self.destination = str(self.index) + self.id = id self.source = source self.options = options diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index f2a270192e..9a8a4c807c 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -541,6 +541,28 @@ class ReceiverTests(Base): self.ssn.acknowledge() + def testDoubleClose(self): + m1 = self.content("testDoubleClose", 1) + m2 = self.content("testDoubleClose", 2) + + snd = self.ssn.sender("""test-double-close; { + create: always, + delete: sender, + node-properties: { + type: topic + } +} +""") + r1 = self.ssn.receiver(snd.target) + r2 = self.ssn.receiver(snd.target) + snd.send(m1) + self.drain(r1, expected=[m1]) + self.drain(r2, expected=[m1]) + r1.close() + snd.send(m2) + self.drain(r2, expected=[m2]) + r2.close() + # XXX: need testClose class AddressTests(Base): |
