diff options
| author | Gordon Sim <gsim@apache.org> | 2008-04-21 14:37:03 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-04-21 14:37:03 +0000 |
| commit | 61647950e1c4e6b1efb0a1b3f3b220783680103f (patch) | |
| tree | f666cacf0e56079e23ef0a9c881d26baa7d5a1fe /cpp/src/qpid/client | |
| parent | ceca53c26ab6ed56929dc558b3255bdd83090315 (diff) | |
| download | qpid-python-61647950e1c4e6b1efb0a1b3f3b220783680103f.tar.gz | |
QPID-920: send message-accept for acks (as well as completion)
* AckPolicy now maintains a set of transfered messages for cumulative accepts
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/AckPolicy.h | 27 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Execution.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Message.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Message.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionBase.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionBase.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 4 |
14 files changed, 64 insertions, 33 deletions
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h index af17539ebe..8d62b6f4f2 100644 --- a/cpp/src/qpid/client/AckPolicy.h +++ b/cpp/src/qpid/client/AckPolicy.h @@ -21,6 +21,8 @@ * */ +#include "qpid/framing/SequenceSet.h" + namespace qpid { namespace client { @@ -31,21 +33,36 @@ namespace client { */ class AckPolicy { + framing::SequenceSet accepted; size_t interval; size_t count; public: /** *@param n: acknowledge every n messages. - *n==0 means no automatick acknowledgement. + *n==0 means no automatic acknowledgement. */ AckPolicy(size_t n=1) : interval(n), count(n) {} - void ack(const Message& msg) { + void ack(const Message& msg, Session& session) { + accepted.add(msg.getId()); if (!interval) return; - bool send=(--count==0); - msg.acknowledge(true, send); - if (send) count = interval; + if (--count==0) { + session.markCompleted(msg.getId(), false, true); + session.messageAccept(accepted); + accepted.clear(); + count = interval; + } else { + session.markCompleted(msg.getId(), false, false); + } + } + + void ackOutstanding(Session& session) { + if (!accepted.empty()) { + session.messageAccept(accepted); + accepted.clear(); + session.sendCompletion(); + } } }; diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index ae9f78483d..f32b5e2614 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -23,6 +23,7 @@ #include <sstream> #include "Channel.h" #include "qpid/sys/Monitor.h" +#include "AckPolicy.h" #include "Message.h" #include "Connection.h" #include "Demux.h" @@ -202,8 +203,10 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { if (incoming->tryPop(p)) { msg.populate(*p); if (ackMode == AUTO_ACK) { - msg.setSession(session); - msg.acknowledge(false, true); + AckPolicy acker; + acker.ack(msg, session); + } else { + session.markCompleted(msg.getId(), false, false); } return true; } @@ -260,7 +263,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; - session.getExecution().markCompleted(content.getId(), true, send); + session.markCompleted(content.getId(), true, send); } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index cc67701748..68e799918c 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -43,7 +43,7 @@ void Subscriber::received(Message& msg) { if (listener) { listener->received(msg); - autoAck.ack(msg); + autoAck.ack(msg, session); } } @@ -72,7 +72,7 @@ void Dispatcher::run() Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content, session); + Message msg(*content); Subscriber::shared_ptr listener = find(msg.getDestination()); if (!listener) { QPID_LOG(error, "No listener found for destination " << msg.getDestination()); diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index e4b2db23e1..b8c8a4292d 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -36,10 +36,6 @@ class Execution public: virtual ~Execution() {} /** - * Mark the incoming command with the specified id as completed - */ - virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0; - /** * Provides access to the demultiplexing function within the * session implementation */ diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index 951996f005..04cee40a37 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -36,8 +36,8 @@ Message LocalQueue::pop() { throw ClosedException(); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message m(*content, session); - autoAck.ack(m); + Message m(*content); + autoAck.ack(m, session); return m; } else @@ -46,6 +46,7 @@ Message LocalQueue::pop() { } void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; } +AckPolicy& LocalQueue::getAckPolicy() { return autoAck; } bool LocalQueue::empty() const { diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index f8b2c2e0b3..58887077f6 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -47,9 +47,10 @@ class LocalQueue bool empty() const; size_t size() const; void setAckPolicy(AckPolicy); + AckPolicy& getAckPolicy(); private: - friend class SubscriptionManager; + friend class SubscriptionManager; Session session; Demux::QueuePtr queue; AckPolicy autoAck; diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp index 3d4b9da9fa..58d8fb142a 100644 --- a/cpp/src/qpid/client/Message.cpp +++ b/cpp/src/qpid/client/Message.cpp @@ -48,11 +48,6 @@ namespace client { return getMessageProperties().getApplicationHeaders(); } - void Message::acknowledge(bool cumulative, bool notifyPeer) const - { - const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer); - } - const framing::MessageTransferBody& Message::getMethod() const { return method; @@ -64,13 +59,10 @@ namespace client { } /**@internal for incoming messages */ - Message::Message(const framing::FrameSet& frameset, Session s) : - method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) + Message::Message(const framing::FrameSet& frameset) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) { populate(frameset); } - /**@internal use for incoming messages. */ - void Message::setSession(Session s) { session=s; } - }} diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index 977cc89146..864a16d9ce 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -45,19 +45,15 @@ public: bool isRedelivered() const; void setRedelivered(bool redelivered); framing::FieldTable& getHeaders(); - void acknowledge(bool cumulative = true, bool notifyPeer = true) const; const framing::MessageTransferBody& getMethod() const; const framing::SequenceNumber& getId() const; /**@internal for incoming messages */ - Message(const framing::FrameSet& frameset, Session s); - /**@internal use for incoming messages. */ - void setSession(Session s); + Message(const framing::FrameSet& frameset); private: //method and id are only set for received messages: framing::MessageTransferBody method; framing::SequenceNumber id; - Session session; }; }} diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp index d6a7571e9f..4a202842a5 100644 --- a/cpp/src/qpid/client/SessionBase.cpp +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -50,6 +50,16 @@ void SessionBase::sync() impl->send(b).wait(*impl); } +void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) +{ + impl->markCompleted(id, cumulative, notifyPeer); +} + +void SessionBase::sendCompletion() +{ + impl->sendCompletion(); +} + Uuid SessionBase::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h index 54484113b1..29a266f4ca 100644 --- a/cpp/src/qpid/client/SessionBase.h +++ b/cpp/src/qpid/client/SessionBase.h @@ -131,6 +131,8 @@ class SessionBase Execution& getExecution(); void sync(); + void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + void sendCompletion(); typedef framing::TransferContent DefaultContent; diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 57f12cf28e..4e3f6cdd98 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -518,6 +518,12 @@ void SessionImpl::flush(bool expected, bool confirmed, bool completed) void SessionImpl::sendCompletion() { + Lock l(state); + sendCompletionImpl(); +} + +void SessionImpl::sendCompletionImpl() +{ proxy.completed(completedIn, true); } diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 1284670389..eb1c0acb97 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -87,6 +87,7 @@ public: bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); void waitForCompletion(const framing::SequenceNumber& id); + void sendCompletion(); //NOTE: these are called by the network thread when the connection is closed or dies void connectionClosed(uint16_t code, const std::string& text); @@ -122,7 +123,7 @@ private: void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); - void sendCompletion(); + void sendCompletionImpl(); // Note: Following methods are called by network thread in // response to session controls from the broker diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index b353be481b..2ba3f5fe62 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -87,6 +87,8 @@ void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; } +AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } + void SubscriptionManager::cancel(const std::string dest) { dispatcher.cancel(dest); diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 48cb725fb8..4ccb95c968 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -133,6 +133,10 @@ class SubscriptionManager : public sys::Runnable * Default is to acknowledge every message automatically. */ void setAckPolicy(const AckPolicy& autoAck); + /** + * + */ + AckPolicy& getAckPolicy(); }; |
