diff options
| author | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:41 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:41 +0000 |
| commit | f7943607a216a66836a7b401860d635a332f42e3 (patch) | |
| tree | c960a01a71f036f4a232c1e34d5eddd3d3204302 /qpid/cpp/src | |
| parent | 96c6a3af1095cfd55bc2a8efa50ded661c49edde (diff) | |
| download | qpid-python-f7943607a216a66836a7b401860d635a332f42e3.tar.gz | |
QPID-4514: Remove obsolete cluster code: get rid of StatefulQueueObserver.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424132 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 97 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.h | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/StatefulQueueObserver.h | 63 |
6 files changed, 12 insertions, 172 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index cdddd22c41..ad2a438c3b 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -727,7 +727,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SessionState.h \ qpid/broker/SignalHandler.cpp \ qpid/broker/SignalHandler.h \ - qpid/broker/StatefulQueueObserver.h \ qpid/broker/System.cpp \ qpid/broker/System.h \ qpid/broker/ThresholdAlerts.cpp \ diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 0c70b6d448..c083e4ee0f 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -311,100 +311,3 @@ namespace { const std::string GROUP_STATE("group-state"); } - -/** Runs on UPDATER to snapshot current state */ -void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const -{ - using namespace qpid::framing; - state.clear(); - framing::Array groupState(TYPE_CODE_MAP); - for (GroupMap::const_iterator g = messageGroups.begin(); - g != messageGroups.end(); ++g) { - - framing::FieldTable group; - group.setString(GROUP_NAME, g->first); - group.setString(GROUP_OWNER, g->second.owner); - group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); - framing::Array positions(TYPE_CODE_UINT32); - framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); - for (GroupState::MessageFifo::const_iterator p = g->second.members.begin(); - p != g->second.members.end(); ++p) { - positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position ))); - acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired ))); - } - group.setArray(GROUP_POSITIONS, positions); - group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); - groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); - } - state.setArray(GROUP_STATE, groupState); - - QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); -} - - -/** called on UPDATEE to set state from snapshot */ -void MessageGroupManager::setState(const qpid::framing::FieldTable& state) -{ - using namespace qpid::framing; - messageGroups.clear(); - freeGroups.clear(); - cachedGroup = 0; - - framing::Array groupState(TYPE_CODE_MAP); - - bool ok = state.getArray(GROUP_STATE, groupState); - if (!ok) { - QPID_LOG(error, "Unable to find message group state information for queue \"" << - qName << "\""); - return; - } - - for (framing::Array::const_iterator g = groupState.begin(); - g != groupState.end(); ++g) { - framing::FieldTable group; - ok = framing::getEncodedValue<FieldTable>(*g, group); - if (!ok) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": table encoding error!"); - return; - } - MessageGroupManager::GroupState state; - if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": fields missing error!"); - return; - } - state.group = group.getAsString(GROUP_NAME); - state.owner = group.getAsString(GROUP_OWNER); - state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); - framing::Array positions(TYPE_CODE_UINT32); - ok = group.getArray(GROUP_POSITIONS, positions); - if (!ok) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": position encoding error!"); - return; - } - framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); - ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); - if (!ok || positions.count() != acquiredMsgs.count()) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": acquired flag encoding error!"); - return; - } - - Array::const_iterator a = acquiredMsgs.begin(); - for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) { - GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>()); - mState.acquired = (*a++)->getIntegerValue<bool>(); - state.members.push_back(mState); - } - - messageGroups[state.group] = state; - if (!state.owned()) { - assert(state.members.size()); - freeGroups[state.members.front().position] = &messageGroups[state.group]; - } - } - - QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) -} diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index fe39e007b5..bf45e776c8 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -25,11 +25,12 @@ /* for managing message grouping on Queues */ #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/StatefulQueueObserver.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/MessageDistributor.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/sys/unordered_map.h" +#include "boost/shared_ptr.hpp" #include <deque> namespace qpid { @@ -39,8 +40,9 @@ class QueueObserver; struct QueueSettings; class MessageDistributor; class Messages; +class Consumer; -class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor +class MessageGroupManager : public QueueObserver, public MessageDistributor { static std::string defaultGroupId; // assigned if no group id header present @@ -101,10 +103,10 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu MessageGroupManager(const std::string& header, const std::string& _qName, Messages& container, unsigned int _timestamp=0 ) - : StatefulQueueObserver(std::string("MessageGroupManager:") + header), - groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName), - hits(0), misses(0), - lastMsg(0), cachedGroup(0) {} + : groupIdHeader( header ), timestamp(_timestamp), messages(container), + qName(_qName), + hits(0), misses(0), + lastMsg(0), cachedGroup(0) {} virtual ~MessageGroupManager(); // QueueObserver iface @@ -114,8 +116,6 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu void dequeued( const Message& qm ); void consumerAdded( const Consumer& ) {}; void consumerRemoved( const Consumer& ) {}; - void getState(qpid::framing::FieldTable& state ) const; - void setState(const qpid::framing::FieldTable&); // MessageDistributor iface bool acquire(const std::string& c, Message& ); diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 1ebaca2dae..9b2e31c925 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -65,7 +65,7 @@ namespace { QueueFlowLimit::QueueFlowLimit(Queue *_queue, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), + : queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), broker(0) diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 94fec964af..b9aa09ec3a 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -26,7 +26,7 @@ #include <iostream> #include <memory> #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/StatefulQueueObserver.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/sys/AtomicValue.h" @@ -40,6 +40,7 @@ namespace broker { class Broker; class Queue; +class Message; struct QueueSettings; /** @@ -49,7 +50,7 @@ struct QueueSettings; * passing _either_ level may turn flow control ON, but _both_ must be * below level before flow control will be turned OFF. */ - class QueueFlowLimit : public StatefulQueueObserver + class QueueFlowLimit : public QueueObserver { static uint64_t defaultMaxSize; static uint defaultFlowStopRatio; diff --git a/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h b/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h deleted file mode 100644 index c682d460b7..0000000000 --- a/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef QPID_BROKER_STATEFULQUEUEOBSERVER_H -#define QPID_BROKER_STATEFULQUEUEOBSERVER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/QueueObserver.h" -#include "qpid/framing/FieldTable.h" - -namespace qpid { -namespace broker { - -/** - * Specialized type of QueueObserver that maintains internal state that has to - * be replicated across clustered brokers. - */ -class StatefulQueueObserver : public QueueObserver -{ - public: - StatefulQueueObserver(std::string _id) : id(_id) {} - virtual ~StatefulQueueObserver() {} - - /** This identifier must uniquely identify this particular observer amoung - * all observers on a queue. For cluster replication, this id will be used - * to identify the peer queue observer for synchronization across - * brokers. - */ - const std::string& getId() const { return id; } - - /** This method should return the observer's internal state as an opaque - * map. - */ - virtual void getState(qpid::framing::FieldTable& state ) const = 0; - - /** The input map represents the internal state of the peer observer that - * this observer should synchonize to. - */ - virtual void setState(const qpid::framing::FieldTable&) = 0; - - - private: - std::string id; -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_STATEFULQUEUEOBSERVER_H*/ |
