summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-10-09 19:35:19 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-10-09 19:35:19 +0000
commit016ae5acebab0eaf6dd70f5d4d653fdfee93925d (patch)
treededa91367326dcbf69ed271dc53611338237861a /cpp
parent7852367e6a3e6510b4dbcaec21f023ead7827fa1 (diff)
downloadqpid-python-016ae5acebab0eaf6dd70f5d4d653fdfee93925d.tar.gz
QPID-1306
- from review, clean-up for acquire - test for LVQ acquire - suppress store for LVQ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703236 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp9
-rw-r--r--cpp/src/tests/QueueTest.cpp48
2 files changed, 55 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b153d14720..ea5f7a0ba9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -203,6 +203,11 @@ bool Queue::acquire(const QueuedMessage& msg) {
QPID_LOG(debug, "attempting to acquire " << msg.position);
for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
if (i->position == msg.position) {
+ if (lastValueQueue){
+ const framing::FieldTable* ft = msg.payload->getApplicationHeaders();
+ string key = ft->getString(qpidVQMatchProperty);
+ lvq.erase(key);
+ }
messages.erase(i);
QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
return true;
@@ -523,7 +528,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
msg->addTraceId(traceId);
}
- if (msg->isPersistent() && store) {
+ if (msg->isPersistent() && store && !lastValueQueue) {
msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
@@ -540,7 +545,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
}
- if (msg.payload->isPersistent() && store) {
+ if (msg.payload->isPersistent() && store && !lastValueQueue) {
msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index e18a2309fa..a189dc1f15 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -387,6 +387,54 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
}
+QPID_AUTO_TEST_CASE(testLVQAcquire){
+
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
+
+ intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg2 = message("e", "B");
+ intrusive_ptr<Message> msg3 = message("e", "C");
+ intrusive_ptr<Message> msg4 = message("e", "D");
+ intrusive_ptr<Message> msg5 = message("e", "F");
+
+ //set deliever match for LVQ a,b,c,a
+
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+ msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+
+ //enqueue 4 message
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+ queue->deliver(msg4);
+
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+ framing::SequenceNumber sequence;
+ QueuedMessage qmsg(queue.get(), msg2, ++sequence);
+ queue->acquire(qmsg);
+
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
+
+ queue->deliver(msg5);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+}
+
+
QPID_AUTO_TEST_CASE(testLVQSaftyCheck){
// This test is to check std::deque memory copy does not change out under us