diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
| commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
| tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/cluster | |
| parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
| download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 29 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 54 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/types.h | 23 |
7 files changed, 93 insertions, 65 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 512e0f03cb..ff855eef18 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -58,6 +58,8 @@ namespace qpid { namespace cluster { +using std::string; + using namespace framing; using namespace framing::cluster; using amqp_0_10::ListCodec; @@ -83,7 +85,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external) : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), + connectionCtor(&output, cluster.getBroker(), mgmtId, external, + false/*isLink*/, 0/*objectId*/, true/*shadow*/, false/*delayManagement*/, + false/*authenticated*/), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -100,9 +104,10 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, external, isLink, isCatchUp ? ++catchUpId : 0, - // The first catch-up connection is not considered a shadow - // as it needs to be authenticated. - isCatchUp && self.second > 1), + // The first catch-up connection is not a shadow + isCatchUp && self.second > 1, + false, // delayManagement + true), // catch up connecytions are authenticated expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -272,6 +277,8 @@ void Connection::closed() { if (announced) cluster.getMulticast().mcastControl( ClusterConnectionDeliverCloseBody(), self); + else + close(); } } catch (const std::exception& e) { @@ -404,11 +411,12 @@ void Connection::shadowSetUser(const std::string& userId) { } void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position, - uint32_t usedMsgCredit, uint32_t usedByteCredit) + uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); c->setPosition(position); c->setBlocked(blocked); + c->setDeliveryCount(deliveryCount); if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); updateIn.consumerNumbering.add(c); @@ -522,6 +530,7 @@ broker::QueuedMessage Connection::getUpdateMessage() { boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); assert(!updateq->isDurable()); broker::QueuedMessage m = updateq->get(); + updateq->dequeue(0, m); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -782,16 +791,18 @@ void Connection::managementSetupState( void Connection::config(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); string kind; + uint32_t p = buf.getPosition(); buf.getShortString (kind); - if (kind == "link") { + buf.setPosition(p); + if (broker::Link::isEncodedLink(kind)) { broker::Link::shared_ptr link = - broker::Link::decode(cluster.getBroker().getLinks(), buf); + broker::Link::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated link " << link->getHost() << ":" << link->getPort()); } - else if (kind == "bridge") { + else if (broker::Bridge::isEncodedBridge(kind)) { broker::Bridge::shared_ptr bridge = - broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); } else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 26514c76e2..b0e7b3bd9e 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -110,7 +110,7 @@ class Connection : void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position, - uint32_t usedMsgCredit, uint32_t usedByteCredit); + uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount); // ==== Used in catch-up mode to build initial state. // @@ -228,6 +228,7 @@ class Connection : uint64_t objectId; bool shadow; bool delayManagement; + bool authenticated; ConnectionCtor( sys::ConnectionOutputHandler* out_, @@ -237,17 +238,18 @@ class Connection : bool isLink_=false, uint64_t objectId_=0, bool shadow_=false, - bool delayManagement_=false + bool delayManagement_=false, + bool authenticated_=true ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_), isLink(isLink_), objectId(objectId_), shadow(shadow_), - delayManagement(delayManagement_) + delayManagement(delayManagement_), authenticated(authenticated_) {} std::auto_ptr<broker::Connection> construct() { return std::auto_ptr<broker::Connection>( new broker::Connection( out, broker, mgmtId, external, isLink, objectId, - shadow, delayManagement) + shadow, delayManagement, authenticated) ); } }; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index d0ba8abfb3..54327fbfe2 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ #include "qpid/cluster/Connection.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ProxyInputHandler.h" +#include "qpid/broker/AclModule.h" #include "qpid/broker/Connection.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" @@ -40,17 +41,10 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& external) { - broker::Broker& broker = cluster.getBroker(); - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " - << broker.getOptions().maxConnections << " connection refused"); - return 0; - } if (v == ProtocolVersion(0, 10)) return new ConnectionCodec(v, out, id, cluster, false, false, external); else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection - return new ConnectionCodec(v, out, id, cluster, true, false, external); + return new ConnectionCodec(v, out, id, cluster, true, false, external); return 0; } diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 0856bcd824..6e9e22a42f 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -32,7 +32,7 @@ // This is a macro instead of a function because we don't want to // evaluate the MSG argument unless there is an error. #define CPG_CHECK(RESULT, MSG) \ - if ((RESULT) != CPG_OK) throw Exception(errorStr((RESULT), (MSG))) + if ((RESULT) != CS_OK) throw Exception(errorStr((RESULT), (MSG))) namespace qpid { namespace cluster { @@ -50,13 +50,13 @@ Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { // Applies the same retry-logic to all cpg calls that need it. void Cpg::callCpg ( CpgOp & c ) { - cpg_error_t result; + cs_error_t result; unsigned int snooze = 10; for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) { - if ( CPG_OK == (result = c.op(handle, & group))) { + if ( CS_OK == (result = c.op(handle, & group))) { break; } - else if ( result == CPG_ERR_TRY_AGAIN ) { + else if ( result == CS_ERR_TRY_AGAIN ) { QPID_LOG(info, "Retrying " << c.opName ); sys::usleep ( snooze ); snooze *= 10; @@ -65,7 +65,7 @@ void Cpg::callCpg ( CpgOp & c ) { else break; // Don't retry unless CPG tells us to. } - if ( result != CPG_OK ) + if ( result != CS_OK ) CPG_CHECK(result, c.msg(group)); } @@ -127,9 +127,9 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow callbacks.cpg_confchg_fn = &globalConfigChange; QPID_LOG(notice, "Initializing CPG"); - cpg_error_t err = cpg_initialize(&handle, &callbacks); + cs_error_t err = cpg_initialize(&handle, &callbacks); int retries = 6; // FIXME aconway 2009-08-06: make this configurable. - while (err == CPG_ERR_TRY_AGAIN && --retries) { + while (err == CS_ERR_TRY_AGAIN && --retries) { QPID_LOG(notice, "Re-trying CPG initialization."); sys::sleep(5); err = cpg_initialize(&handle, &callbacks); @@ -169,11 +169,11 @@ bool Cpg::mcast(const iovec* iov, int iovLen) { if (flowState == CPG_FLOW_CONTROL_ENABLED) return false; - cpg_error_t result; + cs_error_t result; do { result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); - if (result != CPG_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group)); - } while(result == CPG_ERR_TRY_AGAIN); + if (result != CS_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group)); + } while(result == CS_ERR_TRY_AGAIN); return true; } @@ -187,34 +187,34 @@ void Cpg::shutdown() { } void Cpg::dispatchOne() { - CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch"); + CPG_CHECK(cpg_dispatch(handle,CS_DISPATCH_ONE), "Error in CPG dispatch"); } void Cpg::dispatchAll() { - CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch"); + CPG_CHECK(cpg_dispatch(handle,CS_DISPATCH_ALL), "Error in CPG dispatch"); } void Cpg::dispatchBlocking() { - CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch"); + CPG_CHECK(cpg_dispatch(handle,CS_DISPATCH_BLOCKING), "Error in CPG dispatch"); } -string Cpg::errorStr(cpg_error_t err, const std::string& msg) { +string Cpg::errorStr(cs_error_t err, const std::string& msg) { std::ostringstream os; os << msg << ": "; switch (err) { - case CPG_OK: os << "ok"; break; - case CPG_ERR_LIBRARY: os << "library"; break; - case CPG_ERR_TIMEOUT: os << "timeout"; break; - case CPG_ERR_TRY_AGAIN: os << "try again"; break; - case CPG_ERR_INVALID_PARAM: os << "invalid param"; break; - case CPG_ERR_NO_MEMORY: os << "no memory"; break; - case CPG_ERR_BAD_HANDLE: os << "bad handle"; break; - case CPG_ERR_ACCESS: os << "access denied. You may need to set your group ID to 'ais'"; break; - case CPG_ERR_NOT_EXIST: os << "not exist"; break; - case CPG_ERR_EXIST: os << "exist"; break; - case CPG_ERR_NOT_SUPPORTED: os << "not supported"; break; - case CPG_ERR_SECURITY: os << "security"; break; - case CPG_ERR_TOO_MANY_GROUPS: os << "too many groups"; break; + case CS_OK: os << "ok"; break; + case CS_ERR_LIBRARY: os << "library"; break; + case CS_ERR_TIMEOUT: os << "timeout"; break; + case CS_ERR_TRY_AGAIN: os << "try again"; break; + case CS_ERR_INVALID_PARAM: os << "invalid param"; break; + case CS_ERR_NO_MEMORY: os << "no memory"; break; + case CS_ERR_BAD_HANDLE: os << "bad handle"; break; + case CS_ERR_ACCESS: os << "access denied. You may need to set your group ID to 'ais'"; break; + case CS_ERR_NOT_EXIST: os << "not exist"; break; + case CS_ERR_EXIST: os << "exist"; break; + case CS_ERR_NOT_SUPPORTED: os << "not supported"; break; + case CS_ERR_SECURITY: os << "security"; break; + case CS_ERR_TOO_MANY_GROUPS: os << "too many groups"; break; default: os << ": unknown cpg error " << err; }; os << " (" << err << ")"; diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index 6b81c602bd..1afbce8d75 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -131,7 +131,7 @@ class Cpg : public sys::IOHandle { CpgOp ( std::string opName ) : opName(opName) { } - virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0; + virtual cs_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0; virtual std::string msg(const Name&) = 0; virtual ~CpgOp ( ) { } }; @@ -141,7 +141,7 @@ class Cpg : public sys::IOHandle { CpgJoinOp ( ) : CpgOp ( std::string("cpg_join") ) { } - cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { + cs_error_t op(cpg_handle_t handle, struct cpg_name * group) { return cpg_join ( handle, group ); } @@ -152,7 +152,7 @@ class Cpg : public sys::IOHandle { CpgLeaveOp ( ) : CpgOp ( std::string("cpg_leave") ) { } - cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { + cs_error_t op(cpg_handle_t handle, struct cpg_name * group) { return cpg_leave ( handle, group ); } @@ -163,7 +163,7 @@ class Cpg : public sys::IOHandle { CpgFinalizeOp ( ) : CpgOp ( std::string("cpg_finalize") ) { } - cpg_error_t op(cpg_handle_t handle, struct cpg_name *) { + cs_error_t op(cpg_handle_t handle, struct cpg_name *) { return cpg_finalize ( handle ); } @@ -177,7 +177,7 @@ class Cpg : public sys::IOHandle { CpgLeaveOp cpgLeaveOp; CpgFinalizeOp cpgFinalizeOp; - static std::string errorStr(cpg_error_t err, const std::string& msg); + static std::string errorStr(cs_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); static std::string cantMcastMsg(const Name&); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 20684fd8a7..8737418570 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -74,6 +74,8 @@ namespace qpid { namespace cluster { +using std::string; + using amqp_0_10::ListCodec; using broker::Broker; using broker::Exchange; @@ -87,6 +89,8 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +namespace _qmf = qmf::org::apache::qpid::broker; + // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. const std::string UpdateClient::UPDATE("x-qpid.cluster-update"); // Name for header used to carry expiration information. @@ -226,14 +230,6 @@ template <class T> std::string encode(const T& t) { t.encode(buf); return encoded; } - -template <class T> std::string encode(const T& t, bool encodeKind) { - std::string encoded; - encoded.resize(t.encodedSize()); - framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - t.encode(buf, encodeKind); - return encoded; -} } // namespace @@ -377,13 +373,14 @@ class MessageUpdater { void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<Queue>& q) { broker::Exchange::shared_ptr alternateExchange = q->getAlternateExchange(); + _qmf::Queue* mgmtQueue = dynamic_cast<_qmf::Queue*>(q->GetManagementObject()); s.queueDeclare( arg::queue = q->getName(), arg::durable = q->isDurable(), arg::autoDelete = q->isAutoDelete(), arg::alternateExchange = alternateExchange ? alternateExchange->getName() : "", arg::arguments = q->getSettings(), - arg::exclusive = q->hasExclusiveOwner() + arg::exclusive = mgmtQueue && mgmtQueue->get_exclusive() ); MessageUpdater updater(q->getName(), s, expiry); q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); @@ -545,7 +542,8 @@ void UpdateClient::updateConsumer( ci->isNotifyEnabled(), ci->getPosition(), ci->getCredit().used().messages, - ci->getCredit().used().bytes + ci->getCredit().used().bytes, + ci->getDeliveryCount() ); consumerNumbering.add(ci.get()); diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index bfb4fd5b9e..c8ffb0b804 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -34,6 +34,29 @@ extern "C" { #if defined (HAVE_OPENAIS_CPG_H) # include <openais/cpg.h> + +// Provide translations back to the deprecated definitions in openais +typedef cpg_error_t cs_error_t; +#define CS_DISPATCH_ONE CPG_DISPATCH_ONE +#define CS_DISPATCH_ALL CPG_DISPATCH_ALL +#define CS_DISPATCH_BLOCKING CPG_DISPATCH_BLOCKING +#define CS_FLOW_CONTROL_DISABLED CPG_FLOW_CONTROL_DISABLED +#define CS_FLOW_CONTROL_ENABLED CPG_FLOW_CONTROL_ENABLED +#define CS_OK CPG_OK +#define CS_ERR_LIBRARY CPG_ERR_LIBRARY +#define CS_ERR_TIMEOUT CPG_ERR_TIMEOUT +#define CS_ERR_TRY_AGAIN CPG_ERR_TRY_AGAIN +#define CS_ERR_INVALID_PARAM CPG_ERR_INVALID_PARAM +#define CS_ERR_NO_MEMORY CPG_ERR_NO_MEMORY +#define CS_ERR_BAD_HANDLE CPG_ERR_BAD_HANDLE +#define CS_ERR_BUSY CPG_ERR_BUSY +#define CS_ERR_ACCESS CPG_ERR_ACCESS +#define CS_ERR_NOT_EXIST CPG_ERR_NOT_EXIST +#define CS_ERR_EXIST CPG_ERR_EXIST +#define CS_ERR_NOT_SUPPORTED CPG_ERR_NOT_SUPPORTED +#define CS_ERR_SECURITY CPG_ERR_SECURITY +#define CS_ERR_TOO_MANY_GROUPS CPG_ERR_TOO_MANY_GROUPS + #elif defined (HAVE_COROSYNC_CPG_H) # include <corosync/cpg.h> #else |
