summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp11
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp2
-rwxr-xr-xcpp/src/tests/cluster_tests.py13
5 files changed, 29 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ec9ec30880..f3b81c386e 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -197,7 +197,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 956001;
+const uint32_t Cluster::CLUSTER_VERSION = 964709;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ee296d7f35..923d66ad34 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -304,10 +304,17 @@ size_t Connection::decode(const char* data, size_t size) {
const char* ptr = data;
const char* end = data + size;
if (catchUp) { // Handle catch-up locally.
+ bool wasOpen = connection->isOpen();
Buffer buf(const_cast<char*>(ptr), size);
ptr += size;
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
+ if (!wasOpen && connection->isOpen()) {
+ // Connections marked as federation links are allowed to proxy
+ // messages with user-ID that doesn't match the connection's
+ // authenticated ID. This is important for updates.
+ connection->setFederationLink(isCatchUp());
+ }
}
else { // Multicast local connections.
assert(isLocalClient());
@@ -384,6 +391,10 @@ void Connection::shadowPrepare(const std::string& mgmtId) {
updateIn.nextShadowMgmtId = mgmtId;
}
+void Connection::shadowSetUser(const std::string& userId) {
+ connection->setUserId(userId);
+}
+
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index aec18d73a4..24b8c8532f 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -114,6 +114,8 @@ class Connection :
// State update methods.
void shadowPrepare(const std::string&);
+ void shadowSetUser(const std::string&);
+
void sessionState(const framing::SequenceNumber& replayStart,
const framing::SequenceNumber& sendCommandPoint,
const framing::SequenceSet& sentIncomplete,
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index cb296ab8da..54c5fa0bcc 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -365,6 +365,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
connectionSettings.maxFrameSize = bc.getFrameMax();
shadowConnection.open(updateeUrl, connectionSettings);
+ ClusterConnectionProxy(shadowConnection).shadowSetUser(bc.getUserId());
+
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
// Safe to use decoder here because we are stalled for update.
std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 46bef2b3c2..06a8dcee50 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -157,6 +157,19 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
+ def test_user_id_update(self):
+ """Ensure that user-id of an open session is updated to new cluster members"""
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,])
+ c = cluster[0].connect(username="zig", password="zig")
+ s = c.session().sender("q;{create:always}")
+ s.send(Message("x", user_id="zig")) # Message sent before start new broker
+ cluster.start()
+ s.send(Message("y", user_id="zig")) # Messsage sent after start of new broker
+ # Verify brokers are healthy and messages are on the queue.
+ self.assertEqual("x", cluster[0].get_message("q").content)
+ self.assertEqual("y", cluster[1].get_message("q").content)
+
def test_link_events(self):
"""Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
args = ["--mgmt-pub-interval", 1] # Publish management information every second.