summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-12-19 21:23:31 +0000
committerAlan Conway <aconway@apache.org>2012-12-19 21:23:31 +0000
commit96c6a3af1095cfd55bc2a8efa50ded661c49edde (patch)
treef472a83ab1973db759487cefc04bdddd2bfe363f /qpid/cpp
parentfa2da17046308a9aa0f879af3d1e220f9550a90b (diff)
downloadqpid-python-96c6a3af1095cfd55bc2a8efa50ded661c49edde.tar.gz
QPID-4514: Remove obsolete cluster code: QueueFlowLimit
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424131 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp75
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h4
2 files changed, 2 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index b887364d51..1ebaca2dae 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -291,79 +291,8 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& s
return 0;
}
-/* Cluster replication */
-
-namespace {
- /** pack a set of sequence number ranges into a framing::Array */
- void buildSeqRangeArray(qpid::framing::Array *seqs,
- const qpid::framing::SequenceNumber& first,
- const qpid::framing::SequenceNumber& last)
- {
- seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first)));
- seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last)));
- }
-}
-
-/** Runs on UPDATER to snapshot current state */
-void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
-{
- sys::Mutex::ScopedLock l(indexLock);
- state.clear();
-
- framing::SequenceSet ss;
- if (!index.empty()) {
- /* replicate the set of messages pending flow control */
- for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
- itr != index.end(); ++itr) {
- ss.add(itr->first);
- }
- framing::Array seqs(TYPE_CODE_UINT32);
- typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder;
- ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
- state.setArray("pendingMsgSeqs", seqs);
- }
- QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
-{
- sys::Mutex::ScopedLock l(indexLock);
- index.clear();
-
- framing::SequenceSet fcmsg;
- framing::Array seqArray(TYPE_CODE_UINT32);
- if (state.getArray("pendingMsgSeqs", seqArray)) {
- assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges
- framing::Array::const_iterator i = seqArray.begin();
- while (i != seqArray.end()) {
- framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>());
- ++i;
- framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
- ++i;
- fcmsg.add(first, last);
- for (SequenceNumber seq = first; seq <= last; ++seq) {
- Message msg;
- queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked
- bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!unique) assert(unique);
- }
- }
- }
-
- flowStopped = index.size() != 0;
- if (queueMgmtObj) {
- queueMgmtObj->set_flowStopped(isFlowControlActive());
- }
- QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg)
-}
-
-
namespace qpid {
- namespace broker {
+namespace broker {
std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
{
@@ -372,6 +301,6 @@ std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
return out;
}
- }
+}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 0e83457efa..94fec964af 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -84,10 +84,6 @@ struct QueueSettings;
QPID_BROKER_EXTERN void acquired(const Message&) {};
QPID_BROKER_EXTERN void requeued(const Message&) {};
- /** for clustering: */
- QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
- QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&);
-
uint32_t getFlowStopCount() const { return flowStopCount; }
uint32_t getFlowResumeCount() const { return flowResumeCount; }
uint64_t getFlowStopSize() const { return flowStopSize; }