summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-21 14:37:03 +0000
committerGordon Sim <gsim@apache.org>2008-04-21 14:37:03 +0000
commit61647950e1c4e6b1efb0a1b3f3b220783680103f (patch)
treef666cacf0e56079e23ef0a9c881d26baa7d5a1fe /cpp/src
parentceca53c26ab6ed56929dc558b3255bdd83090315 (diff)
downloadqpid-python-61647950e1c4e6b1efb0a1b3f3b220783680103f.tar.gz
QPID-920: send message-accept for acks (as well as completion)
* AckPolicy now maintains a set of transfered messages for cumulative accepts git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/AckPolicy.h27
-rw-r--r--cpp/src/qpid/client/Channel.cpp9
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp4
-rw-r--r--cpp/src/qpid/client/Execution.h4
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp5
-rw-r--r--cpp/src/qpid/client/LocalQueue.h3
-rw-r--r--cpp/src/qpid/client/Message.cpp12
-rw-r--r--cpp/src/qpid/client/Message.h6
-rw-r--r--cpp/src/qpid/client/SessionBase.cpp10
-rw-r--r--cpp/src/qpid/client/SessionBase.h2
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp6
-rw-r--r--cpp/src/qpid/client/SessionImpl.h3
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp2
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h4
-rw-r--r--cpp/src/qpid/framing/SequenceSet.cpp26
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp4
-rw-r--r--cpp/src/tests/client_test.cpp5
-rw-r--r--cpp/src/tests/perftest.cpp6
-rw-r--r--cpp/src/tests/topic_listener.cpp2
-rw-r--r--cpp/src/tests/txtest.cpp2
20 files changed, 90 insertions, 52 deletions
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h
index af17539ebe..8d62b6f4f2 100644
--- a/cpp/src/qpid/client/AckPolicy.h
+++ b/cpp/src/qpid/client/AckPolicy.h
@@ -21,6 +21,8 @@
*
*/
+#include "qpid/framing/SequenceSet.h"
+
namespace qpid {
namespace client {
@@ -31,21 +33,36 @@ namespace client {
*/
class AckPolicy
{
+ framing::SequenceSet accepted;
size_t interval;
size_t count;
public:
/**
*@param n: acknowledge every n messages.
- *n==0 means no automatick acknowledgement.
+ *n==0 means no automatic acknowledgement.
*/
AckPolicy(size_t n=1) : interval(n), count(n) {}
- void ack(const Message& msg) {
+ void ack(const Message& msg, Session& session) {
+ accepted.add(msg.getId());
if (!interval) return;
- bool send=(--count==0);
- msg.acknowledge(true, send);
- if (send) count = interval;
+ if (--count==0) {
+ session.markCompleted(msg.getId(), false, true);
+ session.messageAccept(accepted);
+ accepted.clear();
+ count = interval;
+ } else {
+ session.markCompleted(msg.getId(), false, false);
+ }
+ }
+
+ void ackOutstanding(Session& session) {
+ if (!accepted.empty()) {
+ session.messageAccept(accepted);
+ accepted.clear();
+ session.sendCompletion();
+ }
}
};
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index ae9f78483d..f32b5e2614 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -23,6 +23,7 @@
#include <sstream>
#include "Channel.h"
#include "qpid/sys/Monitor.h"
+#include "AckPolicy.h"
#include "Message.h"
#include "Connection.h"
#include "Demux.h"
@@ -202,8 +203,10 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
if (incoming->tryPop(p)) {
msg.populate(*p);
if (ackMode == AUTO_ACK) {
- msg.setSession(session);
- msg.acknowledge(false, true);
+ AckPolicy acker;
+ acker.ack(msg, session);
+ } else {
+ session.markCompleted(msg.getId(), false, false);
}
return true;
}
@@ -260,7 +263,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination)
bool send = i->second.ackMode == AUTO_ACK
|| (prefetch && ++(i->second.count) > (prefetch / 2));
if (send) i->second.count = 0;
- session.getExecution().markCompleted(content.getId(), true, send);
+ session.markCompleted(content.getId(), true, send);
}
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index cc67701748..68e799918c 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -43,7 +43,7 @@ void Subscriber::received(Message& msg)
{
if (listener) {
listener->received(msg);
- autoAck.ack(msg);
+ autoAck.ack(msg, session);
}
}
@@ -72,7 +72,7 @@ void Dispatcher::run()
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content, session);
+ Message msg(*content);
Subscriber::shared_ptr listener = find(msg.getDestination());
if (!listener) {
QPID_LOG(error, "No listener found for destination " << msg.getDestination());
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
index e4b2db23e1..b8c8a4292d 100644
--- a/cpp/src/qpid/client/Execution.h
+++ b/cpp/src/qpid/client/Execution.h
@@ -36,10 +36,6 @@ class Execution
public:
virtual ~Execution() {}
/**
- * Mark the incoming command with the specified id as completed
- */
- virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0;
- /**
* Provides access to the demultiplexing function within the
* session implementation
*/
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
index 951996f005..04cee40a37 100644
--- a/cpp/src/qpid/client/LocalQueue.cpp
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -36,8 +36,8 @@ Message LocalQueue::pop() {
throw ClosedException();
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message m(*content, session);
- autoAck.ack(m);
+ Message m(*content);
+ autoAck.ack(m, session);
return m;
}
else
@@ -46,6 +46,7 @@ Message LocalQueue::pop() {
}
void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
+AckPolicy& LocalQueue::getAckPolicy() { return autoAck; }
bool LocalQueue::empty() const
{
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index f8b2c2e0b3..58887077f6 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -47,9 +47,10 @@ class LocalQueue
bool empty() const;
size_t size() const;
void setAckPolicy(AckPolicy);
+ AckPolicy& getAckPolicy();
private:
- friend class SubscriptionManager;
+ friend class SubscriptionManager;
Session session;
Demux::QueuePtr queue;
AckPolicy autoAck;
diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp
index 3d4b9da9fa..58d8fb142a 100644
--- a/cpp/src/qpid/client/Message.cpp
+++ b/cpp/src/qpid/client/Message.cpp
@@ -48,11 +48,6 @@ namespace client {
return getMessageProperties().getApplicationHeaders();
}
- void Message::acknowledge(bool cumulative, bool notifyPeer) const
- {
- const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer);
- }
-
const framing::MessageTransferBody& Message::getMethod() const
{
return method;
@@ -64,13 +59,10 @@ namespace client {
}
/**@internal for incoming messages */
- Message::Message(const framing::FrameSet& frameset, Session s) :
- method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
+ Message::Message(const framing::FrameSet& frameset) :
+ method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
{
populate(frameset);
}
- /**@internal use for incoming messages. */
- void Message::setSession(Session s) { session=s; }
-
}}
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index 977cc89146..864a16d9ce 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -45,19 +45,15 @@ public:
bool isRedelivered() const;
void setRedelivered(bool redelivered);
framing::FieldTable& getHeaders();
- void acknowledge(bool cumulative = true, bool notifyPeer = true) const;
const framing::MessageTransferBody& getMethod() const;
const framing::SequenceNumber& getId() const;
/**@internal for incoming messages */
- Message(const framing::FrameSet& frameset, Session s);
- /**@internal use for incoming messages. */
- void setSession(Session s);
+ Message(const framing::FrameSet& frameset);
private:
//method and id are only set for received messages:
framing::MessageTransferBody method;
framing::SequenceNumber id;
- Session session;
};
}}
diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp
index d6a7571e9f..4a202842a5 100644
--- a/cpp/src/qpid/client/SessionBase.cpp
+++ b/cpp/src/qpid/client/SessionBase.cpp
@@ -50,6 +50,16 @@ void SessionBase::sync()
impl->send(b).wait(*impl);
}
+void SessionBase::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
+{
+ impl->markCompleted(id, cumulative, notifyPeer);
+}
+
+void SessionBase::sendCompletion()
+{
+ impl->sendCompletion();
+}
+
Uuid SessionBase::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); }
diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h
index 54484113b1..29a266f4ca 100644
--- a/cpp/src/qpid/client/SessionBase.h
+++ b/cpp/src/qpid/client/SessionBase.h
@@ -131,6 +131,8 @@ class SessionBase
Execution& getExecution();
void sync();
+ void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void sendCompletion();
typedef framing::TransferContent DefaultContent;
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 57f12cf28e..4e3f6cdd98 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -518,6 +518,12 @@ void SessionImpl::flush(bool expected, bool confirmed, bool completed)
void SessionImpl::sendCompletion()
{
+ Lock l(state);
+ sendCompletionImpl();
+}
+
+void SessionImpl::sendCompletionImpl()
+{
proxy.completed(completedIn, true);
}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 1284670389..eb1c0acb97 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -87,6 +87,7 @@ public:
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
void waitForCompletion(const framing::SequenceNumber& id);
+ void sendCompletion();
//NOTE: these are called by the network thread when the connection is closed or dies
void connectionClosed(uint16_t code, const std::string& text);
@@ -122,7 +123,7 @@ private:
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
- void sendCompletion();
+ void sendCompletionImpl();
// Note: Following methods are called by network thread in
// response to session controls from the broker
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index b353be481b..2ba3f5fe62 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -87,6 +87,8 @@ void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
+
void SubscriptionManager::cancel(const std::string dest)
{
dispatcher.cancel(dest);
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 48cb725fb8..4ccb95c968 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -133,6 +133,10 @@ class SubscriptionManager : public sys::Runnable
* Default is to acknowledge every message automatically.
*/
void setAckPolicy(const AckPolicy& autoAck);
+ /**
+ *
+ */
+ AckPolicy& getAckPolicy();
};
diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp
index 1858467de6..4903d92a91 100644
--- a/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/cpp/src/qpid/framing/SequenceSet.cpp
@@ -143,20 +143,24 @@ void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& end)
void SequenceSet::remove(const SequenceNumber& s)
{
- for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) {
- if (i->start == s) {
+ for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) {
+ if (i->contains(s)) {
if (i->start == i->end) {
- ranges.erase(i);
- } else {
+ //range is just a single number, so we can delete the whole range
+ i = ranges.erase(i);
+ } else if (i->start == s) {
+ //move the start forward to exclude s
++(i->start);
+ } else if (i->end == s) {
+ //move the end backward to exclude s
+ --(i->end);
+ } else {
+ //need to split range pointed to by i
+ Range r(i->start, (uint32_t)s - 1);
+ i->start = s + 1;
+ i = ranges.insert(i, r);
}
- } else if (i->end == s) {
- --(i->end);
- } else if (i->contains(s)) {
- //need to split range pointed to by i
- Range r(i->start, (uint32_t)s - 1);
- i->start = s + 1;
- ranges.insert(i, r);
+ break;
}
}
}
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 9b6e0dce21..a5f7b9d803 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -20,6 +20,7 @@
*/
#include "unit_test.h"
#include "BrokerFixture.h"
+#include "qpid/client/AckPolicy.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
@@ -124,7 +125,8 @@ QPID_AUTO_TEST_CASE(testTransfer)
BOOST_CHECK(msg->isA<MessageTransferBody>());
BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
//confirm receipt:
- fix.session.getExecution().markCompleted(msg->getId(), true, true);
+ AckPolicy autoAck;
+ autoAck.ack(Message(*msg), fix.session);
}
QPID_AUTO_TEST_CASE(testDispatcher)
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index 011dcd4678..d0da2ec8ca 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -124,10 +124,11 @@ int main(int argc, char** argv)
if (opts.trace) std::cout << "Subscribed to queue." << std::endl;
FrameSet::shared_ptr incoming = session.get();
if (incoming->isA<MessageTransferBody>()) {
- Message msgIn(*incoming, session);
+ Message msgIn(*incoming);
if (msgIn.getData() == msgOut.getData()) {
if (opts.trace) std::cout << "Received the exepected message." << std::endl;
- msgIn.acknowledge();
+ session.messageAccept(SequenceSet(msgIn.getId()));
+ session.markCompleted(msgIn.getId(), true, true);
} else {
print("Received an unexepected message: ", msgIn);
}
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index 966d708ff6..231b25daa4 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -533,9 +533,9 @@ struct SubscribeThread : public Client {
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
- if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
+ if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
- // multiple publishers. Need an acorray of counters,
+ // multiple publishers. Need an array of counters,
// one per publisher and a publisher ID in the
// message. Careful not to introduce a lot of overhead
// here, e.g. no std::map, std::string etc.
@@ -550,7 +550,7 @@ struct SubscribeThread : public Client {
}
}
if (opts.ack !=0)
- msg.acknowledge(); // Cumulative ack for final batch.
+ subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
AbsTime end=now();
// Report to publisher.
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 5208b67445..3dd042605e 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -159,7 +159,7 @@ void Listener::received(Message& message){
if(!!type && StringValue("TERMINATION_REQUEST") == *type){
shutdown();
}else if(!!type && StringValue("REPORT_REQUEST") == *type){
- message.acknowledge();//acknowledge everything upto this point
+ mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point
cout <<"Batch ended, sending report." << endl;
//send a report:
report();
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index 5030b24070..a8369df759 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -144,7 +144,7 @@ struct Transfer : public Client, public Runnable
out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
- in.acknowledge();
+ lq.getAckPolicy().ackOutstanding(session);
session.txCommit();
}
} catch(const std::exception& e) {