diff options
| author | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:31 +0000 |
| commit | 96c6a3af1095cfd55bc2a8efa50ded661c49edde (patch) | |
| tree | f472a83ab1973db759487cefc04bdddd2bfe363f /qpid/cpp | |
| parent | fa2da17046308a9aa0f879af3d1e220f9550a90b (diff) | |
| download | qpid-python-96c6a3af1095cfd55bc2a8efa50ded661c49edde.tar.gz | |
QPID-4514: Remove obsolete cluster code: QueueFlowLimit
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424131 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 75 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 4 |
2 files changed, 2 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index b887364d51..1ebaca2dae 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -291,79 +291,8 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& s return 0; } -/* Cluster replication */ - -namespace { - /** pack a set of sequence number ranges into a framing::Array */ - void buildSeqRangeArray(qpid::framing::Array *seqs, - const qpid::framing::SequenceNumber& first, - const qpid::framing::SequenceNumber& last) - { - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first))); - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last))); - } -} - -/** Runs on UPDATER to snapshot current state */ -void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const -{ - sys::Mutex::ScopedLock l(indexLock); - state.clear(); - - framing::SequenceSet ss; - if (!index.empty()) { - /* replicate the set of messages pending flow control */ - for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin(); - itr != index.end(); ++itr) { - ss.add(itr->first); - } - framing::Array seqs(TYPE_CODE_UINT32); - typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder; - ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2)); - state.setArray("pendingMsgSeqs", seqs); - } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); -} - - -/** called on UPDATEE to set state from snapshot */ -void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) -{ - sys::Mutex::ScopedLock l(indexLock); - index.clear(); - - framing::SequenceSet fcmsg; - framing::Array seqArray(TYPE_CODE_UINT32); - if (state.getArray("pendingMsgSeqs", seqArray)) { - assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges - framing::Array::const_iterator i = seqArray.begin(); - while (i != seqArray.end()) { - framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - fcmsg.add(first, last); - for (SequenceNumber seq = first; seq <= last; ++seq) { - Message msg; - queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked - bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second; - // Like this to avoid tripping up unused variable warning when NDEBUG set - if (!unique) assert(unique); - } - } - } - - flowStopped = index.size() != 0; - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); - } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg) -} - - namespace qpid { - namespace broker { +namespace broker { std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) { @@ -372,6 +301,6 @@ std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) return out; } - } +} } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 0e83457efa..94fec964af 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -84,10 +84,6 @@ struct QueueSettings; QPID_BROKER_EXTERN void acquired(const Message&) {}; QPID_BROKER_EXTERN void requeued(const Message&) {}; - /** for clustering: */ - QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const; - QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&); - uint32_t getFlowStopCount() const { return flowStopCount; } uint32_t getFlowResumeCount() const { return flowResumeCount; } uint64_t getFlowStopSize() const { return flowStopSize; } |
