diff options
Diffstat (limited to 'qpid/cpp/tests')
49 files changed, 0 insertions, 5900 deletions
diff --git a/qpid/cpp/tests/.vg-supp b/qpid/cpp/tests/.vg-supp deleted file mode 100644 index b5abdf1385..0000000000 --- a/qpid/cpp/tests/.vg-supp +++ /dev/null @@ -1,18 +0,0 @@ -{ - <insert a suppression name here> - Memcheck:Leak - fun:_Znwj - fun:_ZN4qpid6broker17ReferenceRegistry4openERKSs - fun:_ZN13ReferenceTestC1Ev - fun:_ZN7CppUnit25ConcretTestFixtureFactoryI13ReferenceTestE11makeFixtureEv - fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv - fun:_ZNK7CppUnit23TestSuiteBuilderContextI13ReferenceTestE11makeFixtureEv - fun:_ZN13ReferenceTest15addTestsToSuiteERN7CppUnit27TestSuiteBuilderContextBaseE - fun:_ZN13ReferenceTest5suiteEv - fun:_ZN7CppUnit16TestSuiteFactoryI13ReferenceTestE8makeTestEv - fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE - fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv - obj:/usr/bin/DllPlugInTester - obj:/usr/bin/DllPlugInTester - fun:(below main) -} diff --git a/qpid/cpp/tests/APRBaseTest.cpp b/qpid/cpp/tests/APRBaseTest.cpp deleted file mode 100644 index 7d95c3bf52..0000000000 --- a/qpid/cpp/tests/APRBaseTest.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <apr/APRBase.h> -#include <qpid_test_plugin.h> -#include <iostream> - -using namespace qpid::sys; - -class APRBaseTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(APRBaseTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testMe() - { - APRBase::increment(); - APRBase::increment(); - APRBase::decrement(); - APRBase::decrement(); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest); - diff --git a/qpid/cpp/tests/AccumulatedAckTest.cpp b/qpid/cpp/tests/AccumulatedAckTest.cpp deleted file mode 100644 index 30554f808e..0000000000 --- a/qpid/cpp/tests/AccumulatedAckTest.cpp +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <AccumulatedAck.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <list> - -using std::list; -using namespace qpid::broker; - -class AccumulatedAckTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(AccumulatedAckTest); - CPPUNIT_TEST(testGeneral); - CPPUNIT_TEST(testCovers); - CPPUNIT_TEST(testUpdateAndConsolidate); - CPPUNIT_TEST_SUITE_END(); - - public: - void testGeneral() - { - AccumulatedAck ack(0); - ack.clear(); - ack.update(3,3); - ack.update(7,7); - ack.update(9,9); - ack.update(1,2); - ack.update(4,5); - ack.update(6,6); - - for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i)); - CPPUNIT_ASSERT(ack.covers(9)); - - CPPUNIT_ASSERT(!ack.covers(8)); - CPPUNIT_ASSERT(!ack.covers(10)); - - ack.consolidate(); - - for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i)); - CPPUNIT_ASSERT(ack.covers(9)); - - CPPUNIT_ASSERT(!ack.covers(8)); - CPPUNIT_ASSERT(!ack.covers(10)); - } - - void testCovers() - { - AccumulatedAck ack(5); - ack.individual.push_back(7); - ack.individual.push_back(9); - - CPPUNIT_ASSERT(ack.covers(1)); - CPPUNIT_ASSERT(ack.covers(2)); - CPPUNIT_ASSERT(ack.covers(3)); - CPPUNIT_ASSERT(ack.covers(4)); - CPPUNIT_ASSERT(ack.covers(5)); - CPPUNIT_ASSERT(ack.covers(7)); - CPPUNIT_ASSERT(ack.covers(9)); - - CPPUNIT_ASSERT(!ack.covers(6)); - CPPUNIT_ASSERT(!ack.covers(8)); - CPPUNIT_ASSERT(!ack.covers(10)); - } - - void testUpdateAndConsolidate() - { - AccumulatedAck ack(0); - ack.update(1, 1); - ack.update(3, 3); - ack.update(10, 10); - ack.update(8, 8); - ack.update(6, 6); - ack.update(3, 3); - ack.update(2, 2); - ack.update(0, 5); - ack.consolidate(); - CPPUNIT_ASSERT_EQUAL((uint64_t) 6, ack.range); - CPPUNIT_ASSERT_EQUAL((size_t) 2, ack.individual.size()); - list<uint64_t>::iterator i = ack.individual.begin(); - CPPUNIT_ASSERT_EQUAL((uint64_t) 8, *i); - i++; - CPPUNIT_ASSERT_EQUAL((uint64_t) 10, *i); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(AccumulatedAckTest); - diff --git a/qpid/cpp/tests/BrokerChannelTest.cpp b/qpid/cpp/tests/BrokerChannelTest.cpp deleted file mode 100644 index 66a7138dab..0000000000 --- a/qpid/cpp/tests/BrokerChannelTest.cpp +++ /dev/null @@ -1,357 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <BrokerChannel.h> -#include <BrokerMessage.h> -#include <BrokerQueue.h> -#include <FanOutExchange.h> -#include <NullMessageStore.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <memory> -#include <AMQP_HighestVersion.h> -#include "AMQFrame.h" -#include "MockChannel.h" -#include "broker/Connection.h" -#include "ProtocolInitiation.h" -#include <boost/ptr_container/ptr_vector.hpp> - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; -using std::string; -using std::queue; - -struct MockHandler : ConnectionOutputHandler{ - boost::ptr_vector<AMQFrame> frames; - - void send(AMQFrame* frame){ frames.push_back(frame); } - - void close() {}; -}; - - -class BrokerChannelTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(BrokerChannelTest); - CPPUNIT_TEST(testConsumerMgmt); - CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testDeliveryAndRecovery); - CPPUNIT_TEST(testStaging); - CPPUNIT_TEST(testQueuePolicy); - CPPUNIT_TEST_SUITE_END(); - - Broker::shared_ptr broker; - Connection connection; - MockHandler handler; - - class MockMessageStore : public NullMessageStore - { - struct MethodCall - { - const string name; - PersistableMessage* msg; - const string data;//only needed for appendContent - - void check(const MethodCall& other) const - { - CPPUNIT_ASSERT_EQUAL(name, other.name); - CPPUNIT_ASSERT_EQUAL(msg, other.msg); - CPPUNIT_ASSERT_EQUAL(data, other.data); - } - }; - - queue<MethodCall> expected; - bool expectMode;//true when setting up expected calls - - void handle(const MethodCall& call) - { - if (expectMode) { - expected.push(call); - } else { - call.check(expected.front()); - expected.pop(); - } - } - - void handle(const string& name, PersistableMessage* msg, const string& data) - { - MethodCall call = {name, msg, data}; - handle(call); - } - - public: - - MockMessageStore() : expectMode(false) {} - - void stage(PersistableMessage& msg) - { - if(!expectMode) msg.setPersistenceId(1); - MethodCall call = {"stage", &msg, ""}; - handle(call); - } - - void appendContent(PersistableMessage& msg, const string& data) - { - MethodCall call = {"appendContent", &msg, data}; - handle(call); - } - - // Don't hide overloads. - using NullMessageStore::destroy; - - void destroy(PersistableMessage& msg) - { - MethodCall call = {"destroy", &msg, ""}; - handle(call); - } - - void expect() - { - expectMode = true; - } - - void test() - { - expectMode = false; - } - - void check() - { - CPPUNIT_ASSERT(expected.empty()); - } - }; - - - public: - - BrokerChannelTest() : - broker(Broker::create()), - connection(&handler, *broker) - { - connection.initiated(ProtocolInitiation()); - } - - - void testConsumerMgmt(){ - Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, 0, 0, 0); - channel.open(); - CPPUNIT_ASSERT(!channel.exists("my_consumer")); - - ConnectionToken* owner = 0; - string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); - string tagA; - string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); - CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.cancel(tagA); - CPPUNIT_ASSERT_EQUAL((uint32_t) 2, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(!channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.close(); - CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount()); - } - - void testDeliveryNoAck(){ - Channel channel(connection, 7, 10000); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg( - createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - Channel channel(connection, 7, 10000); - channel.open(); - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testStaging(){ - MockMessageStore store; - Channel channel( - connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); - const string data[] = {"abcde", "fghij", "klmno"}; - - Message* msg = new BasicMessage( - 0, "my_exchange", "my_routing_key", false, false, - MockChannel::basicGetBody()); - - store.expect(); - store.stage(*msg); - for (int i = 0; i < 3; i++) { - store.appendContent(*msg, data[i]); - } - store.destroy(*msg); - store.test(); - - Exchange::shared_ptr exchange = - broker->getExchanges().declare("my_exchange", "fanout").first; - Queue::shared_ptr queue(new Queue("my_queue")); - exchange->bind(queue, "", 0); - - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - uint64_t contentSize(0); - for (int i = 0; i < 3; i++) { - contentSize += data[i].size(); - } - header->setContentSize(contentSize); - channel.handlePublish(msg); - channel.handleHeader(header); - - for (int i = 0; i < 3; i++) { - AMQContentBody::shared_ptr body(new AMQContentBody(data[i])); - channel.handleContent(body); - } - Message::shared_ptr msg2 = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); - msg2.reset();//should trigger destroy call - - store.check(); - } - - - //NOTE: strictly speaking this should/could be part of QueueTest, - //but as it can usefully use the same utility classes as this - //class it is defined here for simpllicity - void testQueuePolicy() - { - MockMessageStore store; - {//must ensure that store is last thing deleted as it is needed by destructor of lazy loaded content - const string data1("abcd"); - const string data2("efghijk"); - const string data3("lmnopqrstuvwxyz"); - Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size())); - Message::shared_ptr msg2(createMessage("e", "B", "MsgB", data2.size())); - Message::shared_ptr msg3(createMessage("e", "C", "MsgC", data3.size())); - addContent(msg1, data1); - addContent(msg2, data2); - addContent(msg3, data3); - - QueuePolicy policy(2, 0);//third message should be stored on disk and lazy loaded - FieldTable settings; - policy.update(settings); - - store.expect(); - store.stage(*msg3); - store.destroy(*msg3); - store.test(); - - Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); - queue->configure(settings);//set policy - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - Message::shared_ptr next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg1, next); - CPPUNIT_ASSERT_EQUAL((uint32_t) data1.size(), next->encodedContentSize()); - next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg2, next); - CPPUNIT_ASSERT_EQUAL((uint32_t) data2.size(), next->encodedContentSize()); - next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg3, next); - CPPUNIT_ASSERT_EQUAL((uint32_t) 0, next->encodedContentSize()); - - next.reset(); - msg1.reset(); - msg2.reset(); - msg3.reset();//must clear all references to messages to allow them to be destroyed - - } - store.check(); - } - - Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) - { - BasicMessage* msg = new BasicMessage( - 0, exchange, routingKey, false, false, - MockChannel::basicGetBody()); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(contentSize); - msg->setHeader(header); - msg->getHeaderProperties()->setMessageId(messageId); - return msg; - } - - void addContent(Message::shared_ptr msg, const string& data) - { - AMQContentBody::shared_ptr body(new AMQContentBody(data)); - msg->addContent(body); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(BrokerChannelTest); diff --git a/qpid/cpp/tests/ClientChannelTest.cpp b/qpid/cpp/tests/ClientChannelTest.cpp deleted file mode 100644 index d041106a23..0000000000 --- a/qpid/cpp/tests/ClientChannelTest.cpp +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <vector> -#include "qpid_test_plugin.h" -#include "InProcessBroker.h" -#include "ClientChannel.h" -#include "ClientMessage.h" -#include "ClientQueue.h" -#include "ClientExchange.h" -#include "client/MessageListener.h" - -using namespace std; -using namespace boost; -using namespace qpid::client; -using namespace qpid::sys; -using namespace qpid::framing; - -/// Small frame size so we can create fragmented messages. -const size_t FRAME_MAX = 256; - - -/** - * Test client API using an in-process broker. - */ -class ClientChannelTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ClientChannelTest); - CPPUNIT_TEST(testPublishGet); - CPPUNIT_TEST(testGetNoContent); - CPPUNIT_TEST(testConsumeCancel); - CPPUNIT_TEST(testConsumePublished); - CPPUNIT_TEST(testGetFragmentedMessage); - CPPUNIT_TEST(testConsumeFragmentedMessage); - CPPUNIT_TEST_SUITE_END(); - - struct Listener: public qpid::client::MessageListener { - vector<Message> messages; - Monitor monitor; - void received(Message& msg) { - Mutex::ScopedLock l(monitor); - messages.push_back(msg); - monitor.notifyAll(); - } - }; - - InProcessBrokerClient connection; // client::connection + local broker - Channel channel; - const std::string qname; - const std::string data; - Queue queue; - Exchange exchange; - Listener listener; - - public: - - ClientChannelTest() - : connection(FRAME_MAX), - qname("testq"), data("hello"), - queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) - { - connection.openChannel(channel); - CPPUNIT_ASSERT(channel.getId() != 0); - channel.declareQueue(queue); - } - - void testPublishGet() { - Message pubMsg(data); - pubMsg.getHeaders().setString("hello", "world"); - channel.publish(pubMsg, exchange, qname); - Message getMsg; - CPPUNIT_ASSERT(channel.get(getMsg, queue)); - CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); - CPPUNIT_ASSERT_EQUAL(string("world"), - getMsg.getHeaders().getString("hello")); - CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue - } - - void testGetNoContent() { - Message pubMsg; - pubMsg.getHeaders().setString("hello", "world"); - channel.publish(pubMsg, exchange, qname); - Message getMsg; - CPPUNIT_ASSERT(channel.get(getMsg, queue)); - CPPUNIT_ASSERT(getMsg.getData().empty()); - CPPUNIT_ASSERT_EQUAL(string("world"), - getMsg.getHeaders().getString("hello")); - } - - void testConsumeCancel() { - string tag; // Broker assigned - channel.consume(queue, tag, &listener); - channel.start(); - CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); - channel.publish(Message("a"), exchange, qname); - { - Mutex::ScopedLock l(listener.monitor); - Time deadline(now() + 1*TIME_SEC); - while (listener.messages.size() != 1) { - CPPUNIT_ASSERT(listener.monitor.wait(deadline)); - } - } - CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size()); - CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData()); - - channel.publish(Message("b"), exchange, qname); - channel.publish(Message("c"), exchange, qname); - { - Mutex::ScopedLock l(listener.monitor); - while (listener.messages.size() != 3) { - CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); - } - } - CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); - CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData()); - CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData()); - - channel.cancel(tag); - channel.publish(Message("d"), exchange, qname); - CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); - { - Mutex::ScopedLock l(listener.monitor); - CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2)); - } - Message msg; - CPPUNIT_ASSERT(channel.get(msg, queue)); - CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData()); - } - - // Consume already-published messages - void testConsumePublished() { - Message pubMsg("x"); - pubMsg.getHeaders().setString("y", "z"); - channel.publish(pubMsg, exchange, qname); - string tag; - channel.consume(queue, tag, &listener); - CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); - channel.start(); - { - Mutex::ScopedLock l(listener.monitor); - while (listener.messages.size() != 1) - CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); - } - CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData()); - CPPUNIT_ASSERT_EQUAL(string("z"), - listener.messages[0].getHeaders().getString("y")); - } - - void testGetFragmentedMessage() { - string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size. - channel.publish(Message(longStr), exchange, qname); - Message getMsg; - CPPUNIT_ASSERT(channel.get(getMsg, queue)); - } - - void testConsumeFragmentedMessage() { - string xx(FRAME_MAX*2, 'x'); - channel.publish(Message(xx), exchange, qname); - channel.start(); - string tag; - channel.consume(queue, tag, &listener); - string yy(FRAME_MAX*2, 'y'); - channel.publish(Message(yy), exchange, qname); - { - Mutex::ScopedLock l(listener.monitor); - while (listener.messages.size() != 2) - CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); - } - CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData()); - CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ClientChannelTest); diff --git a/qpid/cpp/tests/ConfigurationTest.cpp b/qpid/cpp/tests/ConfigurationTest.cpp deleted file mode 100644 index 3a1d5ba85d..0000000000 --- a/qpid/cpp/tests/ConfigurationTest.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Configuration.h> -#include <qpid_test_plugin.h> -#include <iostream> - -using namespace std; -using namespace qpid::broker; - -class ConfigurationTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ConfigurationTest); - CPPUNIT_TEST(testIsHelp); - CPPUNIT_TEST(testPortLongForm); - CPPUNIT_TEST(testPortShortForm); - CPPUNIT_TEST(testStore); - CPPUNIT_TEST(testStagingThreshold); - CPPUNIT_TEST(testVarious); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testIsHelp() - { - Configuration conf; - char* argv[] = {"ignore", "--help"}; - conf.parse("ignore", 2, argv); - CPPUNIT_ASSERT(conf.isHelp()); - } - - void testPortLongForm() - { - Configuration conf; - char* argv[] = {"ignore", "--port", "6789"}; - conf.parse("ignore", 3, argv); - CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); - } - - void testPortShortForm() - { - Configuration conf; - char* argv[] = {"ignore", "-p", "6789"}; - conf.parse("ignore", 3, argv); - CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); - } - - void testStore() - { - Configuration conf; - char* argv[] = {"ignore", "--store", "my-store-module.so"}; - conf.parse("ignore", 3, argv); - std::string expected("my-store-module.so"); - CPPUNIT_ASSERT_EQUAL(expected, conf.getStore()); - } - - void testStagingThreshold() - { - Configuration conf; - char* argv[] = {"ignore", "--staging-threshold", "123456789"}; - conf.parse("ignore", 3, argv); - long expected = 123456789; - CPPUNIT_ASSERT_EQUAL(expected, conf.getStagingThreshold()); - } - - void testVarious() - { - Configuration conf; - char* argv[] = {"ignore", "-t", "--worker-threads", "10"}; - conf.parse("ignore", 4, argv); - CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default - CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads()); - CPPUNIT_ASSERT(conf.isTrace()); - CPPUNIT_ASSERT(!conf.isHelp()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ConfigurationTest); - diff --git a/qpid/cpp/tests/EventChannelConnectionTest.cpp b/qpid/cpp/tests/EventChannelConnectionTest.cpp deleted file mode 100644 index 66561daf83..0000000000 --- a/qpid/cpp/tests/EventChannelConnectionTest.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <iostream> -#include <boost/bind.hpp> -#include "framing/AMQHeartbeatBody.h" -#include "framing/AMQFrame.h" -#include "sys/posix/EventChannelConnection.h" -#include "sys/ConnectionInputHandler.h" -#include "sys/ConnectionInputHandlerFactory.h" -#include "sys/Socket.h" -#include "qpid_test_plugin.h" -#include "MockConnectionInputHandler.h" - -using namespace qpid::sys; -using namespace qpid::framing; -using namespace std; - -class EventChannelConnectionTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(EventChannelConnectionTest); - CPPUNIT_TEST(testSendReceive); - CPPUNIT_TEST(testCloseExternal); - CPPUNIT_TEST(testCloseException); - CPPUNIT_TEST_SUITE_END(); - - public: - - void setUp() { - threads = EventChannelThreads::create(); - CPPUNIT_ASSERT_EQUAL(0, ::pipe(pipe)); - connection.reset( - new EventChannelConnection(threads, factory, pipe[0], pipe[1])); - CPPUNIT_ASSERT(factory.handler != 0); - } - - void tearDown() { - threads->shutdown(); - threads->join(); - } - - void testSendReceive() - { - // Send a protocol initiation. - Buffer buf(1024); - ProtocolInitiation(4,2).encode(buf); - buf.flip(); - ssize_t n = write(pipe[1], buf.start(), buf.available()); - CPPUNIT_ASSERT_EQUAL(ssize_t(buf.available()), n); - - // Verify session handler got the protocol init. - ProtocolInitiation init = factory.handler->waitForProtocolInit(); - CPPUNIT_ASSERT_EQUAL(int(4), int(init.getMajor())); - CPPUNIT_ASSERT_EQUAL(int(2), int(init.getMinor())); - - // Send a heartbeat frame, verify connection got it. - connection->send(new AMQFrame(42, new AMQHeartbeatBody())); - AMQFrame frame = factory.handler->waitForFrame(); - CPPUNIT_ASSERT_EQUAL(uint16_t(42), frame.getChannel()); - CPPUNIT_ASSERT_EQUAL(uint8_t(HEARTBEAT_BODY), - frame.getBody()->type()); - threads->shutdown(); - } - - // Make sure the handler is closed if the connection is closed. - void testCloseExternal() { - connection->close(); - factory.handler->waitForClosed(); - } - - // Make sure the handler is closed if the connection closes or fails. - // TODO aconway 2006-12-18: logs exception message in test output. - void testCloseException() { - ::close(pipe[0]); - ::close(pipe[1]); - // TODO aconway 2006-12-18: Shouldn't this be failing? - connection->send(new AMQFrame(42, new AMQHeartbeatBody())); - factory.handler->waitForClosed(); - } - - private: - EventChannelThreads::shared_ptr threads; - int pipe[2]; - std::auto_ptr<EventChannelConnection> connection; - MockConnectionInputHandlerFactory factory; -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelConnectionTest); - diff --git a/qpid/cpp/tests/EventChannelTest.cpp b/qpid/cpp/tests/EventChannelTest.cpp deleted file mode 100644 index 8e5c724a15..0000000000 --- a/qpid/cpp/tests/EventChannelTest.cpp +++ /dev/null @@ -1,187 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <posix/EventChannel.h> -#include <posix/check.h> -#include <sys/Runnable.h> -#include <sys/Socket.h> -#include <sys/Thread.h> -#include <qpid_test_plugin.h> - -#include <sys/socket.h> -#include <signal.h> -#include <netinet/in.h> -#include <netdb.h> -#include <iostream> - -using namespace qpid::sys; - - -const char hello[] = "hello"; -const size_t size = sizeof(hello); - -struct RunMe : public Runnable -{ - bool ran; - RunMe() : ran(false) {} - void run() { ran = true; } -}; - -class EventChannelTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(EventChannelTest); - CPPUNIT_TEST(testEvent); - CPPUNIT_TEST(testRead); - CPPUNIT_TEST(testFailedRead); - CPPUNIT_TEST(testWrite); - CPPUNIT_TEST(testFailedWrite); - CPPUNIT_TEST(testReadWrite); - CPPUNIT_TEST(testAccept); - CPPUNIT_TEST_SUITE_END(); - - private: - EventChannel::shared_ptr ec; - int pipe[2]; - char readBuf[size]; - - public: - - void setUp() - { - memset(readBuf, size, 0); - ec = EventChannel::create(); - if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno); - // Ignore SIGPIPE, otherwise we will crash writing to broken pipe. - signal(SIGPIPE, SIG_IGN); - } - - // Verify that calling getEvent returns event. - template <class T> bool isNextEvent(T& event) - { - return &event == dynamic_cast<T*>(ec->getEvent()); - } - - template <class T> bool isNextEventOk(T& event) - { - Event* next = ec->getEvent(); - if (next) next->throwIfError(); - return &event == next; - } - - void testEvent() - { - RunMe runMe; - CPPUNIT_ASSERT(!runMe.ran); - // Instances of Event just pass thru the channel immediately. - Event e(runMe.functor()); - ec->postEvent(e); - CPPUNIT_ASSERT(isNextEventOk(e)); - e.dispatch(); - CPPUNIT_ASSERT(runMe.ran); - } - - void testRead() { - ReadEvent re(pipe[0], readBuf, size); - ec->postEvent(re); - CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size)); - CPPUNIT_ASSERT(isNextEventOk(re)); - CPPUNIT_ASSERT_EQUAL(size, re.getSize()); - CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); - } - - void testFailedRead() - { - ReadEvent re(pipe[0], readBuf, size); - ec->postEvent(re); - - // EOF before all data read. - ::close(pipe[1]); - CPPUNIT_ASSERT(isNextEvent(re)); - CPPUNIT_ASSERT(re.hasError()); - try { - re.throwIfError(); - CPPUNIT_FAIL("Expected QpidError."); - } - catch (const qpid::QpidError&) { } - - // Bad file descriptor. Note in this case we fail - // in postEvent and throw immediately. - try { - ReadEvent bad; - ec->postEvent(bad); - CPPUNIT_FAIL("Expected QpidError."); - } - catch (const qpid::QpidError&) { } - } - - void testWrite() { - WriteEvent wr(pipe[1], hello, size); - ec->postEvent(wr); - CPPUNIT_ASSERT(isNextEventOk(wr)); - CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));; - CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); - } - - void testFailedWrite() { - WriteEvent wr(pipe[1], hello, size); - ::close(pipe[0]); - ec->postEvent(wr); - CPPUNIT_ASSERT(isNextEvent(wr)); - CPPUNIT_ASSERT(wr.hasError()); - } - - void testReadWrite() - { - ReadEvent re(pipe[0], readBuf, size); - WriteEvent wr(pipe[1], hello, size); - ec->postEvent(re); - ec->postEvent(wr); - ec->getEvent(); - ec->getEvent(); - CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); - } - - void testAccept() { - Socket s = Socket::createTcp(); - int port = s.listen(0, 10); - CPPUNIT_ASSERT(port != 0); - - AcceptEvent ae(s.fd()); - ec->postEvent(ae); - Socket client = Socket::createTcp(); - client.connect("localhost", port); - CPPUNIT_ASSERT(isNextEvent(ae)); - ae.dispatch(); - - // Verify client writes are read by the accepted descriptor. - char readBuf[size]; - ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size); - ec->postEvent(re); - CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello))); - CPPUNIT_ASSERT(isNextEvent(re)); - re.dispatch(); - CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest); - diff --git a/qpid/cpp/tests/EventChannelThreadsTest.cpp b/qpid/cpp/tests/EventChannelThreadsTest.cpp deleted file mode 100644 index 285ed29518..0000000000 --- a/qpid/cpp/tests/EventChannelThreadsTest.cpp +++ /dev/null @@ -1,247 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <boost/bind.hpp> - -#include <sys/Socket.h> -#include <posix/EventChannelThreads.h> -#include <qpid_test_plugin.h> - - -using namespace std; - -using namespace qpid::sys; - -const int nConnections = 5; -const int nMessages = 10; // Messages read/written per connection. - - -// Accepts + reads + writes. -const int totalEvents = nConnections+2*nConnections*nMessages; - -/** - * Messages are numbered 0..nMessages. - * We count the total number of events, and the - * number of reads and writes for each message number. - */ -class TestResults : public Monitor { - public: - TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {} - - void countEvent() { - if (--nEventsRemaining == 0) - shutdown(); - } - - void countRead(int messageNo) { - ++reads[messageNo]; - countEvent(); - } - - void countWrite(int messageNo) { - ++writes[messageNo]; - countEvent(); - } - - void shutdown(const std::string& exceptionMsg = std::string()) { - ScopedLock lock(*this); - exception = exceptionMsg; - isShutdown = true; - notifyAll(); - } - - void wait() { - ScopedLock lock(*this); - Time deadline = now() + 10*TIME_SEC; - while (!isShutdown) { - CPPUNIT_ASSERT(Monitor::wait(deadline)); - } - } - - bool isShutdown; - std::string exception; - AtomicCount reads[nMessages]; - AtomicCount writes[nMessages]; - AtomicCount nEventsRemaining; -}; - -TestResults results; - -EventChannelThreads::shared_ptr threads; - -// Functor to wrap callbacks in try/catch. -class SafeCallback { - public: - SafeCallback(Runnable& r) : callback(r.functor()) {} - SafeCallback(Event::Callback cb) : callback(cb) {} - - void operator()() { - std::string exception; - try { - callback(); - return; - } - catch (const std::exception& e) { - exception = e.what(); - } - catch (...) { - exception = "Unknown exception."; - } - results.shutdown(exception); - } - - private: - Event::Callback callback; -}; - -/** Repost an event N times. */ -class Repost { - public: - Repost(int n) : count (n) {} - virtual ~Repost() {} - - void repost(Event* event) { - if (--count==0) { - delete event; - } else { - threads->postEvent(event); - } - } - private: - int count; -}; - - - -/** Repeating read event. */ -class TestReadEvent : public ReadEvent, public Runnable, private Repost { - public: - explicit TestReadEvent(int fd=-1) : - ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)), - Repost(nMessages) - {} - - void run() { - CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize()); - CPPUNIT_ASSERT(0 <= value); - CPPUNIT_ASSERT(value < nMessages); - results.countRead(value); - repost(this); - } - - private: - int value; - ReadEvent original; -}; - - -/** Fire and forget write event */ -class TestWriteEvent : public WriteEvent, public Runnable, private Repost { - public: - TestWriteEvent(int fd=-1) : - WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)), - Repost(nMessages), - value(0) - {} - - void run() { - CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize()); - results.countWrite(value++); - repost(this); - } - - private: - int value; -}; - -/** Fire-and-forget Accept event, posts reads on the accepted connection. */ -class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost { - public: - TestAcceptEvent(int fd=-1) : - AcceptEvent(fd, SafeCallback(*this)), - Repost(nConnections) - {} - - void run() { - threads->postEvent(new TestReadEvent(getAcceptedDesscriptor())); - results.countEvent(); - repost(this); - } -}; - -class EventChannelThreadsTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(EventChannelThreadsTest); - CPPUNIT_TEST(testThreads); - CPPUNIT_TEST_SUITE_END(); - - public: - - void setUp() { - threads = EventChannelThreads::create(EventChannel::create()); - } - - void tearDown() { - threads.reset(); - } - - void testThreads() - { - Socket listener = Socket::createTcp(); - int port = listener.listen(); - - // Post looping accept events, will repost nConnections times. - // The accept event will automatically post read events. - threads->postEvent(new TestAcceptEvent(listener.fd())); - - // Make connections. - Socket connections[nConnections]; - for (int i = 0; i < nConnections; ++i) { - connections[i] = Socket::createTcp(); - connections[i].connect("localhost", port); - } - - // Post looping write events. - for (int i = 0; i < nConnections; ++i) { - threads->postEvent(new TestWriteEvent(connections[i].fd())); - } - - // Wait for all events to be dispatched. - results.wait(); - - if (!results.exception.empty()) CPPUNIT_FAIL(results.exception); - CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining)); - - // Expect a read and write for each messageNo from each connection. - for (int messageNo = 0; messageNo < nMessages; ++messageNo) { - CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo])); - CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo])); - } - - threads->shutdown(); - threads->join(); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest); - diff --git a/qpid/cpp/tests/ExchangeTest.cpp b/qpid/cpp/tests/ExchangeTest.cpp deleted file mode 100644 index cccec92024..0000000000 --- a/qpid/cpp/tests/ExchangeTest.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <DeliverableMessage.h> -#include <DirectExchange.h> -#include <BrokerExchange.h> -#include <BrokerQueue.h> -#include <TopicExchange.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include "BasicGetBody.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -class ExchangeTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ExchangeTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testMe() - { - Queue::shared_ptr queue(new Queue("queue", true)); - Queue::shared_ptr queue2(new Queue("queue2", true)); - - TopicExchange topic("topic"); - topic.bind(queue, "abc", 0); - topic.bind(queue2, "abc", 0); - - DirectExchange direct("direct"); - direct.bind(queue, "abc", 0); - direct.bind(queue2, "abc", 0); - - queue.reset(); - queue2.reset(); - - Message::shared_ptr msgPtr( - new BasicMessage( - 0, "e", "A", true, true, - AMQMethodBody::shared_ptr( - new BasicGetBody(ProtocolVersion())))); - DeliverableMessage msg(msgPtr); - topic.route(msg, "abc", 0); - direct.route(msg, "abc", 0); - - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest); diff --git a/qpid/cpp/tests/FieldTableTest.cpp b/qpid/cpp/tests/FieldTableTest.cpp deleted file mode 100644 index 8d9285bf4b..0000000000 --- a/qpid/cpp/tests/FieldTableTest.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <amqp_framing.h> -#include <qpid_test_plugin.h> - -using namespace qpid::framing; - -class FieldTableTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(FieldTableTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testMe() - { - FieldTable ft; - ft.setString("A", "BCDE"); - CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft.getString("A")); - - Buffer buffer(100); - buffer.putFieldTable(ft); - buffer.flip(); - FieldTable ft2; - buffer.getFieldTable(ft2); - CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A")); - - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(FieldTableTest); - diff --git a/qpid/cpp/tests/FramingTest.cpp b/qpid/cpp/tests/FramingTest.cpp deleted file mode 100644 index f8754337c8..0000000000 --- a/qpid/cpp/tests/FramingTest.cpp +++ /dev/null @@ -1,381 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <memory> -#include <boost/lexical_cast.hpp> - -#include <ConnectionRedirectBody.h> -#include <ProtocolVersion.h> -#include <amqp_framing.h> -#include <iostream> -#include <qpid_test_plugin.h> -#include <sstream> -#include <typeinfo> -#include <QpidError.h> -#include <AMQP_HighestVersion.h> -#include "AMQRequestBody.h" -#include "AMQResponseBody.h" -#include "Requester.h" -#include "Responder.h" -#include "InProcessBroker.h" -#include "client/Connection.h" -#include "client/ClientExchange.h" -#include "client/ClientQueue.h" - -using namespace qpid; -using namespace qpid::framing; -using namespace std; - -template <class T> -std::string tostring(const T& x) -{ - std::ostringstream out; - out << x; - return out.str(); -} - -class FramingTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(FramingTest); - CPPUNIT_TEST(testBasicQosBody); - CPPUNIT_TEST(testConnectionSecureBody); - CPPUNIT_TEST(testConnectionRedirectBody); - CPPUNIT_TEST(testAccessRequestBody); - CPPUNIT_TEST(testBasicConsumeBody); - CPPUNIT_TEST(testConnectionRedirectBodyFrame); - CPPUNIT_TEST(testBasicConsumeOkBodyFrame); - CPPUNIT_TEST(testRequestBodyFrame); - CPPUNIT_TEST(testResponseBodyFrame); - CPPUNIT_TEST(testRequester); - CPPUNIT_TEST(testResponder); - CPPUNIT_TEST(testInlineContent); - CPPUNIT_TEST(testContentReference); - CPPUNIT_TEST(testContentValidation); - CPPUNIT_TEST(testRequestResponseRoundtrip); - CPPUNIT_TEST_SUITE_END(); - - private: - Buffer buffer; - ProtocolVersion version; - AMQP_MethodVersionMap versionMap; - - public: - - FramingTest() : buffer(1024), version(highestProtocolVersion) {} - - void testBasicQosBody() - { - BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true); - in.encodeContent(buffer); - buffer.flip(); - BasicQosBody out(version); - out.decodeContent(buffer); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - void testConnectionSecureBody() - { - std::string s = "security credential"; - ConnectionSecureBody in(version, s); - in.encodeContent(buffer); - buffer.flip(); - ConnectionSecureBody out(version); - out.decodeContent(buffer); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - void testConnectionRedirectBody() - { - std::string a = "hostA"; - std::string b = "hostB"; - ConnectionRedirectBody in(version, 0, a, b); - in.encodeContent(buffer); - buffer.flip(); - ConnectionRedirectBody out(version); - out.decodeContent(buffer); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - void testAccessRequestBody() - { - std::string s = "text"; - AccessRequestBody in(version, s, true, false, true, false, true); - in.encodeContent(buffer); - buffer.flip(); - AccessRequestBody out(version); - out.decodeContent(buffer); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - void testBasicConsumeBody() - { - std::string q = "queue"; - std::string t = "tag"; - BasicConsumeBody in(version, 0, q, t, false, true, false, false, - FieldTable()); - in.encodeContent(buffer); - buffer.flip(); - BasicConsumeBody out(version); - out.decodeContent(buffer); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - - void testConnectionRedirectBodyFrame() - { - std::string a = "hostA"; - std::string b = "hostB"; - AMQFrame in(version, 999, - new ConnectionRedirectBody(version, 0, a, b)); - in.encode(buffer); - buffer.flip(); - AMQFrame out; - out.decode(buffer); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - - void testBasicConsumeOkBodyFrame() - { - std::string s = "hostA"; - AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s)); - in.encode(buffer); - buffer.flip(); - AMQFrame out; - for(int i = 0; i < 5; i++){ - out.decode(buffer); - CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); - } - } - - void testRequestBodyFrame() { - std::string testing("testing"); - AMQBody::shared_ptr request(new ChannelOpenBody(version, testing)); - AMQFrame in(version, 999, request); - in.encode(buffer); - buffer.flip(); - AMQFrame out; - out.decode(buffer); - ChannelOpenBody* decoded = - dynamic_cast<ChannelOpenBody*>(out.getBody().get()); - CPPUNIT_ASSERT(decoded); - CPPUNIT_ASSERT_EQUAL(testing, decoded->getOutOfBand()); - } - - void testResponseBodyFrame() { - AMQBody::shared_ptr response(new ChannelOkBody(version)); - AMQFrame in(version, 999, response); - in.encode(buffer); - buffer.flip(); - AMQFrame out; - out.decode(buffer); - ChannelOkBody* decoded = - dynamic_cast<ChannelOkBody*>(out.getBody().get()); - CPPUNIT_ASSERT(decoded); - } - - void testInlineContent() { - Content content(INLINE, "MyData"); - CPPUNIT_ASSERT(content.isInline()); - content.encode(buffer); - buffer.flip(); - Content recovered; - recovered.decode(buffer); - CPPUNIT_ASSERT(recovered.isInline()); - CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); - } - - void testContentReference() { - Content content(REFERENCE, "MyRef"); - CPPUNIT_ASSERT(content.isReference()); - content.encode(buffer); - buffer.flip(); - Content recovered; - recovered.decode(buffer); - CPPUNIT_ASSERT(recovered.isReference()); - CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); - } - - void testContentValidation() { - try { - Content content(REFERENCE, ""); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (QpidError& e) { - CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); - CPPUNIT_ASSERT_EQUAL(string("Reference cannot be empty"), e.msg); - } - - try { - Content content(2, "Blah"); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (QpidError& e) { - CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); - CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); - } - - try { - buffer.putOctet(2); - buffer.putLongString("blah, blah"); - buffer.flip(); - Content content; - content.decode(buffer); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (QpidError& e) { - CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); - CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); - } - - } - - void testRequester() { - Requester r; - AMQRequestBody::Data q; - AMQResponseBody::Data p; - - r.sending(q); - CPPUNIT_ASSERT_EQUAL(1ULL, q.requestId); - CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); - - r.sending(q); - CPPUNIT_ASSERT_EQUAL(2ULL, q.requestId); - CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); - - // Now process a response - p.responseId = 1; - p.requestId = 2; - r.processed(AMQResponseBody::Data(1, 2)); - - r.sending(q); - CPPUNIT_ASSERT_EQUAL(3ULL, q.requestId); - CPPUNIT_ASSERT_EQUAL(1ULL, q.responseMark); - - try { - r.processed(p); // Already processed this response. - CPPUNIT_FAIL("Expected exception"); - } catch (...) {} - - try { - p.requestId = 50; - r.processed(p); // No such request - CPPUNIT_FAIL("Expected exception"); - } catch (...) {} - - r.sending(q); // reqId=4 - r.sending(q); // reqId=5 - r.sending(q); // reqId=6 - p.responseId++; - p.requestId = 4; - p.batchOffset = 2; - r.processed(p); - r.sending(q); - CPPUNIT_ASSERT_EQUAL(7ULL, q.requestId); - CPPUNIT_ASSERT_EQUAL(2ULL, q.responseMark); - - p.responseId++; - p.requestId = 1; // Out of order - p.batchOffset = 0; - r.processed(p); - r.sending(q); - CPPUNIT_ASSERT_EQUAL(8ULL, q.requestId); - CPPUNIT_ASSERT_EQUAL(3ULL, q.responseMark); - } - - void testResponder() { - Responder r; - AMQRequestBody::Data q; - AMQResponseBody::Data p; - - q.requestId = 1; - q.responseMark = 0; - r.received(q); - p.requestId = q.requestId; - r.sending(p); - CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId); - CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId); - CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); - CPPUNIT_ASSERT_EQUAL(0ULL, r.getResponseMark()); - - q.requestId++; - q.responseMark = 1; - r.received(q); - r.sending(p); - CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId); - CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); - CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark()); - - try { - // Response mark higher any request ID sent. - q.responseMark = 3; - r.received(q); - } catch(...) {} - - try { - // Response mark lower than previous response mark. - q.responseMark = 0; - r.received(q); - } catch(...) {} - - // TODO aconway 2007-01-14: Test for batching when supported. - - } - - // expect may contain null chars so use string(ptr,size) constructor - // Use sizeof(expect)-1 to strip the trailing null. -#define ASSERT_FRAME(expect, frame) \ - CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame)) - - void testRequestResponseRoundtrip() { - broker::InProcessBroker ibroker(version); - client::Connection clientConnection; - clientConnection.setConnector(ibroker); - clientConnection.open(""); - client::Channel c; - clientConnection.openChannel(c); - - client::Exchange exchange( - "MyExchange", client::Exchange::TOPIC_EXCHANGE); - client::Queue queue("MyQueue", true); - c.declareExchange(exchange); - c.declareQueue(queue); - c.bind(exchange, queue, "MyTopic", framing::FieldTable()); - broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++); - } - }; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(FramingTest); - - - diff --git a/qpid/cpp/tests/HeaderTest.cpp b/qpid/cpp/tests/HeaderTest.cpp deleted file mode 100644 index 77e68829c3..0000000000 --- a/qpid/cpp/tests/HeaderTest.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <amqp_framing.h> -#include <qpid_test_plugin.h> - -using namespace qpid::framing; - -class HeaderTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(HeaderTest); - CPPUNIT_TEST(testGenericProperties); - CPPUNIT_TEST(testAllSpecificProperties); - CPPUNIT_TEST(testSomeSpecificProperties); - CPPUNIT_TEST_SUITE_END(); - -public: - - void testGenericProperties() - { - AMQHeaderBody body(BASIC); - dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE"); - Buffer buffer(100); - - body.encode(buffer); - buffer.flip(); - AMQHeaderBody body2; - body2.decode(buffer, body.size()); - BasicHeaderProperties* props = - dynamic_cast<BasicHeaderProperties*>(body2.getProperties()); - CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), - props->getHeaders().getString("A")); - } - - void testAllSpecificProperties(){ - string contentType("text/html"); - string contentEncoding("UTF8"); - DeliveryMode deliveryMode(PERSISTENT); - uint8_t priority(3); - string correlationId("abc"); - string replyTo("no-address"); - string expiration("why is this a string?"); - string messageId("xyz"); - uint64_t timestamp(0xabcd); - string type("eh?"); - string userId("guest"); - string appId("just testing"); - string clusterId("no clustering required"); - - AMQHeaderBody body(BASIC); - BasicHeaderProperties* properties = - dynamic_cast<BasicHeaderProperties*>(body.getProperties()); - properties->setContentType(contentType); - properties->getHeaders().setString("A", "BCDE"); - properties->setDeliveryMode(deliveryMode); - properties->setPriority(priority); - properties->setCorrelationId(correlationId); - properties->setReplyTo(replyTo); - properties->setExpiration(expiration); - properties->setMessageId(messageId); - properties->setTimestamp(timestamp); - properties->setType(type); - properties->setUserId(userId); - properties->setAppId(appId); - properties->setClusterId(clusterId); - - Buffer buffer(10000); - body.encode(buffer); - buffer.flip(); - AMQHeaderBody temp; - temp.decode(buffer, body.size()); - properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); - - CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); - CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A")); - CPPUNIT_ASSERT_EQUAL(deliveryMode, properties->getDeliveryMode()); - CPPUNIT_ASSERT_EQUAL(priority, properties->getPriority()); - CPPUNIT_ASSERT_EQUAL(correlationId, properties->getCorrelationId()); - CPPUNIT_ASSERT_EQUAL(replyTo, properties->getReplyTo()); - CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); - CPPUNIT_ASSERT_EQUAL(messageId, properties->getMessageId()); - CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); - CPPUNIT_ASSERT_EQUAL(type, properties->getType()); - CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId()); - CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId()); - CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId()); - } - - void testSomeSpecificProperties(){ - string contentType("application/octet-stream"); - DeliveryMode deliveryMode(PERSISTENT); - uint8_t priority(6); - string expiration("Z"); - uint64_t timestamp(0xabe4a34a); - - AMQHeaderBody body(BASIC); - BasicHeaderProperties* properties = - dynamic_cast<BasicHeaderProperties*>(body.getProperties()); - properties->setContentType(contentType); - properties->setDeliveryMode(deliveryMode); - properties->setPriority(priority); - properties->setExpiration(expiration); - properties->setTimestamp(timestamp); - - Buffer buffer(100); - body.encode(buffer); - buffer.flip(); - AMQHeaderBody temp; - temp.decode(buffer, body.size()); - properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); - - CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); - CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode()); - CPPUNIT_ASSERT_EQUAL((int) priority, (int) properties->getPriority()); - CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); - CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(HeaderTest); - diff --git a/qpid/cpp/tests/HeadersExchangeTest.cpp b/qpid/cpp/tests/HeadersExchangeTest.cpp deleted file mode 100644 index 6cd51c55a9..0000000000 --- a/qpid/cpp/tests/HeadersExchangeTest.cpp +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <HeadersExchange.h> -#include <FieldTable.h> -#include <Value.h> -#include <qpid_test_plugin.h> - -using namespace qpid::broker; -using namespace qpid::framing; - -class HeadersExchangeTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(HeadersExchangeTest); - CPPUNIT_TEST(testMatchAll); - CPPUNIT_TEST(testMatchAny); - CPPUNIT_TEST(testMatchEmptyValue); - CPPUNIT_TEST(testMatchEmptyArgs); - CPPUNIT_TEST(testMatchNoXMatch); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testMatchAll() - { - FieldTable b, m; - b.setString("x-match", "all"); - b.setString("foo", "FOO"); - b.setInt("n", 42); - m.setString("foo", "FOO"); - m.setInt("n", 42); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - - // Ignore extras. - m.setString("extra", "x"); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - - // Fail mismatch, wrong value. - m.setString("foo", "NotFoo"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - - // Fail mismatch, missing value - m.erase("foo"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - } - - void testMatchAny() - { - FieldTable b, m; - b.setString("x-match", "any"); - b.setString("foo", "FOO"); - b.setInt("n", 42); - m.setString("foo", "FOO"); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - m.erase("foo"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - m.setInt("n", 42); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - } - - void testMatchEmptyValue() - { - FieldTable b, m; - b.setString("x-match", "all"); - b.getMap()["foo"] = FieldTable::ValuePtr(new EmptyValue()); - b.getMap()["n"] = FieldTable::ValuePtr(new EmptyValue()); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - m.setString("foo", "blah"); - m.setInt("n", 123); - } - - void testMatchEmptyArgs() - { - FieldTable b, m; - m.setString("foo", "FOO"); - - b.setString("x-match", "all"); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - b.setString("x-match", "any"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - } - - - void testMatchNoXMatch() - { - FieldTable b, m; - b.setString("foo", "FOO"); - m.setString("foo", "FOO"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - } - - -}; - -// make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(HeadersExchangeTest); diff --git a/qpid/cpp/tests/InMemoryContentTest.cpp b/qpid/cpp/tests/InMemoryContentTest.cpp deleted file mode 100644 index 4597ee5e5d..0000000000 --- a/qpid/cpp/tests/InMemoryContentTest.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <InMemoryContent.h> -#include <qpid_test_plugin.h> -#include <AMQP_HighestVersion.h> -#include <iostream> -#include <list> -#include "AMQFrame.h" -#include "MockChannel.h" - -using std::list; -using std::string; -using boost::dynamic_pointer_cast; -using namespace qpid::broker; -using namespace qpid::framing; - - -class InMemoryContentTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(InMemoryContentTest); - CPPUNIT_TEST(testRefragmentation); - CPPUNIT_TEST_SUITE_END(); - -public: - void testRefragmentation() - { - {//no remainder - string out[] = {"abcde", "fghij", "klmno", "pqrst"}; - string in[] = {out[0] + out[1], out[2] + out[3]}; - refragment(2, in, 4, out); - } - {//remainder for last frame - string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"}; - string in[] = {out[0] + out[1], out[2] + out[3] + out[4]}; - refragment(2, in, 5, out); - } - } - - - void refragment(size_t inCount, string* in, size_t outCount, string* out, uint32_t framesize = 5) - { - InMemoryContent content; - MockChannel channel(3); - - addframes(content, inCount, in); - content.send(channel, framesize); - CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); - - for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody::shared_ptr chunk( - dynamic_pointer_cast<AMQContentBody>( - channel.out.frames[i].getBody())); - CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL( - ChannelId(3), channel.out.frames[i].getChannel()); - } - } - - void addframes(InMemoryContent& content, size_t frameCount, string* frameData) - { - for (unsigned int i = 0; i < frameCount; i++) { - AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i])); - content.add(frame); - } - } - - -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest); - diff --git a/qpid/cpp/tests/InProcessBroker.h b/qpid/cpp/tests/InProcessBroker.h deleted file mode 100644 index 2882ab28e8..0000000000 --- a/qpid/cpp/tests/InProcessBroker.h +++ /dev/null @@ -1,163 +0,0 @@ -#ifndef _tests_InProcessBroker_h -#define _tests_InProcessBroker_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "AMQP_HighestVersion.h" -#include "framing/AMQFrame.h" -#include "broker/Broker.h" -#include "broker/Connection.h" -#include "client/Connector.h" -#include "client/Connection.h" - -#include <vector> -#include <iostream> -#include <algorithm> - - -namespace qpid { -namespace broker { - -/** - * A broker that implements client::Connector allowing direct - * in-process connection of client to broker. Used to write round-trip - * tests without requiring an external broker process. - * - * Also allows you to "snoop" on frames exchanged between client & broker. - * - * see FramingTest::testRequestResponseRoundtrip() for example of use. - */ -class InProcessBroker : public client::Connector { - public: - enum Sender {CLIENT,BROKER}; - - /** A frame tagged with the sender */ - struct TaggedFrame { - TaggedFrame(Sender e, framing::AMQFrame* f) : frame(f), sender(e) {} - bool fromBroker() const { return sender == BROKER; } - bool fromClient() const { return sender == CLIENT; } - - template <class MethodType> - MethodType* asMethod() { - return dynamic_cast<MethodType*>(frame->getBody().get()); - } - shared_ptr<framing::AMQFrame> frame; - Sender sender; - }; - - typedef std::vector<TaggedFrame> Conversation; - - InProcessBroker(framing::ProtocolVersion ver= - framing::highestProtocolVersion - ) : - Connector(ver), - protocolInit(ver), - broker(broker::Broker::create()), - brokerOut(BROKER, conversation), - brokerConnection(&brokerOut, *broker), - clientOut(CLIENT, conversation, &brokerConnection) - {} - - ~InProcessBroker() { broker->shutdown(); } - - void connect(const std::string& /*host*/, int /*port*/) {} - void init() { brokerConnection.initiated(protocolInit); } - void close() {} - - /** Client's input handler. */ - void setInputHandler(framing::InputHandler* handler) { - brokerOut.in = handler; - } - - /** Called by client to send a frame */ - void send(framing::AMQFrame* frame) { - clientOut.send(frame); - } - - /** Entire client-broker conversation is recorded here */ - Conversation conversation; - - private: - /** OutputHandler that forwards data to an InputHandler */ - struct OutputToInputHandler : public sys::ConnectionOutputHandler { - OutputToInputHandler( - Sender sender_, Conversation& conversation_, - framing::InputHandler* ih=0 - ) : sender(sender_), conversation(conversation_), in(ih) {} - - void send(framing::AMQFrame* frame) { - conversation.push_back(TaggedFrame(sender, frame)); - in->received(frame); - } - - void close() {} - - Sender sender; - Conversation& conversation; - framing::InputHandler* in; - }; - - framing::ProtocolInitiation protocolInit; - Broker::shared_ptr broker; - OutputToInputHandler brokerOut; - broker::Connection brokerConnection; - OutputToInputHandler clientOut; -}; - -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::TaggedFrame& tf) -{ - return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << *tf.frame; -} - -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::Conversation& conv) -{ - copy(conv.begin(), conv.end(), - std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n")); - return out; -} - -} // namespace broker - - -namespace client { -/** An in-process client+broker all in one. */ -class InProcessBrokerClient : public client::Connection { - public: - broker::InProcessBroker broker; - broker::InProcessBroker::Conversation& conversation; - - /** Constructor creates broker and opens client connection. */ - InProcessBrokerClient( - u_int32_t max_frame_size=65536, - framing::ProtocolVersion version= framing::highestProtocolVersion - ) : client::Connection(false, max_frame_size, version), - broker(version), - conversation(broker.conversation) - { - setConnector(broker); - open(""); - } -}; - - -}} // namespace qpid::client - - -#endif // _tests_InProcessBroker_h diff --git a/qpid/cpp/tests/LazyLoadedContentTest.cpp b/qpid/cpp/tests/LazyLoadedContentTest.cpp deleted file mode 100644 index 4cd8e9b307..0000000000 --- a/qpid/cpp/tests/LazyLoadedContentTest.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <LazyLoadedContent.h> -#include <AMQP_HighestVersion.h> -#include <NullMessageStore.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <list> -#include <sstream> -#include "AMQFrame.h" -#include "MockChannel.h" -using std::list; -using std::string; -using boost::dynamic_pointer_cast; -using namespace qpid::broker; -using namespace qpid::framing; - - - -class LazyLoadedContentTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(LazyLoadedContentTest); - CPPUNIT_TEST(testFragmented); - CPPUNIT_TEST(testWhole); - CPPUNIT_TEST(testHalved); - CPPUNIT_TEST_SUITE_END(); - - class TestMessageStore : public NullMessageStore - { - const string content; - - public: - TestMessageStore(const string& _content) : content(_content) {} - - void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length) - { - if (offset + length <= content.size()) { - data = content.substr(offset, length); - } else{ - std::stringstream error; - error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size(); - throw qpid::Exception(error.str()); - } - } - }; - - -public: - void testFragmented() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 5; - string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"}; - load(data, 6, out, framesize); - } - - void testWhole() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 50; - string out[] = {data}; - load(data, 1, out, framesize); - } - - void testHalved() - { - string data = "abcdefghijklmnopqrstuvwxyz"; - uint32_t framesize = 13; - string out[] = {"abcdefghijklm", "nopqrstuvwxyz"}; - load(data, 2, out, framesize); - } - - void load(string& in, size_t outCount, string* out, uint32_t framesize) - { - TestMessageStore store(in); - LazyLoadedContent content(&store, 0, in.size()); - MockChannel channel(3); - content.send(channel, framesize); - CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); - - for (unsigned int i = 0; i < outCount; i++) { - AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i].getBody())); - CPPUNIT_ASSERT(chunk); - CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); - CPPUNIT_ASSERT_EQUAL( - ChannelId(3), channel.out.frames[i].getChannel()); - } - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest); - diff --git a/qpid/cpp/tests/Makefile.am b/qpid/cpp/tests/Makefile.am deleted file mode 100644 index 6ae3598a98..0000000000 --- a/qpid/cpp/tests/Makefile.am +++ /dev/null @@ -1,123 +0,0 @@ -AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) -INCLUDES = \ - -I$(top_srcdir)/gen \ - -I$(top_srcdir)/lib \ - -I$(top_srcdir)/lib/client \ - -I$(top_srcdir)/lib/broker \ - -I$(top_srcdir)/lib/common \ - -I$(top_srcdir)/lib/common/sys \ - -I$(top_srcdir)/lib/common/framing \ - $(APR_CXXFLAGS) - -# Unit tests -broker_tests = \ - AccumulatedAckTest \ - BrokerChannelTest \ - ConfigurationTest \ - ExchangeTest \ - HeadersExchangeTest \ - InMemoryContentTest \ - LazyLoadedContentTest \ - MessageBuilderTest \ - MessageTest \ - ReferenceTest \ - QueueRegistryTest \ - QueueTest \ - QueuePolicyTest \ - TopicExchangeTest \ - TxAckTest \ - TxBufferTest \ - TxPublishTest \ - ValueTest \ - MessageHandlerTest - -client_tests = \ - ClientChannelTest - -framing_tests = \ - FieldTableTest \ - FramingTest \ - HeaderTest - -misc_tests = \ - ProducerConsumerTest - -posix_tests = \ - EventChannelTest \ - EventChannelThreadsTest - -unit_tests = \ - $(broker_tests) \ - $(client_tests) \ - $(framing_tests) \ - $(misc_tests) - -# Executable client tests - -client_exe_tests = \ - client_test \ - echo_service \ - topic_listener \ - topic_publisher - -noinst_PROGRAMS = $(client_exe_tests) - -TESTS_ENVIRONMENT = \ - VALGRIND=$(VALGRIND) \ - abs_builddir='$(abs_builddir)' \ - PATH="$(abs_builddir)/../src$(PATH_SEPARATOR)$$PATH" \ - abs_srcdir='$(abs_srcdir)' - -CLIENT_TESTS = client_test quick_topictest -TESTS = run-unit-tests start_broker $(CLIENT_TESTS) python_tests kill_broker - -EXTRA_DIST = \ - $(TESTS) \ - .vg-supp \ - InProcessBroker.h \ - MockChannel.h \ - MockConnectionInputHandler.h \ - qpid_test_plugin.h \ - setup \ - topicall \ - topictest \ - APRBaseTest.cpp - -CLEANFILES=qpidd.log -DISTCLEANFILES=gen.mk - -include gen.mk - -check_LTLIBRARIES += libdlclose_noop.la -libdlclose_noop_la_LDFLAGS = -module -rpath /home/aconway/svn/qpid/cpp/tests -libdlclose_noop_la_SOURCES = dlclose_noop.c - - -abs_builddir = @abs_builddir@ -extra_libs = $(CPPUNIT_LIBS) -lib_client = $(abs_builddir)/../lib/client/libqpidclient.la -lib_common = $(abs_builddir)/../lib/common/libqpidcommon.la -lib_broker = $(abs_builddir)/../lib/broker/libqpidbroker.la - -gen.mk: Makefile.am - ( \ - for i in $(client_exe_tests); do \ - echo $${i}_SOURCES = $$i.cpp; \ - echo $${i}_LDADD = '$$(lib_client) $$(lib_common) $$(extra_libs)'; \ - done; \ - libs=; \ - for i in $(unit_tests); do \ - libs="$$libs $${i}.la"; \ - echo $${i}_la_SOURCES = $$i.cpp; \ - echo $${i}_la_LIBADD = '$$(lib_common) $$(lib_client)'; \ - echo $${i}_la_LIBADD += '$$(lib_broker) $$(extra_libs)'; \ - echo $${i}_la_LDFLAGS = "-module -rpath `pwd`"; \ - done; \ - echo "check_LTLIBRARIES =$$libs"; \ - ) \ - > $@-t - mv $@-t $@ - -check: $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker) - -# Rule to run unit tests from an individual test module. diff --git a/qpid/cpp/tests/MessageBuilderTest.cpp b/qpid/cpp/tests/MessageBuilderTest.cpp deleted file mode 100644 index 68e5abf60e..0000000000 --- a/qpid/cpp/tests/MessageBuilderTest.cpp +++ /dev/null @@ -1,225 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Exception.h> -#include <BrokerMessage.h> -#include <MessageBuilder.h> -#include <NullMessageStore.h> -#include <Buffer.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <memory> -#include "MockChannel.h" - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -class MessageBuilderTest : public CppUnit::TestCase -{ - struct MockHandler : CompletionHandler { - Message::shared_ptr msg; - - virtual void complete(Message::shared_ptr _msg){ - msg = _msg; - } - }; - - class TestMessageStore : public NullMessageStore - { - Buffer* header; - Buffer* content; - const uint32_t contentBufferSize; - - public: - - void stage(PersistableMessage& msg) - { - if (msg.getPersistenceId() == 0) { - header = new Buffer(msg.encodedSize()); - msg.encode(*header); - content = new Buffer(contentBufferSize); - msg.setPersistenceId(1); - } else { - throw qpid::Exception("Message already staged!"); - } - } - - void appendContent(PersistableMessage& msg, const string& data) - { - if (msg.getPersistenceId() == 1) { - content->putRawData(data); - } else { - throw qpid::Exception("Invalid message id!"); - } - } - - using NullMessageStore::destroy; - - void destroy(PersistableMessage& msg) - { - CPPUNIT_ASSERT(msg.getPersistenceId()); - } - - BasicMessage::shared_ptr getRestoredMessage() - { - BasicMessage::shared_ptr msg(new BasicMessage()); - if (header) { - header->flip(); - msg->decodeHeader(*header); - delete header; - header = 0; - if (content) { - content->flip(); - msg->decodeContent(*content); - delete content; - content = 0; - } - } - return msg; - } - - //dont care about any of the other methods: - TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0), - contentBufferSize(_contentBufferSize) {} - ~TestMessageStore(){} - }; - - CPPUNIT_TEST_SUITE(MessageBuilderTest); - CPPUNIT_TEST(testHeaderOnly); - CPPUNIT_TEST(test1ContentFrame); - CPPUNIT_TEST(test2ContentFrames); - CPPUNIT_TEST(testStaging); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testHeaderOnly(){ - MockHandler handler; - MessageBuilder builder(&handler); - - Message::shared_ptr message( - new BasicMessage( - 0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(0); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(header); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - } - - void test1ContentFrame(){ - MockHandler handler; - MessageBuilder builder(&handler); - - string data1("abcdefg"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(7); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(part1); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - } - - void test2ContentFrames(){ - MockHandler handler; - MessageBuilder builder(&handler); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(part1); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - } - - void testStaging(){ - //store must be the last thing to be destroyed or destructor - //of Message fails (it uses the store to call destroy if lazy - //loaded content is in use) - TestMessageStore store(14); - { - MockHandler handler; - MessageBuilder builder(&handler, &store, 5); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false, - MockChannel::basicGetBody())); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); - properties->setMessageId("MyMessage"); - properties->getHeaders().setString("abc", "xyz"); - - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - - builder.initialise(message); - builder.setHeader(header); - builder.addContent(part1); - builder.addContent(part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - - BasicMessage::shared_ptr restored = store.getRestoredMessage(); - CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); - CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), - restored->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize()); - } - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest); diff --git a/qpid/cpp/tests/MessageHandlerTest.cpp b/qpid/cpp/tests/MessageHandlerTest.cpp deleted file mode 100644 index 55971355f6..0000000000 --- a/qpid/cpp/tests/MessageHandlerTest.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -//#include <iostream> -//#include <AMQP_HighestVersion.h> -#include <amqp_framing.h> -#include <qpid_test_plugin.h> - -#include <BrokerAdapter.h> - -using namespace qpid::framing; -using namespace qpid::broker; - -class MessageHandlerTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(MessageHandlerTest); - CPPUNIT_TEST(testOpenMethod); - CPPUNIT_TEST_SUITE_END(); -private: - -public: - - MessageHandlerTest() - { - } - - void testOpenMethod() - { - //AMQFrame frame(highestProtocolVersion, 0, method); - //TestBodyHandler handler(method); - //handler.handleBody(frame.getBody()); - } - -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(MessageHandlerTest); - diff --git a/qpid/cpp/tests/MessageTest.cpp b/qpid/cpp/tests/MessageTest.cpp deleted file mode 100644 index 6d766c2260..0000000000 --- a/qpid/cpp/tests/MessageTest.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <BrokerMessage.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <AMQP_HighestVersion.h> -#include "AMQFrame.h" -#include "MockChannel.h" - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; - -class MessageTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(MessageTest); - CPPUNIT_TEST(testEncodeDecode); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testEncodeDecode() - { - string exchange = "MyExchange"; - string routingKey = "MyRoutingKey"; - string messageId = "MyMessage"; - string data1("abcdefg"); - string data2("hijklmn"); - - BasicMessage::shared_ptr msg( - new BasicMessage(0, exchange, routingKey, false, false, - MockChannel::basicGetBody())); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - msg->setHeader(header); - msg->addContent(part1); - msg->addContent(part2); - - msg->getHeaderProperties()->setMessageId(messageId); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); - msg->getHeaderProperties()->getHeaders().setString("abc", "xyz"); - - Buffer buffer(msg->encodedSize()); - msg->encode(buffer); - buffer.flip(); - - msg.reset(new BasicMessage()); - msg->decode(buffer); - CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); - CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(PERSISTENT, msg->getHeaderProperties()->getDeliveryMode()); - CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((uint64_t) 14, msg->contentSize()); - - MockChannel channel(1); - msg->deliver(channel, "ignore", 0, 100); - CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2].getBody())); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest); - diff --git a/qpid/cpp/tests/MockChannel.h b/qpid/cpp/tests/MockChannel.h deleted file mode 100644 index 79aa1d35af..0000000000 --- a/qpid/cpp/tests/MockChannel.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef _tests_MockChannel_h -#define _tests_MockChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "framing/MethodContext.h" -#include "framing/ChannelAdapter.h" -#include "framing/OutputHandler.h" -#include "framing/AMQFrame.h" -#include "BasicGetBody.h" -#include <boost/shared_ptr.hpp> -#include <boost/ptr_container/ptr_vector.hpp> - -/** Mock output handler to collect frames */ -struct MockOutputHandler : public qpid::framing::OutputHandler { - boost::ptr_vector<qpid::framing::AMQFrame> frames; - void send(qpid::framing::AMQFrame* frame){ frames.push_back(frame); } -}; - -/** - * Combination mock OutputHandler and ChannelAdapter for tests. - */ -struct MockChannel : public qpid::framing::ChannelAdapter -{ - typedef qpid::framing::BasicGetBody Body; - static Body::shared_ptr basicGetBody() { - return Body::shared_ptr( - new Body(qpid::framing::ProtocolVersion())); - } - - MockOutputHandler out; - - MockChannel(qpid::framing::ChannelId id) { - init(id, out, qpid::framing::ProtocolVersion()); - } - - bool isOpen() const { return true; } - - void handleHeader( - boost::shared_ptr<qpid::framing::AMQHeaderBody> b) { send(b); } - void handleContent( - boost::shared_ptr<qpid::framing::AMQContentBody> b) { send(b); } - void handleHeartbeat( - boost::shared_ptr<qpid::framing::AMQHeartbeatBody> b) { send(b); } - void handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const qpid::framing::MethodContext& context) - { - context.channel->send(method); - }; - -}; - -#endif /*!_tests_MockChannel_h*/ diff --git a/qpid/cpp/tests/MockConnectionInputHandler.h b/qpid/cpp/tests/MockConnectionInputHandler.h deleted file mode 100644 index 55dbceaf44..0000000000 --- a/qpid/cpp/tests/MockConnectionInputHandler.h +++ /dev/null @@ -1,113 +0,0 @@ -#ifndef _tests_MockConnectionInputHandler_h -#define _tests_MockConnectionInputHandler_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "sys/ConnectionInputHandler.h" -#include "sys/ConnectionInputHandlerFactory.h" -#include "sys/Monitor.h" -#include "framing/ProtocolInitiation.h" - -struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { - - MockConnectionInputHandler() : state(START) {} - - ~MockConnectionInputHandler() {} - - void initiated(const qpid::framing::ProtocolInitiation& pi) { - qpid::sys::Monitor::ScopedLock l(monitor); - init = pi; - setState(GOT_INIT); - } - - void received(qpid::framing::AMQFrame* framep) { - qpid::sys::Monitor::ScopedLock l(monitor); - frame = *framep; - setState(GOT_FRAME); - } - - qpid::framing::ProtocolInitiation waitForProtocolInit() { - waitFor(GOT_INIT); - return init; - } - - qpid::framing::AMQFrame waitForFrame() { - waitFor(GOT_FRAME); - return frame; - } - - void waitForClosed() { - waitFor(CLOSED); - } - - void closed() { - qpid::sys::Monitor::ScopedLock l(monitor); - setState(CLOSED); - } - - void idleOut() {} - void idleIn() {} - - private: - typedef enum { START, GOT_INIT, GOT_FRAME, CLOSED } State; - - void setState(State s) { - state = s; - monitor.notify(); - } - - void waitFor(State s) { - qpid::sys::Monitor::ScopedLock l(monitor); - qpid::sys::Time deadline = qpid::sys::now() + 10*qpid::sys::TIME_SEC; - while (state != s) - CPPUNIT_ASSERT(monitor.wait(deadline)); - } - - qpid::sys::Monitor monitor; - State state; - qpid::framing::ProtocolInitiation init; - qpid::framing::AMQFrame frame; -}; - - -struct MockConnectionInputHandlerFactory : public qpid::sys::ConnectionInputHandlerFactory { - MockConnectionInputHandlerFactory() : handler(0) {} - - qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler*) { - qpid::sys::Monitor::ScopedLock lock(monitor); - handler = new MockConnectionInputHandler(); - monitor.notifyAll(); - return handler; - } - - void waitForHandler() { - qpid::sys::Monitor::ScopedLock lock(monitor); - qpid::sys::Time deadline = - qpid::sys::now() + 500 * qpid::sys::TIME_SEC; - while (handler == 0) - CPPUNIT_ASSERT(monitor.wait(deadline)); - } - - MockConnectionInputHandler* handler; - qpid::sys::Monitor monitor; -}; - - - -#endif /*!_tests_MockConnectionInputHandler_h*/ diff --git a/qpid/cpp/tests/ProducerConsumerTest.cpp b/qpid/cpp/tests/ProducerConsumerTest.cpp deleted file mode 100644 index ee94a56c55..0000000000 --- a/qpid/cpp/tests/ProducerConsumerTest.cpp +++ /dev/null @@ -1,284 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <vector> -#include <iostream> - -#include <boost/bind.hpp> - -#include <qpid_test_plugin.h> -#include "InProcessBroker.h" -#include "sys/ProducerConsumer.h" -#include "sys/Thread.h" -#include "AMQP_HighestVersion.h" -#include "sys/AtomicCount.h" - -using namespace qpid; -using namespace sys; -using namespace framing; -using namespace boost; -using namespace std; - -/** A counter that notifies a monitor when changed */ -class WatchedCounter : public Monitor { - public: - WatchedCounter(int i=0) : count(i) {} - WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {} - - WatchedCounter& operator=(const WatchedCounter& x) { - return *this = int(x); - } - - WatchedCounter& operator=(int i) { - Lock l(*this); - count = i; - return *this; - } - - int operator++() { - Lock l(*this); - notifyAll(); - return ++count; - } - - int operator++(int) { - Lock l(*this); - notifyAll(); - return count++; - } - - bool operator==(int i) const { - Lock l(const_cast<WatchedCounter&>(*this)); - return i == count; - } - - operator int() const { - Lock l(const_cast<WatchedCounter&>(*this)); - return count; - } - - bool waitFor(int i, Time timeout=TIME_SEC) { - Lock l(*this); - Time deadline = timeout+now(); - while (count != i) { - if (!wait(deadline)) - return false; - } - assert(count == i); - return true; - } - - private: - typedef Mutex::ScopedLock Lock; - int count; -}; - -class ProducerConsumerTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ProducerConsumerTest); - CPPUNIT_TEST(testProduceConsume); - CPPUNIT_TEST(testTimeout); - CPPUNIT_TEST(testShutdown); - CPPUNIT_TEST(testCancel); - CPPUNIT_TEST_SUITE_END(); - - public: - client::InProcessBrokerClient client; - ProducerConsumer pc; - - WatchedCounter shutdown; - WatchedCounter timeout; - WatchedCounter consumed; - WatchedCounter produced; - - struct ConsumeRunnable : public Runnable { - ProducerConsumerTest& test; - ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {} - void run() { test.consume(); } - }; - - struct ConsumeTimeoutRunnable : public Runnable { - ProducerConsumerTest& test; - Time timeout; - ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t) - : test(test_), timeout(t) {} - void run() { test.consumeTimeout(timeout); } - }; - - - void consumeInternal(ProducerConsumer::ConsumerLock& consumer) { - if (pc.isShutdown()) { - ++shutdown; - return; - } - if (consumer.isTimedOut()) { - ++timeout; - return; - } - CPPUNIT_ASSERT(consumer.isOk()); - CPPUNIT_ASSERT(pc.available() > 0); - consumer.confirm(); - consumed++; - } - - void consume() { - ProducerConsumer::ConsumerLock consumer(pc); - consumeInternal(consumer); - }; - - void consumeTimeout(const Time& timeout) { - ProducerConsumer::ConsumerLock consumer(pc, timeout); - consumeInternal(consumer); - }; - - void produce() { - ProducerConsumer::ProducerLock producer(pc); - CPPUNIT_ASSERT(producer.isOk()); - producer.confirm(); - produced++; - } - - void join(vector<Thread>& threads) { - for_each(threads.begin(), threads.end(), bind(&Thread::join,_1)); - } - - vector<Thread> startThreads(size_t n, Runnable& runnable) { - vector<Thread> threads(n); - while (n > 0) - threads[--n] = Thread(runnable); - return threads; - } - -public: - ProducerConsumerTest() : client() {} - - void testProduceConsume() { - ConsumeRunnable runMe(*this); - produce(); - produce(); - CPPUNIT_ASSERT(produced.waitFor(2)); - vector<Thread> threads = startThreads(1, runMe); - CPPUNIT_ASSERT(consumed.waitFor(1)); - join(threads); - - threads = startThreads(1, runMe); - CPPUNIT_ASSERT(consumed.waitFor(2)); - join(threads); - - threads = startThreads(3, runMe); - produce(); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(4)); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(5)); - join(threads); - CPPUNIT_ASSERT_EQUAL(0, int(shutdown)); - } - - void testTimeout() { - try { - // 0 timeout no items available throws exception - ProducerConsumer::ConsumerLock consumer(pc, 0); - CPPUNIT_FAIL("Expected exception"); - } catch(...){} - - produce(); - CPPUNIT_ASSERT(produced.waitFor(1)); - CPPUNIT_ASSERT_EQUAL(1, int(pc.available())); - { - // 0 timeout succeeds if there's an item available. - ProducerConsumer::ConsumerLock consume(pc, 0); - CPPUNIT_ASSERT(consume.isOk()); - consume.confirm(); - } - CPPUNIT_ASSERT_EQUAL(0, int(pc.available())); - - // Produce an item within the timeout. - ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC); - vector<Thread> threads = startThreads(1, runMe); - produce(); - CPPUNIT_ASSERT(consumed.waitFor(1)); - join(threads); - } - - - void testShutdown() { - ConsumeRunnable runMe(*this); - vector<Thread> threads = startThreads(2, runMe); - while (pc.consumers() != 2) - Thread::yield(); - pc.shutdown(); - CPPUNIT_ASSERT(shutdown.waitFor(2)); - join(threads); - - threads = startThreads(1, runMe); // Should shutdown immediately. - CPPUNIT_ASSERT(shutdown.waitFor(3)); - join(threads); - - // Produce/consume while shutdown should return isShutdown and - // throw on confirm. - try { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(pc.isShutdown()); - CPPUNIT_FAIL("Expected exception"); - } - catch (...) {} // Expected - try { - ProducerConsumer::ConsumerLock c(pc); - CPPUNIT_ASSERT(pc.isShutdown()); - CPPUNIT_FAIL("Expected exception"); - } - catch (...) {} // Expected - } - - void testCancel() { - CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); - { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(p.isOk()); - p.cancel(); - } - // Nothing was produced. - CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); - { - ProducerConsumer::ConsumerLock c(pc, 0); - CPPUNIT_ASSERT(c.isTimedOut()); - } - // Now produce but cancel the consume - { - ProducerConsumer::ProducerLock p(pc); - CPPUNIT_ASSERT(p.isOk()); - p.confirm(); - } - CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); - { - ProducerConsumer::ConsumerLock c(pc); - CPPUNIT_ASSERT(c.isOk()); - c.cancel(); - } - CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest); - diff --git a/qpid/cpp/tests/QueuePolicyTest.cpp b/qpid/cpp/tests/QueuePolicyTest.cpp deleted file mode 100644 index 20917dcd6a..0000000000 --- a/qpid/cpp/tests/QueuePolicyTest.cpp +++ /dev/null @@ -1,89 +0,0 @@ - /* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <QueuePolicy.h> -#include <qpid_test_plugin.h> - -using namespace qpid::broker; -using namespace qpid::framing; - -class QueuePolicyTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(QueuePolicyTest); - CPPUNIT_TEST(testCount); - CPPUNIT_TEST(testSize); - CPPUNIT_TEST(testBoth); - CPPUNIT_TEST(testSettings); - CPPUNIT_TEST_SUITE_END(); - - public: - void testCount(){ - QueuePolicy policy(5, 0); - CPPUNIT_ASSERT(!policy.limitExceeded()); - for (int i = 0; i < 5; i++) policy.enqueued(10); - CPPUNIT_ASSERT_EQUAL((uint64_t) 0, policy.getMaxSize()); - CPPUNIT_ASSERT_EQUAL((uint32_t) 5, policy.getMaxCount()); - CPPUNIT_ASSERT(!policy.limitExceeded()); - policy.enqueued(10); - CPPUNIT_ASSERT(policy.limitExceeded()); - policy.dequeued(10); - CPPUNIT_ASSERT(!policy.limitExceeded()); - policy.enqueued(10); - CPPUNIT_ASSERT(policy.limitExceeded()); - } - - void testSize(){ - QueuePolicy policy(0, 50); - for (int i = 0; i < 5; i++) policy.enqueued(10); - CPPUNIT_ASSERT(!policy.limitExceeded()); - policy.enqueued(10); - CPPUNIT_ASSERT(policy.limitExceeded()); - policy.dequeued(10); - CPPUNIT_ASSERT(!policy.limitExceeded()); - policy.enqueued(10); - CPPUNIT_ASSERT(policy.limitExceeded()); - } - - void testBoth(){ - QueuePolicy policy(5, 50); - for (int i = 0; i < 5; i++) policy.enqueued(11); - CPPUNIT_ASSERT(policy.limitExceeded()); - policy.dequeued(20); - CPPUNIT_ASSERT(!policy.limitExceeded());//fails - policy.enqueued(5); - policy.enqueued(10); - CPPUNIT_ASSERT(policy.limitExceeded()); - } - - void testSettings(){ - //test reading and writing the policy from/to field table - FieldTable settings; - QueuePolicy a(101, 303); - a.update(settings); - QueuePolicy b(settings); - CPPUNIT_ASSERT_EQUAL(a.getMaxCount(), b.getMaxCount()); - CPPUNIT_ASSERT_EQUAL(a.getMaxSize(), b.getMaxSize()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(QueuePolicyTest); - diff --git a/qpid/cpp/tests/QueueRegistryTest.cpp b/qpid/cpp/tests/QueueRegistryTest.cpp deleted file mode 100644 index 3926d56292..0000000000 --- a/qpid/cpp/tests/QueueRegistryTest.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <QueueRegistry.h> -#include <qpid_test_plugin.h> -#include <string> - -using namespace qpid::broker; - -class QueueRegistryTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(QueueRegistryTest); - CPPUNIT_TEST(testDeclare); - CPPUNIT_TEST(testDeclareTmp); - CPPUNIT_TEST(testFind); - CPPUNIT_TEST(testDestroy); - CPPUNIT_TEST_SUITE_END(); - - private: - std::string foo, bar; - QueueRegistry reg; - std::pair<Queue::shared_ptr, bool> qc; - - public: - void setUp() { - foo = "foo"; - bar = "bar"; - } - - void testDeclare() { - qc = reg.declare(foo, false, 0, 0); - Queue::shared_ptr q = qc.first; - CPPUNIT_ASSERT(q); - CPPUNIT_ASSERT(qc.second); // New queue - CPPUNIT_ASSERT_EQUAL(foo, q->getName()); - - qc = reg.declare(foo, false, 0, 0); - CPPUNIT_ASSERT_EQUAL(q, qc.first); - CPPUNIT_ASSERT(!qc.second); - - qc = reg.declare(bar, false, 0, 0); - q = qc.first; - CPPUNIT_ASSERT(q); - CPPUNIT_ASSERT_EQUAL(true, qc.second); - CPPUNIT_ASSERT_EQUAL(bar, q->getName()); - } - - void testDeclareTmp() - { - qc = reg.declare(std::string(), false, 0, 0); - CPPUNIT_ASSERT(qc.second); - CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName()); - } - - void testFind() { - CPPUNIT_ASSERT(reg.find(foo) == 0); - - reg.declare(foo, false, 0, 0); - reg.declare(bar, false, 0, 0); - Queue::shared_ptr q = reg.find(bar); - CPPUNIT_ASSERT(q); - CPPUNIT_ASSERT_EQUAL(bar, q->getName()); - } - - void testDestroy() { - qc = reg.declare(foo, false, 0, 0); - reg.destroy(foo); - // Queue is gone from the registry. - CPPUNIT_ASSERT(reg.find(foo) == 0); - // Queue is not actually destroyed till we drop our reference. - CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName()); - // We shoud be the only reference. - CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest); diff --git a/qpid/cpp/tests/QueueTest.cpp b/qpid/cpp/tests/QueueTest.cpp deleted file mode 100644 index e90c9259cf..0000000000 --- a/qpid/cpp/tests/QueueTest.cpp +++ /dev/null @@ -1,149 +0,0 @@ - /* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <BrokerQueue.h> -#include <QueueRegistry.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include "MockChannel.h" - -using namespace qpid::broker; -using namespace qpid::sys; - - -class TestConsumer : public virtual Consumer{ -public: - Message::shared_ptr last; - - virtual bool deliver(Message::shared_ptr& msg); -}; - - -class QueueTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(QueueTest); - CPPUNIT_TEST(testConsumers); - CPPUNIT_TEST(testRegistry); - CPPUNIT_TEST(testDequeue); - CPPUNIT_TEST_SUITE_END(); - - public: - Message::shared_ptr message(std::string exchange, std::string routingKey) { - return Message::shared_ptr( - new BasicMessage(0, exchange, routingKey, true, true, - MockChannel::basicGetBody())); - } - - void testConsumers(){ - Queue::shared_ptr queue(new Queue("my_queue", true)); - - //Test adding consumers: - TestConsumer c1; - TestConsumer c2; - queue->consume(&c1); - queue->consume(&c2); - - CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getConsumerCount()); - - //Test basic delivery: - Message::shared_ptr msg1 = message("e", "A"); - Message::shared_ptr msg2 = message("e", "B"); - Message::shared_ptr msg3 = message("e", "C"); - - queue->deliver(msg1); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); - - queue->deliver(msg2); - CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); - - queue->deliver(msg3); - CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); - - //Test cancellation: - queue->cancel(&c1); - CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getConsumerCount()); - queue->cancel(&c2); - CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getConsumerCount()); - } - - void testRegistry(){ - //Test use of queues in registry: - QueueRegistry registry; - registry.declare("queue1", true, true); - registry.declare("queue2", true, true); - registry.declare("queue3", true, true); - - CPPUNIT_ASSERT(registry.find("queue1")); - CPPUNIT_ASSERT(registry.find("queue2")); - CPPUNIT_ASSERT(registry.find("queue3")); - - registry.destroy("queue1"); - registry.destroy("queue2"); - registry.destroy("queue3"); - - CPPUNIT_ASSERT(!registry.find("queue1")); - CPPUNIT_ASSERT(!registry.find("queue2")); - CPPUNIT_ASSERT(!registry.find("queue3")); - } - - void testDequeue(){ - Queue::shared_ptr queue(new Queue("my_queue", true)); - Message::shared_ptr msg1 = message("e", "A"); - Message::shared_ptr msg2 = message("e", "B"); - Message::shared_ptr msg3 = message("e", "C"); - Message::shared_ptr received; - - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - CPPUNIT_ASSERT_EQUAL(uint32_t(3), queue->getMessageCount()); - - received = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); - CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getMessageCount()); - - received = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); - CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount()); - - TestConsumer consumer; - queue->consume(&consumer); - queue->dispatch(); - CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); - CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); - - received = queue->dequeue(); - CPPUNIT_ASSERT(!received); - CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); - - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); - -//TestConsumer -bool TestConsumer::deliver(Message::shared_ptr& msg){ - last = msg; - return true; -} - diff --git a/qpid/cpp/tests/ReferenceTest.cpp b/qpid/cpp/tests/ReferenceTest.cpp deleted file mode 100644 index 753f68ee75..0000000000 --- a/qpid/cpp/tests/ReferenceTest.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <memory> -#include "qpid_test_plugin.h" -#include "Reference.h" -#include "BrokerMessageMessage.h" -#include "MessageTransferBody.h" -#include "MessageAppendBody.h" -#include "CompletionHandler.h" - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace std; - -class ReferenceTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ReferenceTest); - CPPUNIT_TEST(testRegistry); - CPPUNIT_TEST(testReference); - CPPUNIT_TEST_SUITE_END(); - - ProtocolVersion v; - ReferenceRegistry registry; - Reference::shared_ptr r1; - MessageTransferBody::shared_ptr t1, t2; - MessageMessage::shared_ptr m1, m2; - MessageAppendBody::shared_ptr a1, a2; - public: - - ReferenceTest() : - r1(registry.open("bar")), - t1(new MessageTransferBody(v)), - t2(new MessageTransferBody(v)), - m1(new MessageMessage(0, 1, t1, r1)), - m2(new MessageMessage(0, 2, t2, r1)), - a1(new MessageAppendBody(v)), - a2(new MessageAppendBody(v)) - {} - - void testRegistry() { - Reference::shared_ptr ref = registry.open("foo"); - CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId()); - CPPUNIT_ASSERT(ref == registry.get("foo")); - try { - registry.get("none"); - CPPUNIT_FAIL("Expected exception"); - } catch (...) {} - try { - registry.open("foo"); - CPPUNIT_FAIL("Expected exception"); - } catch(...) {} - } - - void testReference() { - r1->addMessage(m1); - r1->addMessage(m2); - CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size()); - r1->append(a1); - r1->append(a2); - CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size()); - const vector<MessageMessage::shared_ptr> messages = r1->getMessages(); - r1->close(); - try { - registry.open("bar"); - CPPUNIT_FAIL("Expected exception"); - } catch(...) {} - - CPPUNIT_ASSERT_EQUAL(messages[0], m1); - CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[0], a1); - CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[1], a2); - - CPPUNIT_ASSERT_EQUAL(messages[1], m2); - CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[0], a1); - CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[1], a2); - } - - -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest); diff --git a/qpid/cpp/tests/TopicExchangeTest.cpp b/qpid/cpp/tests/TopicExchangeTest.cpp deleted file mode 100644 index 4ba9cdd6e5..0000000000 --- a/qpid/cpp/tests/TopicExchangeTest.cpp +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#include <TopicExchange.h> -#include <qpid_test_plugin.h> - -using namespace qpid::broker; - -Tokens makeTokens(char** begin, char** end) -{ - Tokens t; - t.insert(t.end(), begin, end); - return t; -} - -// Calculate size of an array. -#define LEN(a) (sizeof(a)/sizeof(a[0])) - -// Convert array to token vector -#define TOKENS(a) makeTokens(a, a + LEN(a)) - -// Allow CPPUNIT_EQUALS to print a Tokens. -CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v) -{ - out << "[ "; - for (Tokens::const_iterator i = v.begin(); - i != v.end(); ++i) - { - out << '"' << *i << '"' << (i+1 == v.end() ? "]" : ", "); - } - return out; -} - - -class TokensTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(TokensTest); - CPPUNIT_TEST(testTokens); - CPPUNIT_TEST_SUITE_END(); - - public: - void testTokens() - { - Tokens tokens("hello.world"); - char* expect[] = {"hello", "world"}; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens); - - tokens = "a.b.c"; - char* expect2[] = { "a", "b", "c" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens); - - tokens = ""; - CPPUNIT_ASSERT(tokens.empty()); - - tokens = "x"; - char* expect3[] = { "x" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens); - - tokens = (".x"); - char* expect4[] = { "", "x" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens); - - tokens = ("x."); - char* expect5[] = { "x", "" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens); - - tokens = ("."); - char* expect6[] = { "", "" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens); - - tokens = (".."); - char* expect7[] = { "", "", "" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens); - } - -}; - -#define ASSERT_NORMALIZED(expect, pattern) \ - CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern))) -class TopicPatternTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(TopicPatternTest); - CPPUNIT_TEST(testNormalize); - CPPUNIT_TEST(testPlain); - CPPUNIT_TEST(testStar); - CPPUNIT_TEST(testHash); - CPPUNIT_TEST(testMixed); - CPPUNIT_TEST(testCombo); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testNormalize() - { - CPPUNIT_ASSERT(TopicPattern("").empty()); - ASSERT_NORMALIZED("a.b.c", "a.b.c"); - ASSERT_NORMALIZED("a.*.c", "a.*.c"); - ASSERT_NORMALIZED("#", "#"); - ASSERT_NORMALIZED("#", "#.#.#.#"); - ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*"); - ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#"); - ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*"); - } - - void testPlain() { - TopicPattern p("ab.cd.e"); - CPPUNIT_ASSERT(p.match("ab.cd.e")); - CPPUNIT_ASSERT(!p.match("abx.cd.e")); - CPPUNIT_ASSERT(!p.match("ab.cd")); - CPPUNIT_ASSERT(!p.match("ab.cd..e.")); - CPPUNIT_ASSERT(!p.match("ab.cd.e.")); - CPPUNIT_ASSERT(!p.match(".ab.cd.e")); - - p = ""; - CPPUNIT_ASSERT(p.match("")); - - p = "."; - CPPUNIT_ASSERT(p.match(".")); - } - - - void testStar() - { - TopicPattern p("a.*.b"); - CPPUNIT_ASSERT(p.match("a.xx.b")); - CPPUNIT_ASSERT(!p.match("a.b")); - - p = "*.x"; - CPPUNIT_ASSERT(p.match("y.x")); - CPPUNIT_ASSERT(p.match(".x")); - CPPUNIT_ASSERT(!p.match("x")); - - p = "x.x.*"; - CPPUNIT_ASSERT(p.match("x.x.y")); - CPPUNIT_ASSERT(p.match("x.x.")); - CPPUNIT_ASSERT(!p.match("x.x")); - CPPUNIT_ASSERT(!p.match("q.x.y")); - } - - void testHash() - { - TopicPattern p("a.#.b"); - CPPUNIT_ASSERT(p.match("a.b")); - CPPUNIT_ASSERT(p.match("a.x.b")); - CPPUNIT_ASSERT(p.match("a..x.y.zz.b")); - CPPUNIT_ASSERT(!p.match("a.b.")); - CPPUNIT_ASSERT(!p.match("q.x.b")); - - p = "a.#"; - CPPUNIT_ASSERT(p.match("a")); - CPPUNIT_ASSERT(p.match("a.b")); - CPPUNIT_ASSERT(p.match("a.b.c")); - - p = "#.a"; - CPPUNIT_ASSERT(p.match("a")); - CPPUNIT_ASSERT(p.match("x.y.a")); - } - - void testMixed() - { - TopicPattern p("*.x.#.y"); - CPPUNIT_ASSERT(p.match("a.x.y")); - CPPUNIT_ASSERT(p.match("a.x.p.qq.y")); - CPPUNIT_ASSERT(!p.match("a.a.x.y")); - CPPUNIT_ASSERT(!p.match("aa.x.b.c")); - - p = "a.#.b.*"; - CPPUNIT_ASSERT(p.match("a.b.x")); - CPPUNIT_ASSERT(p.match("a.x.x.x.b.x")); - } - - void testCombo() { - TopicPattern p("*.#.#.*.*.#"); - CPPUNIT_ASSERT(p.match("x.y.z")); - CPPUNIT_ASSERT(p.match("x.y.z.a.b.c")); - CPPUNIT_ASSERT(!p.match("x.y")); - CPPUNIT_ASSERT(!p.match("x")); - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest); -CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest); diff --git a/qpid/cpp/tests/TxAckTest.cpp b/qpid/cpp/tests/TxAckTest.cpp deleted file mode 100644 index eb4ada4ac8..0000000000 --- a/qpid/cpp/tests/TxAckTest.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <NullMessageStore.h> -#include <RecoveryManager.h> -#include <TxAck.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <list> -#include <vector> -#include "MockChannel.h" - -using std::list; -using std::vector; -using namespace qpid::broker; -using namespace qpid::framing; - -class TxAckTest : public CppUnit::TestCase -{ - - class TestMessageStore : public NullMessageStore - { - public: - vector<PersistableMessage*> dequeued; - - void dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& /*queue*/) - { - dequeued.push_back(&msg); - } - - TestMessageStore() : NullMessageStore() {} - ~TestMessageStore(){} - }; - - CPPUNIT_TEST_SUITE(TxAckTest); - CPPUNIT_TEST(testPrepare); - CPPUNIT_TEST(testCommit); - CPPUNIT_TEST_SUITE_END(); - - - AccumulatedAck acked; - TestMessageStore store; - Queue::shared_ptr queue; - vector<Message::shared_ptr> messages; - list<DeliveryRecord> deliveries; - TxAck op; - - -public: - - TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) - { - for(int i = 0; i < 10; i++){ - Message::shared_ptr msg( - new BasicMessage(0, "exchange", "routing_key", false, false, - MockChannel::basicGetBody())); - msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); - messages.push_back(msg); - deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); - } - - //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) - acked.range = 5; - acked.individual.push_back(7); - acked.individual.push_back(9); - } - - void testPrepare() - { - //ensure acked messages are discarded, i.e. dequeued from store - op.prepare(0); - CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); - CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); - int dequeued[] = {0, 1, 2, 3, 4, 6, 8}; - for (int i = 0; i < 7; i++) { - CPPUNIT_ASSERT_EQUAL((PersistableMessage*) messages[dequeued[i]].get(), store.dequeued[i]); - } - } - - void testCommit() - { - //emsure acked messages are removed from list - op.commit(); - CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size()); - list<DeliveryRecord>::iterator i = deliveries.begin(); - CPPUNIT_ASSERT(i->matches(6));//msg 6 - CPPUNIT_ASSERT((++i)->matches(8));//msg 8 - CPPUNIT_ASSERT((++i)->matches(10));//msg 10 - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(TxAckTest); - diff --git a/qpid/cpp/tests/TxBufferTest.cpp b/qpid/cpp/tests/TxBufferTest.cpp deleted file mode 100644 index bd8ae3796b..0000000000 --- a/qpid/cpp/tests/TxBufferTest.cpp +++ /dev/null @@ -1,269 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <TxBuffer.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <vector> - -using namespace qpid::broker; - -template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ - unsigned int i = 0; - while(i < expected.size() && i < actual.size()){ - CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]); - i++; - } - CPPUNIT_ASSERT(i == expected.size()); - CPPUNIT_ASSERT(i == actual.size()); -} - -class TxBufferTest : public CppUnit::TestCase -{ - class MockTxOp : public TxOp{ - enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8}; - std::vector<int> expected; - std::vector<int> actual; - bool failOnPrepare; - public: - MockTxOp() : failOnPrepare(false) {} - MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {} - - bool prepare(TransactionContext*) throw(){ - actual.push_back(PREPARE); - return !failOnPrepare; - } - void commit() throw(){ - actual.push_back(COMMIT); - } - void rollback() throw(){ - actual.push_back(ROLLBACK); - } - MockTxOp& expectPrepare(){ - expected.push_back(PREPARE); - return *this; - } - MockTxOp& expectCommit(){ - expected.push_back(COMMIT); - return *this; - } - MockTxOp& expectRollback(){ - expected.push_back(ROLLBACK); - return *this; - } - void check(){ - assertEqualVector(expected, actual); - } - ~MockTxOp(){} - }; - - class MockTransactionalStore : public TransactionalStore{ - enum op_codes {BEGIN=2, COMMIT=4, ABORT=8}; - std::vector<int> expected; - std::vector<int> actual; - - enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3}; - int state; - - class TestTransactionContext : public TransactionContext{ - MockTransactionalStore* store; - public: - TestTransactionContext(MockTransactionalStore* _store) : store(_store) {} - void commit(){ - if(store->state != OPEN) throw "txn already completed"; - store->state = COMMITTED; - } - - void abort(){ - if(store->state != OPEN) throw "txn already completed"; - store->state = ABORTED; - } - ~TestTransactionContext(){} - }; - - - public: - MockTransactionalStore() : state(OPEN){} - - std::auto_ptr<TPCTransactionContext> begin(const std::string&){ - throw "Operation not supported"; - } - void prepare(TPCTransactionContext&){ - throw "Operation not supported"; - } - - std::auto_ptr<TransactionContext> begin(){ - actual.push_back(BEGIN); - std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this)); - return txn; - } - void commit(TransactionContext& ctxt){ - actual.push_back(COMMIT); - dynamic_cast<TestTransactionContext&>(ctxt).commit(); - } - void abort(TransactionContext& ctxt){ - actual.push_back(ABORT); - dynamic_cast<TestTransactionContext&>(ctxt).abort(); - } - MockTransactionalStore& expectBegin(){ - expected.push_back(BEGIN); - return *this; - } - MockTransactionalStore& expectCommit(){ - expected.push_back(COMMIT); - return *this; - } - MockTransactionalStore& expectAbort(){ - expected.push_back(ABORT); - return *this; - } - void check(){ - assertEqualVector(expected, actual); - } - - bool isCommitted(){ - return state == COMMITTED; - } - - bool isAborted(){ - return state == ABORTED; - } - - bool isOpen() const{ - return state == OPEN; - } - ~MockTransactionalStore(){} - }; - - CPPUNIT_TEST_SUITE(TxBufferTest); - CPPUNIT_TEST(testPrepareAndCommit); - CPPUNIT_TEST(testFailOnPrepare); - CPPUNIT_TEST(testRollback); - CPPUNIT_TEST(testBufferIsClearedAfterRollback); - CPPUNIT_TEST(testBufferIsClearedAfterCommit); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testPrepareAndCommit(){ - MockTransactionalStore store; - store.expectBegin().expectCommit(); - - MockTxOp opA; - opA.expectPrepare().expectCommit(); - MockTxOp opB; - opB.expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order - MockTxOp opC; - opC.expectPrepare().expectCommit(); - - TxBuffer buffer; - buffer.enlist(&opA); - buffer.enlist(&opB); - buffer.enlist(&opB);//opB enlisted twice - buffer.enlist(&opC); - - CPPUNIT_ASSERT(buffer.prepare(&store)); - buffer.commit(); - store.check(); - CPPUNIT_ASSERT(store.isCommitted()); - opA.check(); - opB.check(); - opC.check(); - } - - void testFailOnPrepare(){ - MockTransactionalStore store; - store.expectBegin().expectAbort(); - - MockTxOp opA; - opA.expectPrepare(); - MockTxOp opB(true); - opB.expectPrepare(); - MockTxOp opC;//will never get prepare as b will fail - - TxBuffer buffer; - buffer.enlist(&opA); - buffer.enlist(&opB); - buffer.enlist(&opC); - - CPPUNIT_ASSERT(!buffer.prepare(&store)); - store.check(); - CPPUNIT_ASSERT(store.isAborted()); - opA.check(); - opB.check(); - opC.check(); - } - - void testRollback(){ - MockTxOp opA; - opA.expectRollback(); - MockTxOp opB(true); - opB.expectRollback(); - MockTxOp opC; - opC.expectRollback(); - - TxBuffer buffer; - buffer.enlist(&opA); - buffer.enlist(&opB); - buffer.enlist(&opC); - - buffer.rollback(); - opA.check(); - opB.check(); - opC.check(); - } - - void testBufferIsClearedAfterRollback(){ - MockTxOp opA; - opA.expectRollback(); - MockTxOp opB; - opB.expectRollback(); - - TxBuffer buffer; - buffer.enlist(&opA); - buffer.enlist(&opB); - - buffer.rollback(); - buffer.commit();//second call should not reach ops - opA.check(); - opB.check(); - } - - void testBufferIsClearedAfterCommit(){ - MockTxOp opA; - opA.expectCommit(); - MockTxOp opB; - opB.expectCommit(); - - TxBuffer buffer; - buffer.enlist(&opA); - buffer.enlist(&opB); - - buffer.commit(); - buffer.rollback();//second call should not reach ops - opA.check(); - opB.check(); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest); - diff --git a/qpid/cpp/tests/TxPublishTest.cpp b/qpid/cpp/tests/TxPublishTest.cpp deleted file mode 100644 index 8ce0da6508..0000000000 --- a/qpid/cpp/tests/TxPublishTest.cpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <NullMessageStore.h> -#include <RecoveryManager.h> -#include <TxPublish.h> -#include <qpid_test_plugin.h> -#include <iostream> -#include <list> -#include <vector> -#include "MockChannel.h" - -using std::list; -using std::pair; -using std::vector; -using namespace qpid::broker; -using namespace qpid::framing; - -class TxPublishTest : public CppUnit::TestCase -{ - typedef std::pair<string, PersistableMessage*> msg_queue_pair; - - class TestMessageStore : public NullMessageStore - { - public: - vector<msg_queue_pair> enqueued; - - void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) - { - enqueued.push_back(msg_queue_pair(queue.getName(), &msg)); - } - - //dont care about any of the other methods: - TestMessageStore() : NullMessageStore(false) {} - ~TestMessageStore(){} - }; - - CPPUNIT_TEST_SUITE(TxPublishTest); - CPPUNIT_TEST(testPrepare); - CPPUNIT_TEST(testCommit); - CPPUNIT_TEST_SUITE_END(); - - - TestMessageStore store; - Queue::shared_ptr queue1; - Queue::shared_ptr queue2; - Message::shared_ptr const msg; - TxPublish op; - -public: - - TxPublishTest() : - queue1(new Queue("queue1", false, &store, 0)), - queue2(new Queue("queue2", false, &store, 0)), - msg(new BasicMessage(0, "exchange", "routing_key", false, false, - MockChannel::basicGetBody())), - op(msg) - { - msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); - msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); - op.deliverTo(queue1); - op.deliverTo(queue2); - } - - void testPrepare() - { - //ensure messages are enqueued in store - op.prepare(0); - CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); - CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); - CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second); - CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); - CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second); - } - - void testCommit() - { - //ensure messages are delivered to queue - op.commit(); - CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount()); - CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue()); - - CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount()); - CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest); - diff --git a/qpid/cpp/tests/ValueTest.cpp b/qpid/cpp/tests/ValueTest.cpp deleted file mode 100644 index a3f9ec2541..0000000000 --- a/qpid/cpp/tests/ValueTest.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#include <Value.h> -#include <qpid_test_plugin.h> - -using namespace qpid::framing; - - -class ValueTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ValueTest); - CPPUNIT_TEST(testStringValueEquals); - CPPUNIT_TEST(testIntegerValueEquals); - CPPUNIT_TEST(testDecimalValueEquals); - CPPUNIT_TEST(testFieldTableValueEquals); - CPPUNIT_TEST_SUITE_END(); - - StringValue s; - IntegerValue i; - DecimalValue d; - FieldTableValue ft; - EmptyValue e; - - public: - ValueTest() : - s("abc"), - i(42), - d(1234,2) - - { - ft.getValue().setString("foo", "FOO"); - ft.getValue().setInt("magic", 7); - } - - void testStringValueEquals() - { - - CPPUNIT_ASSERT(StringValue("abc") == s); - CPPUNIT_ASSERT(s != StringValue("foo")); - CPPUNIT_ASSERT(s != e); - CPPUNIT_ASSERT(e != d); - CPPUNIT_ASSERT(e != ft); - } - - void testIntegerValueEquals() - { - CPPUNIT_ASSERT(IntegerValue(42) == i); - CPPUNIT_ASSERT(IntegerValue(5) != i); - CPPUNIT_ASSERT(i != e); - CPPUNIT_ASSERT(i != d); - } - - void testDecimalValueEquals() - { - CPPUNIT_ASSERT(DecimalValue(1234, 2) == d); - CPPUNIT_ASSERT(DecimalValue(12345, 2) != d); - CPPUNIT_ASSERT(DecimalValue(1234, 3) != d); - CPPUNIT_ASSERT(d != s); - } - - - void testFieldTableValueEquals() - { - CPPUNIT_ASSERT_EQUAL(std::string("FOO"), - ft.getValue().getString("foo")); - CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic")); - - FieldTableValue f2; - CPPUNIT_ASSERT(ft != f2); - f2.getValue().setString("foo", "FOO"); - CPPUNIT_ASSERT(ft != f2); - f2.getValue().setInt("magic", 7); - CPPUNIT_ASSERT_EQUAL(ft,f2); - CPPUNIT_ASSERT(ft == f2); - f2.getValue().setString("foo", "BAR"); - CPPUNIT_ASSERT(ft != f2); - CPPUNIT_ASSERT(ft != i); - } - -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest); - diff --git a/qpid/cpp/tests/client_test.cpp b/qpid/cpp/tests/client_test.cpp deleted file mode 100644 index 92952c69b1..0000000000 --- a/qpid/cpp/tests/client_test.cpp +++ /dev/null @@ -1,138 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This file provides a simple test (and example) of basic - * functionality including declaring an exchange and a queue, binding - * these together, publishing a message and receiving that message - * asynchronously. - */ - -#include <iostream> - -#include <QpidError.h> -#include <ClientChannel.h> -#include <Connection.h> -#include <ClientMessage.h> -#include <MessageListener.h> -#include <sys/Monitor.h> -#include <FieldTable.h> - -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - -bool verbose = false; - -/** - * A simple message listener implementation that prints out the - * message content then notifies a montitor allowing the test to - * complete. - */ -class SimpleListener : public virtual MessageListener{ - Monitor* monitor; - -public: - inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} - - inline virtual void received(Message& msg){ - if (verbose) - std::cout << "Received message " << msg.getData() << std::endl; - monitor->notify(); - } -}; - -int main(int argc, char**) -{ - verbose = argc > 1; - try { - //Use a custom exchange - Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE); - //Use a named, temporary queue - Queue queue("MyQueue", true); - - - Connection con(verbose); - string host("localhost"); - con.open(host, 5672, "guest", "guest", "/test"); - if (verbose) - std::cout << "Opened connection." << std::endl; - - //Create and open a channel on the connection through which - //most functionality is exposed - Channel channel; - con.openChannel(channel); - if (verbose) std::cout << "Opened channel." << std::endl; - - //'declare' the exchange and the queue, which will create them - //as they don't exist - channel.declareExchange(exchange); - if (verbose) std::cout << "Declared exchange." << std::endl; - channel.declareQueue(queue); - if (verbose) std::cout << "Declared queue." << std::endl; - - //now bind the queue to the exchange - qpid::framing::FieldTable args; - channel.bind(exchange, queue, "MyTopic", args); - if (verbose) std::cout << "Bound queue to exchange." << std::endl; - - //Set up a message listener to receive any messages that - //arrive in our queue on the broker. We only expect one, and - //as it will be received on another thread, we create a - //montior to use to notify the main thread when that message - //is received. - Monitor monitor; - SimpleListener listener(&monitor); - string tag("MyTag"); - channel.consume(queue, tag, &listener); - if (verbose) std::cout << "Registered consumer." << std::endl; - - //we need to enable the message dispatching for this channel - //and we want that to occur on another thread so we call - //start(). - channel.start(); - - //Now we create and publish a message to our exchange with a - //routing key that will cause it to be routed to our queue - Message msg; - string data("MyMessage"); - msg.setData(data); - channel.publish(msg, exchange, "MyTopic"); - if (verbose) std::cout << "Published message: " << data << std::endl; - - { - Monitor::ScopedLock l(monitor); - //now we wait until we receive notification that the - //message was received - monitor.wait(); - } - - //close the channel & connection - channel.close(); - if (verbose) std::cout << "Closed channel." << std::endl; - con.close(); - if (verbose) std::cout << "Closed connection." << std::endl; - return 0; - } catch(const std::exception& e) { - std::cout << e.what() << std::endl; - } - return 1; -} diff --git a/qpid/cpp/tests/dlclose_noop.c b/qpid/cpp/tests/dlclose_noop.c deleted file mode 100644 index ba2fa75891..0000000000 --- a/qpid/cpp/tests/dlclose_noop.c +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* - * Loaded via LD_PRELOAD this will turn dlclose into a no-op. - * - * Allows valgrind to generate useful reports from programs that - * dynamically unload libraries before exit, such as CppUnit's - * DllPlugInTester. - * - */ - -#include <stdio.h> -void* dlclose(void* handle) {} - diff --git a/qpid/cpp/tests/echo_service.cpp b/qpid/cpp/tests/echo_service.cpp deleted file mode 100644 index ff11a336fe..0000000000 --- a/qpid/cpp/tests/echo_service.cpp +++ /dev/null @@ -1,230 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This class provides an example of using AMQP for a request-response - * style system. 'Requests' are messages sent to a well known - * destination. A 'service' process consumes these message and - * responds by echoing the message back to the sender on a - * sender-specified private queue. - */ - -#include <QpidError.h> -#include <ClientChannel.h> -#include <Connection.h> -#include <ClientExchange.h> -#include <MessageListener.h> -#include <ClientQueue.h> -#include <sys/Time.h> -#include <iostream> -#include <sstream> - -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - - -/** - * A message listener implementation representing the 'service', this - * will 'echo' any requests received. - */ -class EchoServer : public MessageListener{ - Channel* const channel; -public: - EchoServer(Channel* channel); - virtual void received(Message& msg); -}; - -/** - * A message listener implementation that merely prints received - * messages to the console. Used to report on 'echo' responses. - */ -class LoggingListener : public MessageListener{ -public: - virtual void received(Message& msg); -}; - -/** - * A utility class that manages the command line options needed to run - * the example confirgurably. - */ -class Args{ - string host; - int port; - bool trace; - bool help; - bool client; -public: - inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){} - void parse(int argc, char** argv); - void usage(); - - inline const string& getHost() const { return host;} - inline int getPort() const { return port; } - inline bool getTrace() const { return trace; } - inline bool getHelp() const { return help; } - inline bool getClient() const { return client; } -}; - -/** - * The main test path. There are two basic modes: 'client' and - * 'service'. First one or more services are started, then one or more - * clients are started and messages can be sent. - */ -int main(int argc, char** argv){ - const std::string echo_service("echo_service"); - Args args; - args.parse(argc, argv); - if (args.getHelp()) { - args.usage(); - } else if (args.getClient()) { - //we have been started in 'client' mode, i.e. we will send an - //echo requests and print responses received. - try { - //Create connection & open a channel - Connection connection(args.getTrace()); - connection.open(args.getHost(), args.getPort()); - Channel channel; - connection.openChannel(channel); - - //Setup: declare the private 'response' queue and bind it - //to the direct exchange by its name which will be - //generated by the server - Queue response; - channel.declareQueue(response); - qpid::framing::FieldTable emptyArgs; - channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, response, response.getName(), emptyArgs); - - //Consume from the response queue, logging all echoed message to console: - LoggingListener listener; - std::string tag; - channel.consume(response, tag, &listener); - - //Process incoming requests on a new thread - channel.start(); - - //get messages from console and send them: - std::string text; - std::cout << "Enter text to send:" << std::endl; - while (std::getline(std::cin, text)) { - std::cout << "Sending " << text << " to echo server." << std::endl; - Message msg; - msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); - msg.setData(text); - channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); - - std::cout << "Enter text to send:" << std::endl; - } - - connection.close(); - } catch(qpid::QpidError error) { - std::cout << error.what() << std::endl; - } - } else { - // we are in 'service' mode, i.e. we will consume messages - // from the request queue and echo each request back to the - // senders own private response queue. - try { - //Create connection & open a channel - Connection connection(args.getTrace()); - connection.open(args.getHost(), args.getPort()); - Channel channel; - connection.openChannel(channel); - - //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name - Queue request("request"); - channel.declareQueue(request); - qpid::framing::FieldTable emptyArgs; - channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, request, echo_service, emptyArgs); - - //Consume from the request queue, echoing back all messages received to the client that sent them - EchoServer server(&channel); - std::string tag = "server_tag"; - channel.consume(request, tag, &server); - - //Process incoming requests on the main thread - channel.run(); - - connection.close(); - } catch(qpid::QpidError error) { - std::cout << error.what() << std::endl; - } - } -} - -EchoServer::EchoServer(Channel* _channel) : channel(_channel){} - -void EchoServer::received(Message& message) -{ - //get name of response queues binding to the default direct exchange: - const std::string name = message.getHeaders().getString("RESPONSE_QUEUE"); - - if (name.empty()) { - std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl; - } else { - //print message to console: - std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; - - //'echo' the message back: - channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); - } -} - -void LoggingListener::received(Message& message) -{ - //print message to console: - std::cout << "Received echo: " << message.getData() << std::endl; -} - - -void Args::parse(int argc, char** argv){ - for(int i = 1; i < argc; i++){ - string name(argv[i]); - if("-help" == name){ - help = true; - break; - }else if("-host" == name){ - host = argv[++i]; - }else if("-port" == name){ - port = atoi(argv[++i]); - }else if("-trace" == name){ - trace = true; - }else if("-client" == name){ - client = true; - }else{ - std::cout << "Warning: unrecognised option " << name << std::endl; - } - } -} - -void Args::usage(){ - std::cout << "Options:" << std::endl; - std::cout << " -help" << std::endl; - std::cout << " Prints this usage message" << std::endl; - std::cout << " -host <host>" << std::endl; - std::cout << " Specifies host to connect to (default is localhost)" << std::endl; - std::cout << " -port <port>" << std::endl; - std::cout << " Specifies port to conect to (default is 5762)" << std::endl; - std::cout << " -trace" << std::endl; - std::cout << " Indicates that the frames sent and received should be logged" << std::endl; - std::cout << " -client" << std::endl; - std::cout << " Run as a client (else will run as a server)" << std::endl; -} diff --git a/qpid/cpp/tests/examples.Makefile b/qpid/cpp/tests/examples.Makefile deleted file mode 100644 index 45999f7852..0000000000 --- a/qpid/cpp/tests/examples.Makefile +++ /dev/null @@ -1,66 +0,0 @@ -# -# XXX: Edit these locations to suit. -# -BOOST_LOCATION := $(HOME)/local/boost-1.33.1 -APR_LOCATION := $(HOME)/local/apr-1.2.7 - -CXXFLAGS := -DNDEBUG -DUSE_APR -MMD -fpic - -# -# Configure Boost. -# -BOOST_CFLAGS := -I$(BOOST_LOCATION)/include/boost-1_33_1 -CXXFLAGS := $(CXXFLAGS) $(BOOST_CFLAGS) - -# -# Configure APR. -# -APR_CFLAGS := -I$(APR_LOCATION)/include/apr-1 -APR_LDFLAGS := $(shell $(APR_LOCATION)/bin/apr-1-config --libs) -L$(APR_LOCATION)/lib -lapr-1 -CXXFLAGS := $(CXXFLAGS) $(APR_CFLAGS) -LDFLAGS := $(LDFLAGS) $(APR_LDFLAGS) - -# -# Configure Qpid cpp client. -# -QPID_CLIENT_LDFLAGS := ../lib/libcommon.so ../lib/libclient.so -includeDir := ../include -QPID_CLIENT_CFLAGS := \ - -I$(includeDir)/gen \ - -I$(includeDir)/client \ - -I$(includeDir)/broker \ - -I$(includeDir)/common \ - -I$(includeDir)/common/sys \ - -I$(includeDir)/common/framing - -CXXFLAGS := $(CXXFLAGS) $(QPID_CLIENT_CFLAGS) -LDFLAGS := $(LDFLAGS) $(QPID_CLIENT_LDFLAGS) - -CXX := g++ - -# -# Add rule to build examples. -# -.SUFFIX: .cpp -%: %.cpp - $(CXX) $(CXXFLAGS) $(LDFLAGS) $< -o $@ - -# -# Define targets. -# - -EXAMPLES := client_test topic_listener topic_publisher echo_service - -cppFiles := $(wildcard *.cpp) -programs = $(foreach cppFile, $(cppFiles), $(subst .cpp, ,$(cppFile))) - -.PHONY: -all: $(programs) - -debug: - @echo cppFiles = $(cppFiles) - @echo programs = $(programs) - -.PHONY: -clean: - -rm $(EXAMPLES) diff --git a/qpid/cpp/tests/examples.README b/qpid/cpp/tests/examples.README deleted file mode 100644 index 65f908c249..0000000000 --- a/qpid/cpp/tests/examples.README +++ /dev/null @@ -1,18 +0,0 @@ -Building the examples ---------------------- - -You had better edit the Makefile and provide the locations for APR and boost. - -Then just type 'make'. - - -Running the examples --------------------- - -Before running the examples ensure that you have setup your LD_LIBRARY_PATH. - -Most of the examples take the following connection parameters for your -AMQP broker: - - -host host - -port port diff --git a/qpid/cpp/tests/kill_broker b/qpid/cpp/tests/kill_broker deleted file mode 100755 index b71ca22ffd..0000000000 --- a/qpid/cpp/tests/kill_broker +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -PID=qpidd.pid -if [ -f $PID ] ; then kill -9 `cat $PID` ; rm -f $PID ; fi diff --git a/qpid/cpp/tests/python_tests b/qpid/cpp/tests/python_tests deleted file mode 100755 index 4ee177ce6a..0000000000 --- a/qpid/cpp/tests/python_tests +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh -# Run the python tests. -if test -d ../../python ; then - cd ../../python && ./run-tests -v -s "0-9" -I cpp_failing_0-9.txt -else - echo Warning: python tests not found. -fi - diff --git a/qpid/cpp/tests/qpid_test_plugin.h b/qpid/cpp/tests/qpid_test_plugin.h deleted file mode 100644 index b2f4a8ffed..0000000000 --- a/qpid/cpp/tests/qpid_test_plugin.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef _qpid_test_plugin_ -#define _qpid_test_plugin_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * Convenience to include cppunit headers needed by qpid test plugins and - * workaround for warning from superfluous main() declaration - * in cppunit/TestPlugIn.h - */ - -#include <cppunit/TestCase.h> -#include <cppunit/TextTestRunner.h> -#include <cppunit/extensions/HelperMacros.h> -#include <cppunit/plugin/TestPlugIn.h> - -// Redefine CPPUNIT_PLUGIN_IMPLEMENT_MAIN to a dummy typedef to avoid warnings. -// -#if defined(CPPUNIT_HAVE_UNIX_DLL_LOADER) || defined(CPPUNIT_HAVE_UNIX_SHL_LOADER) -#undef CPPUNIT_PLUGIN_IMPLEMENT_MAIN -#define CPPUNIT_PLUGIN_IMPLEMENT_MAIN() typedef char __CppUnitPlugInImplementMainDummyTypeDef -#endif - -#endif /*!_qpid_test_plugin_*/ diff --git a/qpid/cpp/tests/quick_topictest b/qpid/cpp/tests/quick_topictest deleted file mode 100755 index 9df5b5c84c..0000000000 --- a/qpid/cpp/tests/quick_topictest +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh -# Quick and quiet topic test for make check. -./topictest -s2 -m2 -b1 > topictest.log 2>&1 || { - echo See topictest.log. - exit 1 -} -rm topictest.log diff --git a/qpid/cpp/tests/run-python-tests b/qpid/cpp/tests/run-python-tests deleted file mode 100755 index e69de29bb2..0000000000 --- a/qpid/cpp/tests/run-python-tests +++ /dev/null diff --git a/qpid/cpp/tests/run-unit-tests b/qpid/cpp/tests/run-unit-tests deleted file mode 100755 index f066a38205..0000000000 --- a/qpid/cpp/tests/run-unit-tests +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/sh -# -# Library names (without path or .so) and CppUnit test paths can be -# specified on the command line or in env var UNIT_TESTS. For example: -# -# Selected test classes: -# ./run-unit-tests ValueTest ClientChannelTest -# -# Individual test method -# ./run-unit-tests ValueTest :ValueTest::testStringValueEquals -# -# Build and run selected tests: -# make check TESTS=run-unit-tests UNIT_TESTS=ClientChannelTest -# - -# Default VALGRIND from the path and $srcdir to . but -# don't override values set by make. -test -z "$VALGRIND" -a -z "$MAKEFLAGS" && VALGRIND=`which valgrind` 2>/dev/null -test -z "$srcdir" && srcdir=. - -rm -f valgrind.out -vg_log=--log-file-exactly=valgrind.out -source $srcdir/setup -for u in $* $UNIT_TESTS ; do - case $u in - :*) TEST_ARGS="$TEST_ARGS $u" ;; # A test path. - *) TEST_ARGS="$TEST_ARGS $pwd/.libs/$u.so" ;; # A test library. - esac -done -# If none specified, run all tests in .libs -test -z "$TEST_ARGS" && TEST_ARGS="$pwd/.libs/*Test.so" -fail=0 - -$vg DllPlugInTester -c -b $TEST_ARGS || fail=1 -vg_check valgrind.out || fail=1 - -exit $fail diff --git a/qpid/cpp/tests/setup b/qpid/cpp/tests/setup deleted file mode 100644 index aaa3afd9b8..0000000000 --- a/qpid/cpp/tests/setup +++ /dev/null @@ -1,81 +0,0 @@ -# -*- sh -*- - -test "$VERBOSE" = yes && set -x - -pwd=`pwd` -test -z "$abs_srcdir" && abs_srcdir=$pwd - -t0=`echo "$0"|sed 's,.*/,,'`.tmp; tmp=$t0/$$ -pid=0 -test -z "$TEST_DEBUG" && -trap 's=$?;test $pid = 0||kill -2 $pid;cd "$pwd" && rm -rf $t0 && exit $s' 0 -test -z "$TEST_DEBUG" && trap '(exit $?); exit $?' 1 2 13 15 - -framework_failure=0 -mkdir -p $tmp || framework_failure=1 -cd $tmp || framework_failure=1 - -gen_supp=--gen-suppressions=all -# This option makes valgrind significantly slower. -full_leak_check=--leak-check=full -demangle=--demangle=yes - -vg_options=" - --suppressions=$abs_srcdir/.vg-supp - --num-callers=25 - --track-fds=yes - $demangle - $full_leak_check - $gen_supp - $vg_log - " -# configure tests for the existence of valgrind. -# If it's not available, then make $vg and vg_check no-ops. -if test x$VALGRIND = x; then - vg= -else - vg="libtool --mode=execute $VALGRIND `echo $vg_options` --" - # Suppress dlclose or valgrind traces wont have test library symbols. - vg="env LD_PRELOAD=$pwd/.libs/libdlclose_noop.so $vg" -fi - - -vg_leak_check() -{ - local file=$1 - local fail - # If we detect a leak, dump all output to stderr. - grep -E '^==[0-9]+== +definitely lost: [^0]' $file \ - && { fail=1; cat $file 1>&2; - echo "found memory leaks (see log file, $file); see above" 1>&2; } - test "$fail" = '' -} - - -# Ensure 1) that there is an ERROR SUMMARY line, and -# 2) that the number of errors is 0. -# An offending line looks like this: -# ==29302== ERROR SUMMARY: 4 errors from 2 contexts (suppressed: 16 from 5) -vg_error_check() -{ - local file=$1 - local fail - # If we detect a leak, dump all output to stderr. - grep -E '^==[0-9]+== ERROR SUMMARY:' $file > /dev/null \ - || { fail=1; cat $file 1>&2; - echo "no valgrind ERROR SUMMARY line in $file" 1>&2; } - if test "$fail" = ''; then - grep -E '^==[0-9]+== ERROR SUMMARY: [^0] ' $file \ - && { fail=1; cat $file 1>&2; - echo "valgrind reported errors in $file; see above" 1>&2; } - fi - test "$fail" = '' -} - -vg_check() -{ - local file=$1 - if test x$VALGRIND != x; then - vg_error_check $file && vg_leak_check $file - fi -} diff --git a/qpid/cpp/tests/start_broker b/qpid/cpp/tests/start_broker deleted file mode 100755 index fe30458463..0000000000 --- a/qpid/cpp/tests/start_broker +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/sh -set -e - -LOG=`pwd`/qpidd.log -PID=`pwd`/qpidd.pid - -rm -rf $LOG $PID - -# Start the daemon, recording its PID. -../src/qpidd > $LOG 2>&1 & echo $! > $PID - -# FIXME aconway 2007-01-18: qpidd should not return till it is accepting -# connections, remove arbitrary sleep. -sleep 5 diff --git a/qpid/cpp/tests/topic_listener.cpp b/qpid/cpp/tests/topic_listener.cpp deleted file mode 100644 index 5928dac49a..0000000000 --- a/qpid/cpp/tests/topic_listener.cpp +++ /dev/null @@ -1,217 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This file provides one half of a test and example of a pub-sub - * style of interaction. See topic_publisher.cpp for the other half, - * in which the logic for publishing is defined. - * - * This file contains the listener logic. A listener will subscribe to - * a logical 'topic'. It will count the number of messages it receives - * and the time elapsed between the first one and the last one. It - * recognises two types of 'special' message that tell it to (a) send - * a report containing this information, (b) shutdown (i.e. stop - * listening). - */ - -#include <QpidError.h> -#include <ClientChannel.h> -#include <Connection.h> -#include <ClientExchange.h> -#include <MessageListener.h> -#include <ClientQueue.h> -#include <sys/Time.h> -#include <iostream> -#include <sstream> - -using namespace qpid::client; -using namespace qpid::sys; -using namespace std; - -/** - * A message listener implementation in which the runtime logic is - * defined. - */ -class Listener : public MessageListener{ - Channel* const channel; - const string responseQueue; - const bool transactional; - bool init; - int count; - Time start; - - void shutdown(); - void report(); -public: - Listener(Channel* channel, const string& reponseQueue, bool tx); - virtual void received(Message& msg); -}; - -/** - * A utility class for managing the options passed in. - */ -class Args{ - string host; - int port; - AckMode ackMode; - bool transactional; - int prefetch; - bool trace; - bool help; -public: - inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){} - void parse(int argc, char** argv); - void usage(); - - const string& getHost() const { return host;} - int getPort() const { return port; } - AckMode getAckMode(){ return ackMode; } - bool getTransactional() const { return transactional; } - int getPrefetch(){ return prefetch; } - bool getTrace() const { return trace; } - bool getHelp() const { return help; } -}; - -/** - * The main routine creates a Listener instance and sets it up to - * consume from a private queue bound to the exchange with the - * appropriate topic name. - */ -int main(int argc, char** argv){ - Args args; - args.parse(argc, argv); - if(args.getHelp()){ - args.usage(); - }else{ - try{ - cout << "topic_listener: Started." << endl; - Connection connection(args.getTrace()); - connection.open(args.getHost(), args.getPort(), "guest", "guest", "/test"); - Channel channel(args.getTransactional(), args.getPrefetch()); - connection.openChannel(channel); - - //declare exchange, queue and bind them: - Queue response("response"); - channel.declareQueue(response); - - Queue control; - channel.declareQueue(control); - qpid::framing::FieldTable bindArgs; - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs); - //set up listener - Listener listener(&channel, response.getName(), args.getTransactional()); - string tag; - channel.consume(control, tag, &listener, args.getAckMode()); - cout << "topic_listener: Consuming." << endl; - channel.run(); - connection.close(); - cout << "topic_listener: normal exit" << endl; - return 0; - }catch(qpid::QpidError error){ - cout << "topic_listener: " << error.what() << endl; - } - } - return 1; -} - -Listener::Listener(Channel* _channel, const string& _responseq, bool tx) : - channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){} - -void Listener::received(Message& message){ - if(!init){ - start = now(); - count = 0; - init = true; - } - string type(message.getHeaders().getString("TYPE")); - - if(type == "TERMINATION_REQUEST"){ - shutdown(); - }else if(type == "REPORT_REQUEST"){ - //send a report: - report(); - init = false; - }else if (++count % 100 == 0){ - cout <<"Received " << count << " messages." << endl; - } -} - -void Listener::shutdown(){ - channel->close(); -} - -void Listener::report(){ - Time finish = now(); - Time time = finish - start; - stringstream reportstr; - reportstr << "Received " << count << " messages in " - << time/TIME_MSEC << " ms."; - Message msg(reportstr.str()); - msg.getHeaders().setString("TYPE", "REPORT"); - channel->publish(msg, string(), responseQueue); - if(transactional){ - channel->commit(); - } -} - - -void Args::parse(int argc, char** argv){ - for(int i = 1; i < argc; i++){ - string name(argv[i]); - if("-help" == name){ - help = true; - break; - }else if("-host" == name){ - host = argv[++i]; - }else if("-port" == name){ - port = atoi(argv[++i]); - }else if("-ack_mode" == name){ - ackMode = AckMode(atoi(argv[++i])); - }else if("-transactional" == name){ - transactional = true; - }else if("-prefetch" == name){ - prefetch = atoi(argv[++i]); - }else if("-trace" == name){ - trace = true; - }else{ - cout << "Warning: unrecognised option " << name << endl; - } - } -} - -void Args::usage(){ - cout << "Options:" << endl; - cout << " -help" << endl; - cout << " Prints this usage message" << endl; - cout << " -host <host>" << endl; - cout << " Specifies host to connect to (default is localhost)" << endl; - cout << " -port <port>" << endl; - cout << " Specifies port to conect to (default is 5762)" << endl; - cout << " -ack_mode <mode>" << endl; - cout << " Sets the acknowledgement mode" << endl; - cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << endl; - cout << " -transactional" << endl; - cout << " Indicates the client should use transactions" << endl; - cout << " -prefetch <count>" << endl; - cout << " Specifies the prefetch count (default is 1000)" << endl; - cout << " -trace" << endl; - cout << " Indicates that the frames sent and received should be logged" << endl; -} diff --git a/qpid/cpp/tests/topic_publisher.cpp b/qpid/cpp/tests/topic_publisher.cpp deleted file mode 100644 index 2fd1e6b810..0000000000 --- a/qpid/cpp/tests/topic_publisher.cpp +++ /dev/null @@ -1,287 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This file provides one half of a test and example of a pub-sub - * style of interaction. See topic_listener.cpp for the other half, in - * which the logic for subscribers is defined. - * - * This file contains the publisher logic. The publisher will send a - * number of messages to the exchange with the appropriate routing key - * for the logical 'topic'. Once it has done this it will then send a - * request that each subscriber report back with the number of message - * it has received and the time that elapsed between receiving the - * first one and receiving the report request. Once the expected - * number of reports are received, it sends out a request that each - * subscriber shutdown. - */ - -#include <QpidError.h> -#include <ClientChannel.h> -#include <Connection.h> -#include <ClientExchange.h> -#include <MessageListener.h> -#include <ClientQueue.h> -#include <sys/Monitor.h> -#include "unistd.h" -#include <sys/Time.h> -#include <cstdlib> -#include <iostream> - -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - -/** - * The publishing logic is defined in this class. It implements - * message listener and can therfore be used to receive messages sent - * back by the subscribers. - */ -class Publisher : public MessageListener{ - Channel* const channel; - const std::string controlTopic; - const bool transactional; - Monitor monitor; - int count; - - void waitForCompletion(int msgs); - string generateData(int size); - -public: - Publisher(Channel* channel, const std::string& controlTopic, bool tx); - virtual void received(Message& msg); - int64_t publish(int msgs, int listeners, int size); - void terminate(); -}; - -/** - * A utility class for managing the options passed in to the test - */ -class Args{ - string host; - int port; - int messages; - int subscribers; - AckMode ackMode; - bool transactional; - int prefetch; - int batches; - int delay; - int size; - bool trace; - bool help; -public: - inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1), - ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1), - delay(0), size(256), trace(false), help(false){} - - void parse(int argc, char** argv); - void usage(); - - const string& getHost() const { return host;} - int getPort() const { return port; } - int getMessages() const { return messages; } - int getSubscribers() const { return subscribers; } - AckMode getAckMode(){ return ackMode; } - bool getTransactional() const { return transactional; } - int getPrefetch(){ return prefetch; } - int getBatches(){ return batches; } - int getDelay(){ return delay; } - int getSize(){ return size; } - bool getTrace() const { return trace; } - bool getHelp() const { return help; } -}; - -int main(int argc, char** argv) { - Args args; - args.parse(argc, argv); - if(args.getHelp()){ - args.usage(); - } else { - try{ - Connection connection(args.getTrace()); - connection.open(args.getHost(), args.getPort(), "guest", "guest", "/test"); - Channel channel(args.getTransactional(), args.getPrefetch()); - connection.openChannel(channel); - - //declare queue (relying on default binding): - Queue response("response"); - channel.declareQueue(response); - - //set up listener - Publisher publisher(&channel, "topic_control", args.getTransactional()); - std::string tag("mytag"); - channel.consume(response, tag, &publisher, args.getAckMode()); - channel.start(); - - int batchSize(args.getBatches()); - int64_t max(0); - int64_t min(0); - int64_t sum(0); - for(int i = 0; i < batchSize; i++){ - if(i > 0 && args.getDelay()) sleep(args.getDelay()); - int64_t msecs = - publisher.publish(args.getMessages(), - args.getSubscribers(), - args.getSize()) / TIME_MSEC; - if(!max || msecs > max) max = msecs; - if(!min || msecs < min) min = msecs; - sum += msecs; - std::cout << "Completed " << (i+1) << " of " << batchSize - << " in " << msecs << "ms" << std::endl; - } - publisher.terminate(); - int64_t avg = sum / batchSize; - if(batchSize > 1){ - std::cout << batchSize << " batches completed. avg=" << avg << - ", max=" << max << ", min=" << min << std::endl; - } - channel.close(); - connection.close(); - return 0; - }catch(qpid::QpidError error) { - std::cout << error.what() << std::endl; - } - } - return 1; -} - -Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) : - channel(_channel), controlTopic(_controlTopic), transactional(tx){} - -void Publisher::received(Message& ){ - //count responses and when all are received end the current batch - Monitor::ScopedLock l(monitor); - if(--count == 0){ - monitor.notify(); - } -} - -void Publisher::waitForCompletion(int msgs){ - count = msgs; - monitor.wait(); -} - -int64_t Publisher::publish(int msgs, int listeners, int size){ - Message msg; - msg.setData(generateData(size)); - Time start = now(); - { - Monitor::ScopedLock l(monitor); - for(int i = 0; i < msgs; i++){ - channel->publish( - msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); - } - //send report request - Message reportRequest; - reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); - if(transactional){ - channel->commit(); - } - - waitForCompletion(listeners); - } - - Time finish = now(); - return finish - start; -} - -string Publisher::generateData(int size){ - string data; - for(int i = 0; i < size; i++){ - data += ('A' + (i / 26)); - } - return data; -} - -void Publisher::terminate(){ - //send termination request - Message terminationRequest; - terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); - if(transactional){ - channel->commit(); - } -} - -void Args::parse(int argc, char** argv){ - for(int i = 1; i < argc; i++){ - string name(argv[i]); - if("-help" == name){ - help = true; - break; - }else if("-host" == name){ - host = argv[++i]; - }else if("-port" == name){ - port = atoi(argv[++i]); - }else if("-messages" == name){ - messages = atoi(argv[++i]); - }else if("-subscribers" == name){ - subscribers = atoi(argv[++i]); - }else if("-ack_mode" == name){ - ackMode = AckMode(atoi(argv[++i])); - }else if("-transactional" == name){ - transactional = true; - }else if("-prefetch" == name){ - prefetch = atoi(argv[++i]); - }else if("-batches" == name){ - batches = atoi(argv[++i]); - }else if("-delay" == name){ - delay = atoi(argv[++i]); - }else if("-size" == name){ - size = atoi(argv[++i]); - }else if("-trace" == name){ - trace = true; - }else{ - std::cout << "Warning: unrecognised option " << name << std::endl; - } - } -} - -void Args::usage(){ - std::cout << "Options:" << std::endl; - std::cout << " -help" << std::endl; - std::cout << " Prints this usage message" << std::endl; - std::cout << " -host <host>" << std::endl; - std::cout << " Specifies host to connect to (default is localhost)" << std::endl; - std::cout << " -port <port>" << std::endl; - std::cout << " Specifies port to conect to (default is 5762)" << std::endl; - std::cout << " -messages <count>" << std::endl; - std::cout << " Specifies how many messages to send" << std::endl; - std::cout << " -subscribers <count>" << std::endl; - std::cout << " Specifies how many subscribers to expect reports from" << std::endl; - std::cout << " -ack_mode <mode>" << std::endl; - std::cout << " Sets the acknowledgement mode" << std::endl; - std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; - std::cout << " -transactional" << std::endl; - std::cout << " Indicates the client should use transactions" << std::endl; - std::cout << " -prefetch <count>" << std::endl; - std::cout << " Specifies the prefetch count (default is 1000)" << std::endl; - std::cout << " -batches <count>" << std::endl; - std::cout << " Specifies how many batches to run" << std::endl; - std::cout << " -delay <seconds>" << std::endl; - std::cout << " Causes a delay between each batch" << std::endl; - std::cout << " -size <bytes>" << std::endl; - std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl; - std::cout << " -trace" << std::endl; - std::cout << " Indicates that the frames sent and received should be logged" << std::endl; -} diff --git a/qpid/cpp/tests/topictest b/qpid/cpp/tests/topictest deleted file mode 100755 index 92e40b2c37..0000000000 --- a/qpid/cpp/tests/topictest +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -# Run the C++ topic test - -# Clean up old log files -rm -f subscriber_*.log - -# Defaults values -SUBSCRIBERS=10 -MESSAGES=2000 -BATCHES=10 - -while getopts "s:m:b:" opt ; do - case $opt in - s) SUBSCRIBERS=$OPTARG ;; - m) MESSAGES=$OPTARG ;; - b) BATCHES=$OPTARG ;; - ?) - echo "Usage: %0 [-s <subscribers>] [-m <messages.] [-b <batches>]" - exit 1 - ;; - esac -done - -subscribe() { - echo Start subscriber $1 - LOG="subscriber_$1.log" - ./topic_listener > $LOG 2>&1 && rm -f $LOG -} - -publish() { - ./topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $SUBSCRIBERS -} - -for ((i=$SUBSCRIBERS ; i--; )); do - subscribe $i & -done -# FIXME aconway 2007-03-27: Hack around startup race. Fix topic test. -sleep 1 -publish 2>&1 || exit 1 |
