summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h3
-rw-r--r--qpid/cpp/src/tests/failing-amqp1.0-python-tests1
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py29
7 files changed, 35 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index ecc48123cf..8cab18f37b 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "OwnershipToken.h"
#include <map>
#include <string>
@@ -34,15 +35,12 @@ class Variant;
namespace broker {
-class OwnershipToken;
-
/**
* Protocol independent connection abstraction.
*/
-class Connection {
+class Connection : public OwnershipToken {
public:
virtual ~Connection() {}
- virtual const OwnershipToken* getOwnership() const = 0;
virtual const management::ObjectId getObjectId() const = 0;
virtual const std::string& getUserId() const = 0;
virtual const std::string& getMgmtId() const = 0;
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 21d9f8c875..049b583335 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -203,7 +203,7 @@ void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
const Connection* Message::getPublisher() const { return publisher; }
void Message::setPublisher(const Connection& p) { publisher = &p; }
-bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher->getOwnership()); }
+bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher); }
qpid::framing::SequenceNumber Message::getSequence() const
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
index 8f29833c8a..1ce0ed23c2 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
@@ -148,10 +148,6 @@ qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementO
std::string ManagedConnection::getId() const { return id; }
-const OwnershipToken* ManagedConnection::getOwnership() const
-{
- return this;
-}
const management::ObjectId ManagedConnection::getObjectId() const
{
return GetManagementObject()->getObjectId();
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
index 225aab9eed..77483b5630 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
@@ -23,7 +23,6 @@
*/
#include "qpid/management/Manageable.h"
#include "qpid/broker/Connection.h"
-#include "qpid/broker/OwnershipToken.h"
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/Connection.h"
@@ -36,7 +35,7 @@ namespace broker {
class Broker;
namespace amqp {
-class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection
+class ManagedConnection : public qpid::management::Manageable, public qpid::broker::Connection
{
public:
ManagedConnection(Broker& broker, const std::string id, bool brokerInitiated);
@@ -56,7 +55,6 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT
void outgoingMessageSent();
//ConnectionIdentity
- const OwnershipToken* getOwnership() const;
const management::ObjectId getObjectId() const;
const std::string& getUserId() const;
const std::string& getMgmtId() const;
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
index 39a24adc1b..b53c2f3e7b 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
@@ -68,7 +68,7 @@ namespace amqp_0_10 {
struct ConnectionTimeoutTask;
class Connection : public sys::ConnectionInputHandler, public qpid::broker::Connection,
- public OwnershipToken, public management::Manageable,
+ public management::Manageable,
public RefCounted
{
public:
@@ -81,7 +81,6 @@ class Connection : public sys::ConnectionInputHandler, public qpid::broker::Conn
void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
- const OwnershipToken* getOwnership() const { return this; };
const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
const std::string& getUserId() const { return userId; }
diff --git a/qpid/cpp/src/tests/failing-amqp1.0-python-tests b/qpid/cpp/src/tests/failing-amqp1.0-python-tests
index 19e27af562..c15aa3fe13 100644
--- a/qpid/cpp/src/tests/failing-amqp1.0-python-tests
+++ b/qpid/cpp/src/tests/failing-amqp1.0-python-tests
@@ -19,3 +19,4 @@
qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange_2_consumers
qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange
+qpid_tests.broker_0_10.new_api.GeneralTests.test_nolocal_rerouted
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
index 4140307266..282b23c1c7 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
@@ -149,6 +149,35 @@ class GeneralTests(Base):
expected.remove(c)
self.ssn.acknowledge()
+ def test_nolocal_rerouted(self):
+ conn2 = Connection.establish(self.broker, **self.connection_options())
+ ssn2 = conn2.session()
+
+ s1 = self.ssn.sender("holding_q; {create:always, delete:always, node:{x-declare:{alternate-exchange:'amq.fanout'}}}");
+ s2 = ssn2.sender("holding_q");
+
+ s2.send(Message("a"));
+ s1.send(Message("b"));
+ s2.send(Message("c"));
+
+ r = self.ssn.receiver("amq.fanout; {link:{x-declare:{arguments:{'no-local':True}}}}")
+
+ # close connection of one of the publishers
+ conn2.close()
+
+ # close sender which should cause the orphaned messages on
+ # holding_q to be rerouted through alternate exchange onto the
+ # subscription queue of the receiver above
+ s1.close()
+
+ received = []
+ try:
+ while True:
+ received.append(r.fetch(0).content)
+ except Empty: pass
+ self.assertEqual(received, ["a", "c"])
+
+
class SequenceNumberTests(Base):
"""
Tests of ring queue sequence number