From 03f1df9ff7894a6d910120c82bba49e6193178ee Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 4 Oct 2007 16:06:05 +0000 Subject: Additional tests and fixes git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@581957 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/MessageHandlerImpl.cpp | 3 ++- cpp/src/qpid/broker/SemanticHandler.cpp | 14 ++++++-------- cpp/src/qpid/broker/SemanticHandler.h | 5 +++++ cpp/src/qpid/framing/AccumulatedAck.cpp | 7 ++++--- cpp/src/qpid/framing/AccumulatedAck.h | 3 ++- cpp/src/qpid/framing/SequenceNumberSet.cpp | 6 ++++++ cpp/src/qpid/framing/SequenceNumberSet.h | 7 ++++--- 7 files changed, 29 insertions(+), 16 deletions(-) (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index b3880d86e5..e6c7b28a49 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -198,7 +198,8 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /* //TODO: implement mode SequenceNumberSet results; - transfers.processRanges(boost::bind(&SemanticState::acquire, &state, _1, _2, results)); + RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, results); + transfers.processRanges(op); results = results.condense(); getProxy().getMessage().acquired(results); } diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 9cacb4ccf7..dc5407be99 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -31,12 +31,16 @@ #include "qpid/framing/InvocationVisitor.h" #include +#include using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {} +SemanticHandler::SemanticHandler(SessionState& s) : + state(*this,s), session(s), + ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) + {} void SemanticHandler::handle(framing::AMQFrame& frame) { @@ -81,13 +85,7 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran //ack messages: state.ackCumulative(mark.getValue()); } - if (range.size() % 2) { //must be even number - throw ConnectionException(530, "Received odd number of elements in ranged mark"); - } else { - for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - state.ackRange(*i, *(++i)); - } - } + range.processRanges(ackOp); } void SemanticHandler::sendCompletion() diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index dc7f21ac34..9380708ec5 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -33,6 +33,8 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceNumber.h" +#include + namespace qpid { namespace framing { @@ -51,6 +53,8 @@ class SemanticHandler : public DeliveryAdapter, public framing::AMQP_ServerOperations::ExecutionHandler { + typedef boost::function RangedOperation; + SemanticState state; SessionState& session; // FIXME aconway 2007-09-20: Why are these on the handler rather than the @@ -59,6 +63,7 @@ class SemanticHandler : public DeliveryAdapter, framing::Window outgoing; sys::Mutex outLock; MessageBuilder msgBuilder; + RangedOperation ackOp; enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK}; TrackId getTrack(const framing::AMQFrame& frame); diff --git a/cpp/src/qpid/framing/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp index 219a68b96c..bf53bf0cd6 100644 --- a/cpp/src/qpid/framing/AccumulatedAck.cpp +++ b/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -22,12 +22,15 @@ #include #include +#include using std::list; using std::max; using std::min; using namespace qpid::framing; +AccumulatedAck::AccumulatedAck(SequenceNumber r) : mark(r) {} + void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ assert(first <= last); if (last < mark) return; @@ -103,9 +106,7 @@ void AccumulatedAck::collectRanges(SequenceNumberSet& set) const void AccumulatedAck::update(const SequenceNumber cumulative, const SequenceNumberSet& range) { update(mark, cumulative); - for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - update(*i, *(++i)); - } + range.processRanges(*this); } diff --git a/cpp/src/qpid/framing/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h index 1f66197e2a..a635d2ea04 100644 --- a/cpp/src/qpid/framing/AccumulatedAck.h +++ b/cpp/src/qpid/framing/AccumulatedAck.h @@ -58,13 +58,14 @@ namespace qpid { */ std::list ranges; - explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {} + explicit AccumulatedAck(SequenceNumber r = SequenceNumber()); void update(SequenceNumber firstTag, SequenceNumber lastTag); void consolidate(); void clear(); bool covers(SequenceNumber tag) const; void collectRanges(SequenceNumberSet& set) const; void update(const SequenceNumber cumulative, const SequenceNumberSet& range); + void operator()(SequenceNumber first, SequenceNumber last) { update(first, last); } }; std::ostream& operator<<(std::ostream&, const Range&); std::ostream& operator<<(std::ostream&, const AccumulatedAck&); diff --git a/cpp/src/qpid/framing/SequenceNumberSet.cpp b/cpp/src/qpid/framing/SequenceNumberSet.cpp index f1c81e078b..b769befeb7 100644 --- a/cpp/src/qpid/framing/SequenceNumberSet.cpp +++ b/cpp/src/qpid/framing/SequenceNumberSet.cpp @@ -60,6 +60,12 @@ SequenceNumberSet SequenceNumberSet::condense() const return result; } +void SequenceNumberSet::addRange(const SequenceNumber& start, const SequenceNumber& end) +{ + push_back(start); + push_back(end); +} + namespace qpid{ namespace framing{ diff --git a/cpp/src/qpid/framing/SequenceNumberSet.h b/cpp/src/qpid/framing/SequenceNumberSet.h index f9d0cc1fd4..9091e7142e 100644 --- a/cpp/src/qpid/framing/SequenceNumberSet.h +++ b/cpp/src/qpid/framing/SequenceNumberSet.h @@ -41,17 +41,18 @@ public: void decode(Buffer& buffer); uint32_t encodedSize() const; SequenceNumberSet condense() const; + void addRange(const SequenceNumber& start, const SequenceNumber& end); template - void processRanges(T t) const + void processRanges(T& t) const { if (size() % 2) { //must be even number throw InvalidArgumentException("SequenceNumberSet contains odd number of elements"); } for (SequenceNumberSet::const_iterator i = begin(); i != end(); i++) { - SequenceNumber first = i->getValue(); - SequenceNumber last = (++i)->getValue(); + SequenceNumber first = *(i); + SequenceNumber last = *(++i); t(first, last); } } -- cgit v1.2.1