summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-06-16 21:21:09 +0000
committerAlan Conway <aconway@apache.org>2009-06-16 21:21:09 +0000
commit80d65b38008d9b7f31c825508819f9600d63b63c (patch)
tree316862bff35f1cae6f0d1152dcf4a6e3b0f967ed /cpp/src/qpid/cluster
parentf5e98a6dfb8c4defe22755340f440e6f16c2559a (diff)
downloadqpid-python-80d65b38008d9b7f31c825508819f9600d63b63c.tar.gz
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection holding only the OutputTasks that are potentially active. Tasks are droped from the list as soon as they return false, and added back when they may have output. Inlined frequently-used SequenceNumber functions. Replace std::list in QueueListeners with std::vector. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp9
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp16
-rw-r--r--cpp/src/qpid/cluster/Connection.h4
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp38
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h3
7 files changed, 55 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 37562ce46c..fe6958244f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -755,13 +755,16 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
-void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) {
// If we receive an errorCheck here, it's because we have processed past the point
// of the error so respond with ERROR_TYPE_NONE
assert(map.getFrameSeq() >= frameSeq);
- if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE.
+ if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE.
+ QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally");
mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+ ClusterErrorCheckBody(ProtocolVersion(),
+ framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+ }
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index b857c8a913..c6b5f8499c 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -113,7 +113,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Decoder& getDecoder() { return decoder; }
ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
-
+
private:
typedef sys::Monitor::ScopedLock Lock;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index afecbd50e5..e7dac82159 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -245,10 +245,13 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
+void Connection::consumerState(
+ const string& name, bool blocked, bool notifyEnabled, bool isInListener)
+{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+ if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this());
}
void Connection::sessionState(
@@ -270,6 +273,17 @@ void Connection::sessionState(
unknownCompleted,
receivedIncomplete);
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ // The output tasks will be added later in the update process.
+ connection.getOutputTasks().removeAll();
+}
+
+void Connection::outputTask(uint16_t channel, const std::string& name) {
+ broker::SessionState* session = connection.getChannel(channel).getSession();
+ if (!session)
+ throw Exception(QPID_MSG(cluster << " channel not attached " << *this
+ << "[" << channel << "] "));
+ OutputTask* task = &session->getSemanticState().find(name);
+ connection.getOutputTasks().addOutputTask(task);
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 8e3b0ad337..51aab92bfc 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -103,7 +103,7 @@ class Connection :
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
- void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled, bool isInListener);
// ==== Used in catch-up mode to build initial state.
//
@@ -115,6 +115,8 @@ class Connection :
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
+ void outputTask(uint16_t channel, const std::string& name);
+
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index ef99058471..3c3c330787 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -48,8 +48,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
parent.getCluster().checkQuorum();
{
- // FIXME aconway 2009-04-28: locking around next-> may be redundant
- // with the fixes to read-credit in the IO layer. Review.
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 332e74c512..7c305a2e92 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -54,6 +54,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
+#include <boost/cast.hpp>
#include <algorithm>
namespace qpid {
@@ -64,6 +65,8 @@ using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
+using broker::SemanticState;
+
using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
@@ -125,7 +128,8 @@ void UpdateClient::update() {
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
- // Update queue is used to transfer acquired messages that are no longer on their original queue.
+ // Update queue is used to transfer acquired messages that are no
+ // longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
@@ -256,6 +260,16 @@ void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& que
s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
+void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
+ const SemanticState::ConsumerImpl* cci =
+ boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
+ SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
+ uint16_t channel = ci->getParent().getSession().getChannel();
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
+ QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ << " channel=" << channel);
+}
+
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
shadowConnection = catchUpConnection();
@@ -266,6 +280,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
// Safe to use decoder here because we are stalled for update.
std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
+ bc.getOutputTasks().eachOutput(
+ boost::bind(&UpdateClient::updateOutputTask, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
@@ -294,9 +310,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
QPID_LOG(debug, updaterId << " updating consumers.");
- ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
+ ss->getSemanticState().eachConsumer(
+ boost::bind(&UpdateClient::updateConsumer, this, _1));
QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
@@ -304,7 +320,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
updateTxState(ss->getSemanticState()); // Tx transaction state.
- // Adjust for command counter for message in progress, will be sent after state update.
+ // Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
@@ -328,8 +344,11 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
-void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
+void UpdateClient::updateConsumer(
+ const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
+{
+ QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -344,13 +363,12 @@ void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci)
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
- ClusterConnectionConsumerStateBody state(
- ProtocolVersion(),
+ ClusterConnectionProxy(shadowSession).consumerState(
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabled()
+ ci->isNotifyEnabled(),
+ ci->getQueue()->getListeners().contains(ci)
);
- client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
}
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 030566b52d..ba5bdd1d75 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -91,7 +91,8 @@ class UpdateClient : public sys::Runnable {
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
void updateTxState(broker::SemanticState& s);
- void updateConsumer(const broker::SemanticState::ConsumerImpl*);
+ void updateOutputTask(const sys::OutputTask* task);
+ void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
MemberId updaterId;
MemberId updateeId;