summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-21 14:37:03 +0000
committerGordon Sim <gsim@apache.org>2008-04-21 14:37:03 +0000
commit61647950e1c4e6b1efb0a1b3f3b220783680103f (patch)
treef666cacf0e56079e23ef0a9c881d26baa7d5a1fe /cpp/src/qpid/client
parentceca53c26ab6ed56929dc558b3255bdd83090315 (diff)
downloadqpid-python-61647950e1c4e6b1efb0a1b3f3b220783680103f.tar.gz
QPID-920: send message-accept for acks (as well as completion)
* AckPolicy now maintains a set of transfered messages for cumulative accepts git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/AckPolicy.h27
-rw-r--r--cpp/src/qpid/client/Channel.cpp9
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp4
-rw-r--r--cpp/src/qpid/client/Execution.h4
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp5
-rw-r--r--cpp/src/qpid/client/LocalQueue.h3
-rw-r--r--cpp/src/qpid/client/Message.cpp12
-rw-r--r--cpp/src/qpid/client/Message.h6
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp10
-rw-r--r--cpp/src/qpid/client/SessionBase.h2
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp6
-rw-r--r--cpp/src/qpid/client/SessionImpl.h3
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp2
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h4
14 files changed, 64 insertions, 33 deletions
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h
index af17539ebe..8d62b6f4f2 100644
--- a/cpp/src/qpid/client/AckPolicy.h
+++ b/cpp/src/qpid/client/AckPolicy.h
@@ -21,6 +21,8 @@
*
*/
+#include "qpid/framing/SequenceSet.h"
+
namespace qpid {
namespace client {
@@ -31,21 +33,36 @@ namespace client {
*/
class AckPolicy
{
+ framing::SequenceSet accepted;
size_t interval;
size_t count;
public:
/**
*@param n: acknowledge every n messages.
- *n==0 means no automatick acknowledgement.
+ *n==0 means no automatic acknowledgement.
*/
AckPolicy(size_t n=1) : interval(n), count(n) {}
- void ack(const Message& msg) {
+ void ack(const Message& msg, Session& session) {
+ accepted.add(msg.getId());
if (!interval) return;
- bool send=(--count==0);
- msg.acknowledge(true, send);
- if (send) count = interval;
+ if (--count==0) {
+ session.markCompleted(msg.getId(), false, true);
+ session.messageAccept(accepted);
+ accepted.clear();
+ count = interval;
+ } else {
+ session.markCompleted(msg.getId(), false, false);
+ }
+ }
+
+ void ackOutstanding(Session& session) {
+ if (!accepted.empty()) {
+ session.messageAccept(accepted);
+ accepted.clear();
+ session.sendCompletion();
+ }
}
};
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index ae9f78483d..f32b5e2614 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -23,6 +23,7 @@
#include <sstream>
#include "Channel.h"
#include "qpid/sys/Monitor.h"
+#include "AckPolicy.h"
#include "Message.h"
#include "Connection.h"
#include "Demux.h"
@@ -202,8 +203,10 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
if (incoming->tryPop(p)) {
msg.populate(*p);
if (ackMode == AUTO_ACK) {
- msg.setSession(session);
- msg.acknowledge(false, true);
+ AckPolicy acker;
+ acker.ack(msg, session);
+ } else {
+ session.markCompleted(msg.getId(), false, false);
}
return true;
}
@@ -260,7 +263,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination)
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.getExecution().markCompleted(content.getId(), true, send);
+ session.markCompleted(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index cc67701748..68e799918c 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -43,7 +43,7 @@ void Subscriber::received(Message& msg)
{
if (listener) {
listener->received(msg);
- autoAck.ack(msg);
+ autoAck.ack(msg, session);
}
}
@@ -72,7 +72,7 @@ void Dispatcher::run()
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content, session);
+ Message msg(*content);
Subscriber::shared_ptr listener = find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " << msg.getDestination());
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
index e4b2db23e1..b8c8a4292d 100644
--- a/cpp/src/qpid/client/Execution.h
+++ b/cpp/src/qpid/client/Execution.h
@@ -36,10 +36,6 @@ class Execution
public:
virtual ~Execution() {}
/**
- * Mark the incoming command with the specified id as completed
- */
- virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0;
- /**
* Provides access to the demultiplexing function within the
* session implementation
*/
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
index 951996f005..04cee40a37 100644
--- a/cpp/src/qpid/client/LocalQueue.cpp
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -36,8 +36,8 @@ Message LocalQueue::pop() {
throw ClosedException();
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message m(*content, session);
- autoAck.ack(m);
+ Message m(*content);
+ autoAck.ack(m, session);
return m;
}
else
@@ -46,6 +46,7 @@ Message LocalQueue::pop() {
}
void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
+AckPolicy& LocalQueue::getAckPolicy() { return autoAck; }
bool LocalQueue::empty() const
{
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index f8b2c2e0b3..58887077f6 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -47,9 +47,10 @@ class LocalQueue
bool empty() const;
size_t size() const;
void setAckPolicy(AckPolicy);
+ AckPolicy& getAckPolicy();
private:
- friend class SubscriptionManager;
+ friend class SubscriptionManager;
Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp
index 3d4b9da9fa..58d8fb142a 100644
--- a/cpp/src/qpid/client/Message.cpp
+++ b/cpp/src/qpid/client/Message.cpp
@@ -48,11 +48,6 @@ namespace client {
return getMessageProperties().getApplicationHeaders();
}
- void Message::acknowledge(bool cumulative, bool notifyPeer) const
- {
- const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer);
- }
-
const framing::MessageTransferBody& Message::getMethod() const
{
return method;
@@ -64,13 +59,10 @@ namespace client {
}
/**@internal for incoming messages */
- Message::Message(const framing::FrameSet& frameset, Session s) :
- method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
+ Message::Message(const framing::FrameSet& frameset) :
+ method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
{
populate(frameset);
}
- /**@internal use for incoming messages. */
- void Message::setSession(Session s) { session=s; }
-
}}
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index 977cc89146..864a16d9ce 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -45,19 +45,15 @@ public:
bool isRedelivered() const;
void setRedelivered(bool redelivered);
framing::FieldTable& getHeaders();
- void acknowledge(bool cumulative = true, bool notifyPeer = true) const;
const framing::MessageTransferBody& getMethod() const;
const framing::SequenceNumber& getId() const;
/**@internal for incoming messages */
- Message(const framing::FrameSet& frameset, Session s);
- /**@internal use for incoming messages. */
- void setSession(Session s);
+ Message(const framing::FrameSet& frameset);
private:
//method and id are only set for received messages:
framing::MessageTransferBody method;
framing::SequenceNumber id;
- Session session;
};
}}
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
index d6a7571e9f..4a202842a5 100644
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -50,6 +50,16 @@ void SessionBase::sync()
impl->send(b).wait(*impl);
}
+void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
+{
+ impl->markCompleted(id, cumulative, notifyPeer);
+}
+
+void SessionBase::sendCompletion()
+{
+ impl->sendCompletion();
+}
+
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
index 54484113b1..29a266f4ca 100644
--- a/cpp/src/qpid/client/SessionBase.h
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -131,6 +131,8 @@ class SessionBase
Execution& getExecution();
void sync();
+ void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void sendCompletion();
typedef framing::TransferContent DefaultContent;
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 57f12cf28e..4e3f6cdd98 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -518,6 +518,12 @@ void SessionImpl::flush(bool expected, bool confirmed, bool completed)
void SessionImpl::sendCompletion()
{
+ Lock l(state);
+ sendCompletionImpl();
+}
+
+void SessionImpl::sendCompletionImpl()
+{
proxy.completed(completedIn, true);
}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 1284670389..eb1c0acb97 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -87,6 +87,7 @@ public:
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
void waitForCompletion(const framing::SequenceNumber& id);
+ void sendCompletion();
//NOTE: these are called by the network thread when the connection is closed or dies
void connectionClosed(uint16_t code, const std::string& text);
@@ -122,7 +123,7 @@ private:
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
- void sendCompletion();
+ void sendCompletionImpl();
// Note: Following methods are called by network thread in
// response to session controls from the broker
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index b353be481b..2ba3f5fe62 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -87,6 +87,8 @@ void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
+
void SubscriptionManager::cancel(const std::string dest)
{
dispatcher.cancel(dest);
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 48cb725fb8..4ccb95c968 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -133,6 +133,10 @@ class SubscriptionManager : public sys::Runnable
* Default is to acknowledge every message automatically.
*/
void setAckPolicy(const AckPolicy& autoAck);
+ /**
+ *
+ */
+ AckPolicy& getAckPolicy();
};