summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-12-16 21:41:01 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-12-16 21:41:01 +0000
commitc5d4420b0bf574200158ba943d74f9bfd13ad56e (patch)
tree7420561262882208ce9936f4a6e6e2dad3255709 /cpp
parent66d295bf47c6c91ecd2b1e7054d44e877429f1ed (diff)
downloadqpid-python-c5d4420b0bf574200158ba943d74f9bfd13ad56e.tar.gz
LVQ queue option for no acquire
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@727166 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp10
-rw-r--r--cpp/src/qpid/broker/Queue.h1
-rw-r--r--cpp/src/qpid/client/QueueOptions.cpp5
-rw-r--r--cpp/src/qpid/client/QueueOptions.h3
-rw-r--r--cpp/src/tests/QueueTest.cpp8
5 files changed, 24 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 22659ec26c..4880dda553 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -61,6 +61,7 @@ const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
const std::string qpidLastValueQueue("qpid.last_value_queue");
+const std::string qpidLastValueQueueNoAcquire("qpid.last_value_queue_no_acquire");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
}
@@ -79,6 +80,7 @@ Queue::Queue(const string& _name, bool _autodelete,
exclusive(0),
noLocal(false),
lastValueQueue(false),
+ lastValueQueueNoAcquire(false),
persistLastNode(false),
inLastNodeFailure(false),
persistenceId(0),
@@ -213,7 +215,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
|| (lastValueQueue && (i->position == msg.position) &&
msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
- clearLVQIndex(msg);
+ if (!lastValueQueueNoAcquire) clearLVQIndex(msg);
messages.erase(i);
QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
return true;
@@ -673,6 +675,12 @@ void Queue::configure(const FieldTable& _settings)
lastValueQueue= _settings.get(qpidLastValueQueue);
if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+ lastValueQueueNoAcquire = _settings.get(qpidLastValueQueueNoAcquire);
+ if (lastValueQueueNoAcquire){
+ QPID_LOG(debug, "Configured queue as Last Value Queue No Acquire");
+ lastValueQueue = lastValueQueueNoAcquire;
+ }
+
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index c11c03a773..89cd3afc35 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -76,6 +76,7 @@ namespace qpid {
OwnershipToken* exclusive;
bool noLocal;
bool lastValueQueue;
+ bool lastValueQueueNoAcquire;
bool persistLastNode;
bool inLastNodeFailure;
std::string traceId;
diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp
index 66cf8544f0..b360c1ab93 100644
--- a/cpp/src/qpid/client/QueueOptions.cpp
+++ b/cpp/src/qpid/client/QueueOptions.cpp
@@ -38,6 +38,7 @@ const std::string QueueOptions::strRING_STRICT("ring_strict");
const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node");
const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
+const std::string QueueOptions::strLastValueQueueNoAcquire("qpid.last_value_queue_no_acquire");
QueueOptions::~QueueOptions()
@@ -79,7 +80,9 @@ void QueueOptions::setOrdering(QueueOrderingPolicy op)
{
if (op == LVQ){
setInt(strLastValueQueue, 1);
- }else{
+ }else if (op == LVQ_NO_ACQUIRE){
+ setInt(strLastValueQueueNoAcquire, 1);
+ }else {
clearOrdering();
}
}
diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h
index e9deb7ead8..c6cb071714 100644
--- a/cpp/src/qpid/client/QueueOptions.h
+++ b/cpp/src/qpid/client/QueueOptions.h
@@ -27,7 +27,7 @@ namespace qpid {
namespace client {
enum QueueSizePolicy {NONE, REJECT, FLOW_TO_DISK, RING, RING_STRICT};
-enum QueueOrderingPolicy {FIFO, LVQ};
+enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_ACQUIRE};
/**
* A help class to set options on the Queue. Create a configured args while
@@ -94,6 +94,7 @@ class QueueOptions: public framing::FieldTable
static const std::string strLastValueQueue;
static const std::string strPersistLastNode;
static const std::string strLVQMatchProperty;
+ static const std::string strLastValueQueueNoAcquire;
};
}
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index c60ee6dcb5..fcd46da318 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -370,6 +370,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
intrusive_ptr<Message> msg3 = create_message("e", "C");
intrusive_ptr<Message> msg4 = create_message("e", "D");
intrusive_ptr<Message> msg5 = create_message("e", "F");
+ intrusive_ptr<Message> msg6 = create_message("e", "G");
//set deliever match for LVQ a,b,c,a
@@ -383,6 +384,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
//enqueue 4 message
queue->deliver(msg1);
@@ -403,6 +405,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
queue->deliver(msg5);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+ // set mode to no acquire and check
+ args.setOrdering(client::LVQ_NO_ACQUIRE);
+ queue->configure(args);
+ queue->deliver(msg6);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
}