diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/AckPolicy.h | 27 | ||||
-rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Execution.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/Message.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/Message.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceSet.cpp | 26 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/client_test.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/txtest.cpp | 2 |
20 files changed, 90 insertions, 52 deletions
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h index af17539ebe..8d62b6f4f2 100644 --- a/cpp/src/qpid/client/AckPolicy.h +++ b/cpp/src/qpid/client/AckPolicy.h @@ -21,6 +21,8 @@ * */ +#include "qpid/framing/SequenceSet.h" + namespace qpid { namespace client { @@ -31,21 +33,36 @@ namespace client { */ class AckPolicy { + framing::SequenceSet accepted; size_t interval; size_t count; public: /** *@param n: acknowledge every n messages. - *n==0 means no automatick acknowledgement. + *n==0 means no automatic acknowledgement. */ AckPolicy(size_t n=1) : interval(n), count(n) {} - void ack(const Message& msg) { + void ack(const Message& msg, Session& session) { + accepted.add(msg.getId()); if (!interval) return; - bool send=(--count==0); - msg.acknowledge(true, send); - if (send) count = interval; + if (--count==0) { + session.markCompleted(msg.getId(), false, true); + session.messageAccept(accepted); + accepted.clear(); + count = interval; + } else { + session.markCompleted(msg.getId(), false, false); + } + } + + void ackOutstanding(Session& session) { + if (!accepted.empty()) { + session.messageAccept(accepted); + accepted.clear(); + session.sendCompletion(); + } } }; diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index ae9f78483d..f32b5e2614 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -23,6 +23,7 @@ #include <sstream> #include "Channel.h" #include "qpid/sys/Monitor.h" +#include "AckPolicy.h" #include "Message.h" #include "Connection.h" #include "Demux.h" @@ -202,8 +203,10 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { if (incoming->tryPop(p)) { msg.populate(*p); if (ackMode == AUTO_ACK) { - msg.setSession(session); - msg.acknowledge(false, true); + AckPolicy acker; + acker.ack(msg, session); + } else { + session.markCompleted(msg.getId(), false, false); } return true; } @@ -260,7 +263,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; - session.getExecution().markCompleted(content.getId(), true, send); + session.markCompleted(content.getId(), true, send); } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index cc67701748..68e799918c 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -43,7 +43,7 @@ void Subscriber::received(Message& msg) { if (listener) { listener->received(msg); - autoAck.ack(msg); + autoAck.ack(msg, session); } } @@ -72,7 +72,7 @@ void Dispatcher::run() Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content, session); + Message msg(*content); Subscriber::shared_ptr listener = find(msg.getDestination()); if (!listener) { QPID_LOG(error, "No listener found for destination " << msg.getDestination()); diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index e4b2db23e1..b8c8a4292d 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -36,10 +36,6 @@ class Execution public: virtual ~Execution() {} /** - * Mark the incoming command with the specified id as completed - */ - virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0; - /** * Provides access to the demultiplexing function within the * session implementation */ diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index 951996f005..04cee40a37 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -36,8 +36,8 @@ Message LocalQueue::pop() { throw ClosedException(); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message m(*content, session); - autoAck.ack(m); + Message m(*content); + autoAck.ack(m, session); return m; } else @@ -46,6 +46,7 @@ Message LocalQueue::pop() { } void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; } +AckPolicy& LocalQueue::getAckPolicy() { return autoAck; } bool LocalQueue::empty() const { diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index f8b2c2e0b3..58887077f6 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -47,9 +47,10 @@ class LocalQueue bool empty() const; size_t size() const; void setAckPolicy(AckPolicy); + AckPolicy& getAckPolicy(); private: - friend class SubscriptionManager; + friend class SubscriptionManager; Session session; Demux::QueuePtr queue; AckPolicy autoAck; diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp index 3d4b9da9fa..58d8fb142a 100644 --- a/cpp/src/qpid/client/Message.cpp +++ b/cpp/src/qpid/client/Message.cpp @@ -48,11 +48,6 @@ namespace client { return getMessageProperties().getApplicationHeaders(); } - void Message::acknowledge(bool cumulative, bool notifyPeer) const - { - const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer); - } - const framing::MessageTransferBody& Message::getMethod() const { return method; @@ -64,13 +59,10 @@ namespace client { } /**@internal for incoming messages */ - Message::Message(const framing::FrameSet& frameset, Session s) : - method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) + Message::Message(const framing::FrameSet& frameset) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) { populate(frameset); } - /**@internal use for incoming messages. */ - void Message::setSession(Session s) { session=s; } - }} diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index 977cc89146..864a16d9ce 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -45,19 +45,15 @@ public: bool isRedelivered() const; void setRedelivered(bool redelivered); framing::FieldTable& getHeaders(); - void acknowledge(bool cumulative = true, bool notifyPeer = true) const; const framing::MessageTransferBody& getMethod() const; const framing::SequenceNumber& getId() const; /**@internal for incoming messages */ - Message(const framing::FrameSet& frameset, Session s); - /**@internal use for incoming messages. */ - void setSession(Session s); + Message(const framing::FrameSet& frameset); private: //method and id are only set for received messages: framing::MessageTransferBody method; framing::SequenceNumber id; - Session session; }; }} diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp index d6a7571e9f..4a202842a5 100644 --- a/cpp/src/qpid/client/SessionBase.cpp +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -50,6 +50,16 @@ void SessionBase::sync() impl->send(b).wait(*impl); } +void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) +{ + impl->markCompleted(id, cumulative, notifyPeer); +} + +void SessionBase::sendCompletion() +{ + impl->sendCompletion(); +} + Uuid SessionBase::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h index 54484113b1..29a266f4ca 100644 --- a/cpp/src/qpid/client/SessionBase.h +++ b/cpp/src/qpid/client/SessionBase.h @@ -131,6 +131,8 @@ class SessionBase Execution& getExecution(); void sync(); + void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + void sendCompletion(); typedef framing::TransferContent DefaultContent; diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 57f12cf28e..4e3f6cdd98 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -518,6 +518,12 @@ void SessionImpl::flush(bool expected, bool confirmed, bool completed) void SessionImpl::sendCompletion() { + Lock l(state); + sendCompletionImpl(); +} + +void SessionImpl::sendCompletionImpl() +{ proxy.completed(completedIn, true); } diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 1284670389..eb1c0acb97 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -87,6 +87,7 @@ public: bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); void waitForCompletion(const framing::SequenceNumber& id); + void sendCompletion(); //NOTE: these are called by the network thread when the connection is closed or dies void connectionClosed(uint16_t code, const std::string& text); @@ -122,7 +123,7 @@ private: void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); - void sendCompletion(); + void sendCompletionImpl(); // Note: Following methods are called by network thread in // response to session controls from the broker diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index b353be481b..2ba3f5fe62 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -87,6 +87,8 @@ void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; } +AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } + void SubscriptionManager::cancel(const std::string dest) { dispatcher.cancel(dest); diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 48cb725fb8..4ccb95c968 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -133,6 +133,10 @@ class SubscriptionManager : public sys::Runnable * Default is to acknowledge every message automatically. */ void setAckPolicy(const AckPolicy& autoAck); + /** + * + */ + AckPolicy& getAckPolicy(); }; diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp index 1858467de6..4903d92a91 100644 --- a/cpp/src/qpid/framing/SequenceSet.cpp +++ b/cpp/src/qpid/framing/SequenceSet.cpp @@ -143,20 +143,24 @@ void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& end) void SequenceSet::remove(const SequenceNumber& s) { - for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) { - if (i->start == s) { + for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) { + if (i->contains(s)) { if (i->start == i->end) { - ranges.erase(i); - } else { + //range is just a single number, so we can delete the whole range + i = ranges.erase(i); + } else if (i->start == s) { + //move the start forward to exclude s ++(i->start); + } else if (i->end == s) { + //move the end backward to exclude s + --(i->end); + } else { + //need to split range pointed to by i + Range r(i->start, (uint32_t)s - 1); + i->start = s + 1; + i = ranges.insert(i, r); } - } else if (i->end == s) { - --(i->end); - } else if (i->contains(s)) { - //need to split range pointed to by i - Range r(i->start, (uint32_t)s - 1); - i->start = s + 1; - ranges.insert(i, r); + break; } } } diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 9b6e0dce21..a5f7b9d803 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -20,6 +20,7 @@ */ #include "unit_test.h" #include "BrokerFixture.h" +#include "qpid/client/AckPolicy.h" #include "qpid/client/Dispatcher.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" @@ -124,7 +125,8 @@ QPID_AUTO_TEST_CASE(testTransfer) BOOST_CHECK(msg->isA<MessageTransferBody>()); BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); //confirm receipt: - fix.session.getExecution().markCompleted(msg->getId(), true, true); + AckPolicy autoAck; + autoAck.ack(Message(*msg), fix.session); } QPID_AUTO_TEST_CASE(testDispatcher) diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index 011dcd4678..d0da2ec8ca 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -124,10 +124,11 @@ int main(int argc, char** argv) if (opts.trace) std::cout << "Subscribed to queue." << std::endl; FrameSet::shared_ptr incoming = session.get(); if (incoming->isA<MessageTransferBody>()) { - Message msgIn(*incoming, session); + Message msgIn(*incoming); if (msgIn.getData() == msgOut.getData()) { if (opts.trace) std::cout << "Received the exepected message." << std::endl; - msgIn.acknowledge(); + session.messageAccept(SequenceSet(msgIn.getId())); + session.markCompleted(msgIn.getId(), true, true); } else { print("Received an unexepected message: ", msgIn); } diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 966d708ff6..231b25daa4 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -533,9 +533,9 @@ struct SubscribeThread : public Client { size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); - if (opts.intervalSub) ::usleep(opts.intervalSub*1000); + if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. - // multiple publishers. Need an acorray of counters, + // multiple publishers. Need an array of counters, // one per publisher and a publisher ID in the // message. Careful not to introduce a lot of overhead // here, e.g. no std::map, std::string etc. @@ -550,7 +550,7 @@ struct SubscribeThread : public Client { } } if (opts.ack !=0) - msg.acknowledge(); // Cumulative ack for final batch. + subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. AbsTime end=now(); // Report to publisher. diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 5208b67445..3dd042605e 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -159,7 +159,7 @@ void Listener::received(Message& message){ if(!!type && StringValue("TERMINATION_REQUEST") == *type){ shutdown(); }else if(!!type && StringValue("REPORT_REQUEST") == *type){ - message.acknowledge();//acknowledge everything upto this point + mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point cout <<"Batch ended, sending report." << endl; //send a report: report(); diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index 5030b24070..a8369df759 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -144,7 +144,7 @@ struct Transfer : public Client, public Runnable out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); session.messageTransfer(arg::content=out, arg::acceptMode=1); } - in.acknowledge(); + lq.getAckPolicy().ackOutstanding(session); session.txCommit(); } } catch(const std::exception& e) { |