From 2263213d7dfa3aaba38360144f7b098fd0a96bee Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 16 Jul 2009 16:28:14 +0000 Subject: Update queue listeners in the correct order. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@794736 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/UpdateClient.cpp | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp') diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 1e9af4a589..143db20ac0 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -128,16 +128,17 @@ void UpdateClient::update() { Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); + // Update queue is used to transfer acquired messages that are no // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); session.close(); + // Update queue listeners: must come after sessions so consumerNumbering is populated. + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; @@ -295,11 +296,12 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr& upda } void UpdateClient::updateSession(broker::SessionHandler& sh) { - QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " - << sh.getSession()->getId()); broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. + QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() + << "[" << sh.getChannel() << "] = " << ss->getId()); + // Create a client session to update session state. boost::shared_ptr cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); @@ -350,6 +352,7 @@ void UpdateClient::updateConsumer( { QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); + using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -367,10 +370,12 @@ void UpdateClient::updateConsumer( ClusterConnectionProxy(shadowSession).consumerState( ci->getName(), ci->isBlocked(), - ci->isNotifyEnabled(), - ci->getQueue()->getListeners().contains(ci) + ci->isNotifyEnabled() ); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); + consumerNumbering.add(ci); + + QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + << " on " << shadowSession.getId()); } void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { @@ -448,4 +453,20 @@ void UpdateClient::updateTxState(broker::SemanticState& s) { } } +void UpdateClient::updateQueueListeners(const boost::shared_ptr& queue) { + queue->getListeners().eachListener( + boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1)); +} + +void UpdateClient::updateQueueListener(std::string& q, + const boost::shared_ptr& c) +{ + const boost::shared_ptr ci = + boost::dynamic_pointer_cast(c); + size_t n = consumerNumbering[ci]; + if (n >= consumerNumbering.size()) + throw Exception(QPID_MSG("Unexpected listener on queue " << q)); + ClusterConnectionProxy(session).addQueueListener(q, n); +} + }} // namespace qpid::cluster -- cgit v1.2.1