diff options
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/failing-amqp1.0-python-tests | 1 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py | 29 |
7 files changed, 35 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index ecc48123cf..8cab18f37b 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "OwnershipToken.h" #include <map> #include <string> @@ -34,15 +35,12 @@ class Variant; namespace broker { -class OwnershipToken; - /** * Protocol independent connection abstraction. */ -class Connection { +class Connection : public OwnershipToken { public: virtual ~Connection() {} - virtual const OwnershipToken* getOwnership() const = 0; virtual const management::ObjectId getObjectId() const = 0; virtual const std::string& getUserId() const = 0; virtual const std::string& getMgmtId() const = 0; diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 21d9f8c875..049b583335 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -203,7 +203,7 @@ void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } const Connection* Message::getPublisher() const { return publisher; } void Message::setPublisher(const Connection& p) { publisher = &p; } -bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher->getOwnership()); } +bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher); } qpid::framing::SequenceNumber Message::getSequence() const diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp index 8f29833c8a..1ce0ed23c2 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -148,10 +148,6 @@ qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementO std::string ManagedConnection::getId() const { return id; } -const OwnershipToken* ManagedConnection::getOwnership() const -{ - return this; -} const management::ObjectId ManagedConnection::getObjectId() const { return GetManagementObject()->getObjectId(); diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h index 225aab9eed..77483b5630 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -23,7 +23,6 @@ */ #include "qpid/management/Manageable.h" #include "qpid/broker/Connection.h" -#include "qpid/broker/OwnershipToken.h" #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/Connection.h" @@ -36,7 +35,7 @@ namespace broker { class Broker; namespace amqp { -class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection +class ManagedConnection : public qpid::management::Manageable, public qpid::broker::Connection { public: ManagedConnection(Broker& broker, const std::string id, bool brokerInitiated); @@ -56,7 +55,6 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT void outgoingMessageSent(); //ConnectionIdentity - const OwnershipToken* getOwnership() const; const management::ObjectId getObjectId() const; const std::string& getUserId() const; const std::string& getMgmtId() const; diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h index 39a24adc1b..b53c2f3e7b 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h @@ -68,7 +68,7 @@ namespace amqp_0_10 { struct ConnectionTimeoutTask; class Connection : public sys::ConnectionInputHandler, public qpid::broker::Connection, - public OwnershipToken, public management::Manageable, + public management::Manageable, public RefCounted { public: @@ -81,7 +81,6 @@ class Connection : public sys::ConnectionInputHandler, public qpid::broker::Conn void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } - const OwnershipToken* getOwnership() const { return this; }; const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); }; const std::string& getUserId() const { return userId; } diff --git a/qpid/cpp/src/tests/failing-amqp1.0-python-tests b/qpid/cpp/src/tests/failing-amqp1.0-python-tests index 19e27af562..c15aa3fe13 100644 --- a/qpid/cpp/src/tests/failing-amqp1.0-python-tests +++ b/qpid/cpp/src/tests/failing-amqp1.0-python-tests @@ -19,3 +19,4 @@ qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange_2_consumers qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange +qpid_tests.broker_0_10.new_api.GeneralTests.test_nolocal_rerouted diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py index 4140307266..282b23c1c7 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py @@ -149,6 +149,35 @@ class GeneralTests(Base): expected.remove(c) self.ssn.acknowledge() + def test_nolocal_rerouted(self): + conn2 = Connection.establish(self.broker, **self.connection_options()) + ssn2 = conn2.session() + + s1 = self.ssn.sender("holding_q; {create:always, delete:always, node:{x-declare:{alternate-exchange:'amq.fanout'}}}"); + s2 = ssn2.sender("holding_q"); + + s2.send(Message("a")); + s1.send(Message("b")); + s2.send(Message("c")); + + r = self.ssn.receiver("amq.fanout; {link:{x-declare:{arguments:{'no-local':True}}}}") + + # close connection of one of the publishers + conn2.close() + + # close sender which should cause the orphaned messages on + # holding_q to be rerouted through alternate exchange onto the + # subscription queue of the receiver above + s1.close() + + received = [] + try: + while True: + received.append(r.fetch(0).content) + except Empty: pass + self.assertEqual(received, ["a", "c"]) + + class SequenceNumberTests(Base): """ Tests of ring queue sequence number |
