From 4fcd0a1f4d52dffe2c524af06882470dd4a48213 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 11 Oct 2006 08:24:42 +0000 Subject: Implementation of basic_get. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@462729 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/broker/inc/Channel.h | 32 ++++-- cpp/broker/inc/Message.h | 14 ++- cpp/broker/src/Channel.cpp | 53 ++++++++-- cpp/broker/src/Message.cpp | 16 ++- cpp/broker/src/Queue.cpp | 8 +- cpp/broker/src/SessionHandlerImpl.cpp | 10 +- cpp/broker/test/QueueTest.cpp | 179 ++++++++++++++++++++++++++++++++++ cpp/broker/test/queue_test.cpp | 138 -------------------------- 8 files changed, 289 insertions(+), 161 deletions(-) create mode 100644 cpp/broker/test/QueueTest.cpp delete mode 100644 cpp/broker/test/queue_test.cpp (limited to 'cpp') diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index 4f4d8e2890..a5a54aea1f 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -60,12 +60,24 @@ namespace qpid { Queue::shared_ptr queue; string consumerTag; u_int64_t deliveryTag; - - AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, - string _consumerTag, u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - deliveryTag(_deliveryTag){} + bool pull; + + AckRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const string _consumerTag, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag), + pull(false){} + + AckRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(""), + deliveryTag(_deliveryTag), + pull(true){} }; typedef std::vector::iterator ack_iterator; @@ -89,12 +101,14 @@ namespace qpid { void operator()(AckRecord& record) const; }; - class AddSize{ + class CalculatePrefetch{ u_int32_t size; + u_int16_t count; public: - AddSize(); + CalculatePrefetch(); void operator()(AckRecord& record); u_int32_t getSize(); + u_int16_t getCount(); }; const int id; @@ -106,6 +120,7 @@ namespace qpid { u_int32_t prefetchSize; u_int16_t prefetchCount; u_int32_t outstandingSize; + u_int16_t outstandingCount; u_int32_t framesize; Message::shared_ptr message; NameGenerator tagGenerator; @@ -136,6 +151,7 @@ namespace qpid { bool exists(const string& consumerTag); void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); void cancel(const string& tag); + bool get(Queue::shared_ptr queue, bool ackExpected); void begin(); void close(); void commit(); diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h index 7b2c2bc848..94b9aa5bdd 100644 --- a/cpp/broker/inc/Message.h +++ b/cpp/broker/inc/Message.h @@ -49,6 +49,9 @@ namespace qpid { content_list content; u_int64_t size; + void sendContent(qpid::framing::OutputHandler* out, + int channel, u_int32_t framesize); + public: typedef std::tr1::shared_ptr shared_ptr; @@ -61,9 +64,16 @@ namespace qpid { bool isComplete(); const ConnectionToken* const getPublisher(); - void deliver(qpid::framing::OutputHandler* out, int channel, - string& consumerTag, u_int64_t deliveryTag, + void deliver(qpid::framing::OutputHandler* out, + int channel, + const string& consumerTag, + u_int64_t deliveryTag, u_int32_t framesize); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize); void redeliver(); qpid::framing::BasicHeaderProperties* getHeaderProperties(); diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index f887e080a6..ed1125ee76 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -31,6 +31,7 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out) prefetchCount(0), prefetchSize(0), outstandingSize(0), + outstandingCount(0), framesize(_framesize), transactional(false), deliveryTag(1), @@ -98,6 +99,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar if(ackExpected){ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); outstandingSize += msg->contentSize(); + outstandingCount++; } //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); @@ -162,10 +164,15 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ throw InvalidAckException(); }else if(multiple){ unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?): - outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize(); + //recompute prefetch outstanding (note: messages delivered through get are ignored) + CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); + outstandingSize = calc.getSize(); + outstandingCount = calc.getCount(); }else{ - outstandingSize -= i->msg->contentSize(); + if(!i->pull){ + outstandingSize -= i->msg->contentSize(); + outstandingCount--; + } unacknowledged.erase(i); } @@ -181,6 +188,7 @@ void Channel::recover(bool requeue){ if(requeue){ outstandingSize = 0; + outstandingCount = 0; ack_iterator start(unacknowledged.begin()); ack_iterator end(unacknowledged.end()); for_each(start, end, Requeue()); @@ -190,6 +198,21 @@ void Channel::recover(bool requeue){ } } +bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Message::shared_ptr msg = queue->dequeue(); + if(msg){ + Locker locker(deliveryLock); + u_int64_t myDeliveryTag = deliveryTag++; + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + if(ackExpected){ + unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} bool Channel::MatchAck::operator()(AckRecord& record) const{ @@ -204,15 +227,29 @@ void Channel::Requeue::operator()(AckRecord& record) const{ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} void Channel::Redeliver::operator()(AckRecord& record) const{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + if(record.pull){ + //if message was originally sent as response to get, we must requeue it + record.msg->redeliver(); + record.queue->deliver(record.msg); + }else{ + record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + } } -Channel::AddSize::AddSize() : size(0){} +Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} -void Channel::AddSize::operator()(AckRecord& record){ - size += record.msg->contentSize(); +void Channel::CalculatePrefetch::operator()(AckRecord& record){ + if(!record.pull){ + //ignore messages that were sent in response to get when calculating prefetch + size += record.msg->contentSize(); + count++; + } } -u_int32_t Channel::AddSize::getSize(){ +u_int32_t Channel::CalculatePrefetch::getSize(){ return size; } + +u_int16_t Channel::CalculatePrefetch::getCount(){ + return count; +} diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp index a44eeaab59..b0e5d16b77 100644 --- a/cpp/broker/src/Message.cpp +++ b/cpp/broker/src/Message.cpp @@ -59,10 +59,24 @@ void Message::redeliver(){ } void Message::deliver(OutputHandler* out, int channel, - string& consumerTag, u_int64_t deliveryTag, + const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); + sendContent(out, channel, framesize); +} + +void Message::sendGetOk(OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize){ + + out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); + sendContent(out, channel, framesize); +} + +void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ AMQBody::shared_ptr headerBody = static_pointer_cast(header); out->send(new AMQFrame(channel, headerBody)); for(content_iterator i = content.begin(); i != content.end(); i++){ diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp index f7b8605b03..1db4454235 100644 --- a/cpp/broker/src/Queue.cpp +++ b/cpp/broker/src/Queue.cpp @@ -122,7 +122,13 @@ void Queue::cancel(Consumer* c){ } Message::shared_ptr Queue::dequeue(){ - + Locker locker(lock); + Message::shared_ptr msg; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + return msg; } u_int32_t Queue::purge(){ diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 857730f3f7..ad73c1b23b 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -338,7 +338,6 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){ //TODO: handle global - //TODO: channel doesn't do anything with these qos parameters yet parent->getChannel(channel)->setPrefetchSize(prefetchSize); parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client.getBasic().qosOk(channel); @@ -349,7 +348,6 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ bool noLocal, bool noAck, bool exclusive, bool nowait){ - //TODO: implement nolocal Queue::shared_ptr queue = parent->getQueue(queueName, channelId); Channel* channel = parent->channels[channelId]; if(!consumerTag.empty() && channel->exists(consumerTag)){ @@ -382,7 +380,13 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t parent->getChannel(channel)->handlePublish(msg); } -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t ticket, string& queueName, bool noAck){ + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + if(!parent->getChannel(channelId)->get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + parent->client.getBasic().getEmpty(channelId, clusterId); + } +} void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ try{ diff --git a/cpp/broker/test/QueueTest.cpp b/cpp/broker/test/QueueTest.cpp new file mode 100644 index 0000000000..973b1b5cf6 --- /dev/null +++ b/cpp/broker/test/QueueTest.cpp @@ -0,0 +1,179 @@ + /* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Queue.h" +#include "QueueRegistry.h" +#include +#include +#include +#include +#include + +using namespace qpid::broker; +using namespace qpid::concurrent; + + +class TestBinding : public virtual Binding{ + bool cancelled; + +public: + TestBinding(); + virtual void cancel(); + bool isCancelled(); +}; + +class TestConsumer : public virtual Consumer{ +public: + Message::shared_ptr last; + + virtual bool deliver(Message::shared_ptr& msg); +}; + + +class QueueTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(QueueTest); + CPPUNIT_TEST(testConsumers); + CPPUNIT_TEST(testBinding); + CPPUNIT_TEST(testRegistry); + CPPUNIT_TEST(testDequeue); + CPPUNIT_TEST_SUITE_END(); + + public: + void testConsumers(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); + + //Test adding consumers: + TestConsumer c1; + TestConsumer c2; + queue->consume(&c1); + queue->consume(&c2); + + CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); + + //Test basic delivery: + Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + + queue->deliver(msg1); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); + + queue->deliver(msg2); + CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); + + queue->deliver(msg3); + CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); + + //Test cancellation: + queue->cancel(&c1); + CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount()); + queue->cancel(&c2); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount()); + } + + void testBinding(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); + //Test bindings: + TestBinding a; + TestBinding b; + queue->bound(&a); + queue->bound(&b); + + queue.reset(); + + CPPUNIT_ASSERT(a.isCancelled()); + CPPUNIT_ASSERT(b.isCancelled()); + } + + void testRegistry(){ + //Test use of queues in registry: + QueueRegistry registry; + registry.declare("queue1", true, true); + registry.declare("queue2", true, true); + registry.declare("queue3", true, true); + + CPPUNIT_ASSERT(registry.find("queue1")); + CPPUNIT_ASSERT(registry.find("queue2")); + CPPUNIT_ASSERT(registry.find("queue3")); + + registry.destroy("queue1"); + registry.destroy("queue2"); + registry.destroy("queue3"); + + CPPUNIT_ASSERT(!registry.find("queue1")); + CPPUNIT_ASSERT(!registry.find("queue2")); + CPPUNIT_ASSERT(!registry.find("queue3")); + } + + void testDequeue(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); + + Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + Message::shared_ptr received; + + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + + CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount()); + + TestConsumer consumer; + queue->consume(&consumer); + queue->dispatch(); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT(!received); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); + + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); + +//TestBinding +TestBinding::TestBinding() : cancelled(false) {} + +void TestBinding::cancel(){ + CPPUNIT_ASSERT(!cancelled); + cancelled = true; +} + +bool TestBinding::isCancelled(){ + return cancelled; +} + +//TestConsumer +bool TestConsumer::deliver(Message::shared_ptr& msg){ + last = msg; + return true; +} + diff --git a/cpp/broker/test/queue_test.cpp b/cpp/broker/test/queue_test.cpp deleted file mode 100644 index aa423e7e08..0000000000 --- a/cpp/broker/test/queue_test.cpp +++ /dev/null @@ -1,138 +0,0 @@ - /* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Queue.h" -#include "QueueRegistry.h" -#include -#include -#include -#include -#include - -using namespace qpid::broker; -using namespace qpid::concurrent; - - -class TestBinding : public virtual Binding{ - bool cancelled; - -public: - TestBinding(); - virtual void cancel(); - bool isCancelled(); -}; - -class TestConsumer : public virtual Consumer{ -public: - Message::shared_ptr last; - - virtual bool deliver(Message::shared_ptr& msg); -}; - - -class QueueTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(QueueTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - void testMe() - { - Queue::shared_ptr queue(new Queue("my_queue", true, true)); - - //Test adding consumers: - TestConsumer c1; - TestConsumer c2; - queue->consume(&c1); - queue->consume(&c2); - - CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); - - //Test basic delivery: - Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); - - queue->deliver(msg1); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); - - queue->deliver(msg2); - CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); - - queue->deliver(msg3); - CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); - - //Test cancellation: - queue->cancel(&c1); - CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount()); - queue->cancel(&c2); - CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount()); - - //Test bindings: - TestBinding a; - TestBinding b; - queue->bound(&a); - queue->bound(&b); - - queue.reset(); - - CPPUNIT_ASSERT(a.isCancelled()); - CPPUNIT_ASSERT(b.isCancelled()); - - //Test use of queues in registry: - QueueRegistry registry; - registry.declare("queue1", true, true); - registry.declare("queue2", true, true); - registry.declare("queue3", true, true); - - CPPUNIT_ASSERT(registry.find("queue1")); - CPPUNIT_ASSERT(registry.find("queue2")); - CPPUNIT_ASSERT(registry.find("queue3")); - - registry.destroy("queue1"); - registry.destroy("queue2"); - registry.destroy("queue3"); - - CPPUNIT_ASSERT(!registry.find("queue1")); - CPPUNIT_ASSERT(!registry.find("queue2")); - CPPUNIT_ASSERT(!registry.find("queue3")); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); - -//TestBinding -TestBinding::TestBinding() : cancelled(false) {} - -void TestBinding::cancel(){ - CPPUNIT_ASSERT(!cancelled); - cancelled = true; -} - -bool TestBinding::isCancelled(){ - return cancelled; -} - -//TestConsumer -bool TestConsumer::deliver(Message::shared_ptr& msg){ - last = msg; - return true; -} - -- cgit v1.2.1