summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp11
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp2
4 files changed, 16 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ec9ec30880..f3b81c386e 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -197,7 +197,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 956001;
+const uint32_t Cluster::CLUSTER_VERSION = 964709;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ee296d7f35..923d66ad34 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -304,10 +304,17 @@ size_t Connection::decode(const char* data, size_t size) {
const char* ptr = data;
const char* end = data + size;
if (catchUp) { // Handle catch-up locally.
+ bool wasOpen = connection->isOpen();
Buffer buf(const_cast<char*>(ptr), size);
ptr += size;
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
+ if (!wasOpen && connection->isOpen()) {
+ // Connections marked as federation links are allowed to proxy
+ // messages with user-ID that doesn't match the connection's
+ // authenticated ID. This is important for updates.
+ connection->setFederationLink(isCatchUp());
+ }
}
else { // Multicast local connections.
assert(isLocalClient());
@@ -384,6 +391,10 @@ void Connection::shadowPrepare(const std::string& mgmtId) {
updateIn.nextShadowMgmtId = mgmtId;
}
+void Connection::shadowSetUser(const std::string& userId) {
+ connection->setUserId(userId);
+}
+
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index aec18d73a4..24b8c8532f 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -114,6 +114,8 @@ class Connection :
// State update methods.
void shadowPrepare(const std::string&);
+ void shadowSetUser(const std::string&);
+
void sessionState(const framing::SequenceNumber& replayStart,
const framing::SequenceNumber& sendCommandPoint,
const framing::SequenceSet& sentIncomplete,
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index cb296ab8da..54c5fa0bcc 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -365,6 +365,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
connectionSettings.maxFrameSize = bc.getFrameMax();
shadowConnection.open(updateeUrl, connectionSettings);
+ ClusterConnectionProxy(shadowConnection).shadowSetUser(bc.getUserId());
+
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
// Safe to use decoder here because we are stalled for update.
std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();