diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-12-16 21:41:01 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-12-16 21:41:01 +0000 |
commit | c5d4420b0bf574200158ba943d74f9bfd13ad56e (patch) | |
tree | 7420561262882208ce9936f4a6e6e2dad3255709 /cpp | |
parent | 66d295bf47c6c91ecd2b1e7054d44e877429f1ed (diff) | |
download | qpid-python-c5d4420b0bf574200158ba943d74f9bfd13ad56e.tar.gz |
LVQ queue option for no acquire
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@727166 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/QueueOptions.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/QueueOptions.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 8 |
5 files changed, 24 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 22659ec26c..4880dda553 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -61,6 +61,7 @@ const std::string qpidNoLocal("no-local"); const std::string qpidTraceIdentity("qpid.trace.id"); const std::string qpidTraceExclude("qpid.trace.exclude"); const std::string qpidLastValueQueue("qpid.last_value_queue"); +const std::string qpidLastValueQueueNoAcquire("qpid.last_value_queue_no_acquire"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); } @@ -79,6 +80,7 @@ Queue::Queue(const string& _name, bool _autodelete, exclusive(0), noLocal(false), lastValueQueue(false), + lastValueQueueNoAcquire(false), persistLastNode(false), inLastNodeFailure(false), persistenceId(0), @@ -213,7 +215,7 @@ bool Queue::acquire(const QueuedMessage& msg) { || (lastValueQueue && (i->position == msg.position) && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { - clearLVQIndex(msg); + if (!lastValueQueueNoAcquire) clearLVQIndex(msg); messages.erase(i); QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); return true; @@ -673,6 +675,12 @@ void Queue::configure(const FieldTable& _settings) lastValueQueue= _settings.get(qpidLastValueQueue); if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + lastValueQueueNoAcquire = _settings.get(qpidLastValueQueueNoAcquire); + if (lastValueQueueNoAcquire){ + QPID_LOG(debug, "Configured queue as Last Value Queue No Acquire"); + lastValueQueue = lastValueQueueNoAcquire; + } + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index c11c03a773..89cd3afc35 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -76,6 +76,7 @@ namespace qpid { OwnershipToken* exclusive; bool noLocal; bool lastValueQueue; + bool lastValueQueueNoAcquire; bool persistLastNode; bool inLastNodeFailure; std::string traceId; diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp index 66cf8544f0..b360c1ab93 100644 --- a/cpp/src/qpid/client/QueueOptions.cpp +++ b/cpp/src/qpid/client/QueueOptions.cpp @@ -38,6 +38,7 @@ const std::string QueueOptions::strRING_STRICT("ring_strict"); const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue"); const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node"); const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key"); +const std::string QueueOptions::strLastValueQueueNoAcquire("qpid.last_value_queue_no_acquire"); QueueOptions::~QueueOptions() @@ -79,7 +80,9 @@ void QueueOptions::setOrdering(QueueOrderingPolicy op) { if (op == LVQ){ setInt(strLastValueQueue, 1); - }else{ + }else if (op == LVQ_NO_ACQUIRE){ + setInt(strLastValueQueueNoAcquire, 1); + }else { clearOrdering(); } } diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h index e9deb7ead8..c6cb071714 100644 --- a/cpp/src/qpid/client/QueueOptions.h +++ b/cpp/src/qpid/client/QueueOptions.h @@ -27,7 +27,7 @@ namespace qpid { namespace client { enum QueueSizePolicy {NONE, REJECT, FLOW_TO_DISK, RING, RING_STRICT}; -enum QueueOrderingPolicy {FIFO, LVQ}; +enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_ACQUIRE}; /** * A help class to set options on the Queue. Create a configured args while @@ -94,6 +94,7 @@ class QueueOptions: public framing::FieldTable static const std::string strLastValueQueue; static const std::string strPersistLastNode; static const std::string strLVQMatchProperty; + static const std::string strLastValueQueueNoAcquire; }; } diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index c60ee6dcb5..fcd46da318 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -370,6 +370,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ intrusive_ptr<Message> msg3 = create_message("e", "C"); intrusive_ptr<Message> msg4 = create_message("e", "D"); intrusive_ptr<Message> msg5 = create_message("e", "F"); + intrusive_ptr<Message> msg6 = create_message("e", "G"); //set deliever match for LVQ a,b,c,a @@ -383,6 +384,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); //enqueue 4 message queue->deliver(msg1); @@ -403,6 +405,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ queue->deliver(msg5); BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + // set mode to no acquire and check + args.setOrdering(client::LVQ_NO_ACQUIRE); + queue->configure(args); + queue->deliver(msg6); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); } |