diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-09 19:35:19 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-09 19:35:19 +0000 |
commit | 016ae5acebab0eaf6dd70f5d4d653fdfee93925d (patch) | |
tree | deda91367326dcbf69ed271dc53611338237861a /cpp | |
parent | 7852367e6a3e6510b4dbcaec21f023ead7827fa1 (diff) | |
download | qpid-python-016ae5acebab0eaf6dd70f5d4d653fdfee93925d.tar.gz |
QPID-1306
- from review, clean-up for acquire
- test for LVQ acquire
- suppress store for LVQ
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703236 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 48 |
2 files changed, 55 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b153d14720..ea5f7a0ba9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -203,6 +203,11 @@ bool Queue::acquire(const QueuedMessage& msg) { QPID_LOG(debug, "attempting to acquire " << msg.position); for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { if (i->position == msg.position) { + if (lastValueQueue){ + const framing::FieldTable* ft = msg.payload->getApplicationHeaders(); + string key = ft->getString(qpidVQMatchProperty); + lvq.erase(key); + } messages.erase(i); QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); return true; @@ -523,7 +528,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) msg->addTraceId(traceId); } - if (msg->isPersistent() && store) { + if (msg->isPersistent() && store && !lastValueQueue) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); @@ -540,7 +545,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) Mutex::ScopedLock locker(messageLock); dequeued(msg); } - if (msg.payload->isPersistent() && store) { + if (msg.payload->isPersistent() && store && !lastValueQueue) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index e18a2309fa..a189dc1f15 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -387,6 +387,54 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ } +QPID_AUTO_TEST_CASE(testLVQAcquire){ + + client::QueueOptions args; + // set queue mode + args.setOrdering(client::LVQ); + + Queue::shared_ptr queue(new Queue("my-queue", true )); + queue->configure(args); + + intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg2 = message("e", "B"); + intrusive_ptr<Message> msg3 = message("e", "C"); + intrusive_ptr<Message> msg4 = message("e", "D"); + intrusive_ptr<Message> msg5 = message("e", "F"); + + //set deliever match for LVQ a,b,c,a + + string key; + args.getLVQKey(key); + BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); + + + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + + //enqueue 4 message + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + queue->deliver(msg4); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + framing::SequenceNumber sequence; + QueuedMessage qmsg(queue.get(), msg2, ++sequence); + queue->acquire(qmsg); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); + + queue->deliver(msg5); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + +} + + QPID_AUTO_TEST_CASE(testLVQSaftyCheck){ // This test is to check std::deque memory copy does not change out under us |