summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/cluster
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp29
-rw-r--r--cpp/src/qpid/cluster/Connection.h10
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp14
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp54
-rw-r--r--cpp/src/qpid/cluster/Cpg.h10
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp18
-rw-r--r--cpp/src/qpid/cluster/types.h23
7 files changed, 93 insertions, 65 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 512e0f03cb..ff855eef18 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -58,6 +58,8 @@
namespace qpid {
namespace cluster {
+using std::string;
+
using namespace framing;
using namespace framing::cluster;
using amqp_0_10::ListCodec;
@@ -83,7 +85,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& mgmtId,
const ConnectionId& id, const qpid::sys::SecuritySettings& external)
: cluster(c), self(id), catchUp(false), announced(false), output(*this, out),
- connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
+ connectionCtor(&output, cluster.getBroker(), mgmtId, external,
+ false/*isLink*/, 0/*objectId*/, true/*shadow*/, false/*delayManagement*/,
+ false/*authenticated*/),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
@@ -100,9 +104,10 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
external,
isLink,
isCatchUp ? ++catchUpId : 0,
- // The first catch-up connection is not considered a shadow
- // as it needs to be authenticated.
- isCatchUp && self.second > 1),
+ // The first catch-up connection is not a shadow
+ isCatchUp && self.second > 1,
+ false, // delayManagement
+ true), // catch up connecytions are authenticated
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
@@ -272,6 +277,8 @@ void Connection::closed() {
if (announced)
cluster.getMulticast().mcastControl(
ClusterConnectionDeliverCloseBody(), self);
+ else
+ close();
}
}
catch (const std::exception& e) {
@@ -404,11 +411,12 @@ void Connection::shadowSetUser(const std::string& userId) {
}
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position,
- uint32_t usedMsgCredit, uint32_t usedByteCredit)
+ uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount)
{
broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
c->setPosition(position);
c->setBlocked(blocked);
+ c->setDeliveryCount(deliveryCount);
if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit);
if (notifyEnabled) c->enableNotify(); else c->disableNotify();
updateIn.consumerNumbering.add(c);
@@ -522,6 +530,7 @@ broker::QueuedMessage Connection::getUpdateMessage() {
boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
assert(!updateq->isDurable());
broker::QueuedMessage m = updateq->get();
+ updateq->dequeue(0, m);
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -782,16 +791,18 @@ void Connection::managementSetupState(
void Connection::config(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
string kind;
+ uint32_t p = buf.getPosition();
buf.getShortString (kind);
- if (kind == "link") {
+ buf.setPosition(p);
+ if (broker::Link::isEncodedLink(kind)) {
broker::Link::shared_ptr link =
- broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
QPID_LOG(debug, cluster << " updated link "
<< link->getHost() << ":" << link->getPort());
}
- else if (kind == "bridge") {
+ else if (broker::Bridge::isEncodedBridge(kind)) {
broker::Bridge::shared_ptr bridge =
- broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
}
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 26514c76e2..b0e7b3bd9e 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -110,7 +110,7 @@ class Connection :
void deliveredFrame(const EventFrame&);
void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position,
- uint32_t usedMsgCredit, uint32_t usedByteCredit);
+ uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount);
// ==== Used in catch-up mode to build initial state.
//
@@ -228,6 +228,7 @@ class Connection :
uint64_t objectId;
bool shadow;
bool delayManagement;
+ bool authenticated;
ConnectionCtor(
sys::ConnectionOutputHandler* out_,
@@ -237,17 +238,18 @@ class Connection :
bool isLink_=false,
uint64_t objectId_=0,
bool shadow_=false,
- bool delayManagement_=false
+ bool delayManagement_=false,
+ bool authenticated_=true
) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
isLink(isLink_), objectId(objectId_), shadow(shadow_),
- delayManagement(delayManagement_)
+ delayManagement(delayManagement_), authenticated(authenticated_)
{}
std::auto_ptr<broker::Connection> construct() {
return std::auto_ptr<broker::Connection>(
new broker::Connection(
out, broker, mgmtId, external, isLink, objectId,
- shadow, delayManagement)
+ shadow, delayManagement, authenticated)
);
}
};
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index d0ba8abfb3..54327fbfe2 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
#include "qpid/cluster/Connection.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ProxyInputHandler.h"
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
@@ -40,17 +41,10 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out,
const std::string& id,
const qpid::sys::SecuritySettings& external)
{
- broker::Broker& broker = cluster.getBroker();
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: "
- << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10))
return new ConnectionCodec(v, out, id, cluster, false, false, external);
else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
- return new ConnectionCodec(v, out, id, cluster, true, false, external);
+ return new ConnectionCodec(v, out, id, cluster, true, false, external);
return 0;
}
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 0856bcd824..6e9e22a42f 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -32,7 +32,7 @@
// This is a macro instead of a function because we don't want to
// evaluate the MSG argument unless there is an error.
#define CPG_CHECK(RESULT, MSG) \
- if ((RESULT) != CPG_OK) throw Exception(errorStr((RESULT), (MSG)))
+ if ((RESULT) != CS_OK) throw Exception(errorStr((RESULT), (MSG)))
namespace qpid {
namespace cluster {
@@ -50,13 +50,13 @@ Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
// Applies the same retry-logic to all cpg calls that need it.
void Cpg::callCpg ( CpgOp & c ) {
- cpg_error_t result;
+ cs_error_t result;
unsigned int snooze = 10;
for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) {
- if ( CPG_OK == (result = c.op(handle, & group))) {
+ if ( CS_OK == (result = c.op(handle, & group))) {
break;
}
- else if ( result == CPG_ERR_TRY_AGAIN ) {
+ else if ( result == CS_ERR_TRY_AGAIN ) {
QPID_LOG(info, "Retrying " << c.opName );
sys::usleep ( snooze );
snooze *= 10;
@@ -65,7 +65,7 @@ void Cpg::callCpg ( CpgOp & c ) {
else break; // Don't retry unless CPG tells us to.
}
- if ( result != CPG_OK )
+ if ( result != CS_OK )
CPG_CHECK(result, c.msg(group));
}
@@ -127,9 +127,9 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow
callbacks.cpg_confchg_fn = &globalConfigChange;
QPID_LOG(notice, "Initializing CPG");
- cpg_error_t err = cpg_initialize(&handle, &callbacks);
+ cs_error_t err = cpg_initialize(&handle, &callbacks);
int retries = 6; // FIXME aconway 2009-08-06: make this configurable.
- while (err == CPG_ERR_TRY_AGAIN && --retries) {
+ while (err == CS_ERR_TRY_AGAIN && --retries) {
QPID_LOG(notice, "Re-trying CPG initialization.");
sys::sleep(5);
err = cpg_initialize(&handle, &callbacks);
@@ -169,11 +169,11 @@ bool Cpg::mcast(const iovec* iov, int iovLen) {
if (flowState == CPG_FLOW_CONTROL_ENABLED)
return false;
- cpg_error_t result;
+ cs_error_t result;
do {
result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
- if (result != CPG_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group));
- } while(result == CPG_ERR_TRY_AGAIN);
+ if (result != CS_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group));
+ } while(result == CS_ERR_TRY_AGAIN);
return true;
}
@@ -187,34 +187,34 @@ void Cpg::shutdown() {
}
void Cpg::dispatchOne() {
- CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch");
+ CPG_CHECK(cpg_dispatch(handle,CS_DISPATCH_ONE), "Error in CPG dispatch");
}
void Cpg::dispatchAll() {
- CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch");
+ CPG_CHECK(cpg_dispatch(handle,CS_DISPATCH_ALL), "Error in CPG dispatch");
}
void Cpg::dispatchBlocking() {
- CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch");
+ CPG_CHECK(cpg_dispatch(handle,CS_DISPATCH_BLOCKING), "Error in CPG dispatch");
}
-string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
+string Cpg::errorStr(cs_error_t err, const std::string& msg) {
std::ostringstream os;
os << msg << ": ";
switch (err) {
- case CPG_OK: os << "ok"; break;
- case CPG_ERR_LIBRARY: os << "library"; break;
- case CPG_ERR_TIMEOUT: os << "timeout"; break;
- case CPG_ERR_TRY_AGAIN: os << "try again"; break;
- case CPG_ERR_INVALID_PARAM: os << "invalid param"; break;
- case CPG_ERR_NO_MEMORY: os << "no memory"; break;
- case CPG_ERR_BAD_HANDLE: os << "bad handle"; break;
- case CPG_ERR_ACCESS: os << "access denied. You may need to set your group ID to 'ais'"; break;
- case CPG_ERR_NOT_EXIST: os << "not exist"; break;
- case CPG_ERR_EXIST: os << "exist"; break;
- case CPG_ERR_NOT_SUPPORTED: os << "not supported"; break;
- case CPG_ERR_SECURITY: os << "security"; break;
- case CPG_ERR_TOO_MANY_GROUPS: os << "too many groups"; break;
+ case CS_OK: os << "ok"; break;
+ case CS_ERR_LIBRARY: os << "library"; break;
+ case CS_ERR_TIMEOUT: os << "timeout"; break;
+ case CS_ERR_TRY_AGAIN: os << "try again"; break;
+ case CS_ERR_INVALID_PARAM: os << "invalid param"; break;
+ case CS_ERR_NO_MEMORY: os << "no memory"; break;
+ case CS_ERR_BAD_HANDLE: os << "bad handle"; break;
+ case CS_ERR_ACCESS: os << "access denied. You may need to set your group ID to 'ais'"; break;
+ case CS_ERR_NOT_EXIST: os << "not exist"; break;
+ case CS_ERR_EXIST: os << "exist"; break;
+ case CS_ERR_NOT_SUPPORTED: os << "not supported"; break;
+ case CS_ERR_SECURITY: os << "security"; break;
+ case CS_ERR_TOO_MANY_GROUPS: os << "too many groups"; break;
default: os << ": unknown cpg error " << err;
};
os << " (" << err << ")";
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 6b81c602bd..1afbce8d75 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -131,7 +131,7 @@ class Cpg : public sys::IOHandle {
CpgOp ( std::string opName )
: opName(opName) { }
- virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0;
+ virtual cs_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0;
virtual std::string msg(const Name&) = 0;
virtual ~CpgOp ( ) { }
};
@@ -141,7 +141,7 @@ class Cpg : public sys::IOHandle {
CpgJoinOp ( )
: CpgOp ( std::string("cpg_join") ) { }
- cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) {
+ cs_error_t op(cpg_handle_t handle, struct cpg_name * group) {
return cpg_join ( handle, group );
}
@@ -152,7 +152,7 @@ class Cpg : public sys::IOHandle {
CpgLeaveOp ( )
: CpgOp ( std::string("cpg_leave") ) { }
- cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) {
+ cs_error_t op(cpg_handle_t handle, struct cpg_name * group) {
return cpg_leave ( handle, group );
}
@@ -163,7 +163,7 @@ class Cpg : public sys::IOHandle {
CpgFinalizeOp ( )
: CpgOp ( std::string("cpg_finalize") ) { }
- cpg_error_t op(cpg_handle_t handle, struct cpg_name *) {
+ cs_error_t op(cpg_handle_t handle, struct cpg_name *) {
return cpg_finalize ( handle );
}
@@ -177,7 +177,7 @@ class Cpg : public sys::IOHandle {
CpgLeaveOp cpgLeaveOp;
CpgFinalizeOp cpgFinalizeOp;
- static std::string errorStr(cpg_error_t err, const std::string& msg);
+ static std::string errorStr(cs_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
static std::string cantLeaveMsg(const Name&);
static std::string cantMcastMsg(const Name&);
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 20684fd8a7..8737418570 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -74,6 +74,8 @@
namespace qpid {
namespace cluster {
+using std::string;
+
using amqp_0_10::ListCodec;
using broker::Broker;
using broker::Exchange;
@@ -87,6 +89,8 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+namespace _qmf = qmf::org::apache::qpid::broker;
+
// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
// Name for header used to carry expiration information.
@@ -226,14 +230,6 @@ template <class T> std::string encode(const T& t) {
t.encode(buf);
return encoded;
}
-
-template <class T> std::string encode(const T& t, bool encodeKind) {
- std::string encoded;
- encoded.resize(t.encodedSize());
- framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
- t.encode(buf, encodeKind);
- return encoded;
-}
} // namespace
@@ -377,13 +373,14 @@ class MessageUpdater {
void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<Queue>& q) {
broker::Exchange::shared_ptr alternateExchange = q->getAlternateExchange();
+ _qmf::Queue* mgmtQueue = dynamic_cast<_qmf::Queue*>(q->GetManagementObject());
s.queueDeclare(
arg::queue = q->getName(),
arg::durable = q->isDurable(),
arg::autoDelete = q->isAutoDelete(),
arg::alternateExchange = alternateExchange ? alternateExchange->getName() : "",
arg::arguments = q->getSettings(),
- arg::exclusive = q->hasExclusiveOwner()
+ arg::exclusive = mgmtQueue && mgmtQueue->get_exclusive()
);
MessageUpdater updater(q->getName(), s, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
@@ -545,7 +542,8 @@ void UpdateClient::updateConsumer(
ci->isNotifyEnabled(),
ci->getPosition(),
ci->getCredit().used().messages,
- ci->getCredit().used().bytes
+ ci->getCredit().used().bytes,
+ ci->getDeliveryCount()
);
consumerNumbering.add(ci.get());
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index bfb4fd5b9e..c8ffb0b804 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -34,6 +34,29 @@
extern "C" {
#if defined (HAVE_OPENAIS_CPG_H)
# include <openais/cpg.h>
+
+// Provide translations back to the deprecated definitions in openais
+typedef cpg_error_t cs_error_t;
+#define CS_DISPATCH_ONE CPG_DISPATCH_ONE
+#define CS_DISPATCH_ALL CPG_DISPATCH_ALL
+#define CS_DISPATCH_BLOCKING CPG_DISPATCH_BLOCKING
+#define CS_FLOW_CONTROL_DISABLED CPG_FLOW_CONTROL_DISABLED
+#define CS_FLOW_CONTROL_ENABLED CPG_FLOW_CONTROL_ENABLED
+#define CS_OK CPG_OK
+#define CS_ERR_LIBRARY CPG_ERR_LIBRARY
+#define CS_ERR_TIMEOUT CPG_ERR_TIMEOUT
+#define CS_ERR_TRY_AGAIN CPG_ERR_TRY_AGAIN
+#define CS_ERR_INVALID_PARAM CPG_ERR_INVALID_PARAM
+#define CS_ERR_NO_MEMORY CPG_ERR_NO_MEMORY
+#define CS_ERR_BAD_HANDLE CPG_ERR_BAD_HANDLE
+#define CS_ERR_BUSY CPG_ERR_BUSY
+#define CS_ERR_ACCESS CPG_ERR_ACCESS
+#define CS_ERR_NOT_EXIST CPG_ERR_NOT_EXIST
+#define CS_ERR_EXIST CPG_ERR_EXIST
+#define CS_ERR_NOT_SUPPORTED CPG_ERR_NOT_SUPPORTED
+#define CS_ERR_SECURITY CPG_ERR_SECURITY
+#define CS_ERR_TOO_MANY_GROUPS CPG_ERR_TOO_MANY_GROUPS
+
#elif defined (HAVE_COROSYNC_CPG_H)
# include <corosync/cpg.h>
#else