summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
diff options
context:
space:
mode:
authorCharles E. Rolke <chug@apache.org>2013-01-07 21:24:48 +0000
committerCharles E. Rolke <chug@apache.org>2013-01-07 21:24:48 +0000
commitb1ea5d9b887f7360cdc18ed8f87ccbf4e1299f29 (patch)
treea6fb2f3a38d0af42520b73b01ada8c78653f05b1 /qpid/cpp/src/tests/legacystore/SimpleTest.cpp
parent985928c7cb19799e3e1da7e0dcd18823a18b698c (diff)
downloadqpid-python-b1ea5d9b887f7360cdc18ed8f87ccbf4e1299f29.tar.gz
QPID-1726 ASF licensed Qpid Store
Add unit tests. There are several issues with these tests: 1. Originally the four .cpp unit test sources were compiled into a single unit_test executable. In the current framework those four sources create conflicting brokers that overwrite each other's store and fail to open port 5672. In this checkin there are four unit test executables. Running each serially gets them all to pass. A new strategy is needed to start brokers that don't conflict. 2. The legacystore.so is not integrated with the rest of the tests. Some tests may run with the externally compiled msgstore.so and some use the built in test_store. Plugging legacystore.so into these other tests is TBD. 3. cpp/src/tests/legacystore defines more tests beyond simple unit tests. None of the issues related to wider system tests are addressed yet. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1430018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/legacystore/SimpleTest.cpp')
-rw-r--r--qpid/cpp/src/tests/legacystore/SimpleTest.cpp497
1 files changed, 497 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/legacystore/SimpleTest.cpp b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
new file mode 100644
index 0000000000..a49333d876
--- /dev/null
+++ b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
@@ -0,0 +1,497 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "tests/legacystore/MessageUtils.h"
+#include "qpid/legacystore/StoreException.h"
+#include "qpid/broker/DirectExchange.h"
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueSettings.h>
+#include <qpid/broker/RecoveryManagerImpl.h>
+#include <qpid/framing/AMQHeaderBody.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+#define SET_LOG_LEVEL(level) \
+ qpid::log::Options opts(""); \
+ opts.selectors.clear(); \
+ opts.selectors.push_back(level); \
+ qpid::log::Logger::instance().configure(opts);
+
+
+using boost::intrusive_ptr;
+using boost::static_pointer_cast;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace mrg::msgstore;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(SimpleTest)
+
+const string test_filename("SimpleTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest");
+
+// === Helper fns ===
+
+struct DummyHandler : OutputHandler
+{
+ std::vector<AMQFrame> frames;
+
+ virtual void send(AMQFrame& frame){
+ frames.push_back(frame);
+ }
+};
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links)
+{
+ sys::Timer t;
+ DtxManager mgr(t);
+ mgr.setStore (&store);
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry());
+ store.recover(recovery);
+}
+
+void recover(MessageStoreImpl& store, ExchangeRegistry& exchanges)
+{
+ QueueRegistry queues;
+ LinkRegistry links;
+ recover(store, queues, exchanges, links);
+}
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues)
+{
+ ExchangeRegistry exchanges;
+ LinkRegistry links;
+ recover(store, queues, exchanges, links);
+}
+
+void bindAndUnbind(const string& exchangeName, const string& queueName,
+ const string& key, const FieldTable& args)
+{
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(exchange->bind(queue, key, &args));
+ store.bind(*exchange, *queue, key, args);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ // check exchange args are still set
+ for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+ BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+ }
+ //check it is bound by unbinding
+ BOOST_REQUIRE(exchange->unbind(queue, key, &args));
+ store.unbind(*exchange, *queue, key, args);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ // check exchange args are still set
+ for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+ BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+ }
+ //make sure it is no longer bound
+ BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
+ }
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CreateDelete)
+{
+ SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+ cout << test_filename << ".CreateDelete: " << flush;
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ string name("CreateDeleteQueue");
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+// TODO - check dir exists
+ BOOST_REQUIRE(queue.getPersistenceId());
+ store.destroy(queue);
+// TODO - check dir is deleted
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(EmptyRecover)
+{
+ cout << test_filename << ".EmptyRecover: " << flush;
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ //nothing to assert, just testing it doesn't blow up
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreate)
+{
+ cout << test_filename << ".QueueCreate: " << flush;
+
+ uint64_t id(0);
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(queue.getPersistenceId());
+ id = queue.getPersistenceId();
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue.get());
+ BOOST_CHECK_EQUAL(id, queue->getPersistenceId());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
+{
+ cout << test_filename << ".QueueCreateWithSettings: " << flush;
+
+ FieldTable arguments;
+ arguments.setInt("qpid.max_count", 202);
+ arguments.setInt("qpid.max_size", 1003);
+ QueueSettings settings;
+ settings.populate(arguments, settings.storeSettings);
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, settings, &store, 0);
+ queue.create();
+ BOOST_REQUIRE(queue.getPersistenceId());
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount());
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueDestroy)
+{
+ cout << test_filename << ".QueueDestroy: " << flush;
+
+ string name("MyDurableQueue");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue queue(name, 0, &store, 0);
+ store.create(queue, qpid::framing::FieldTable());
+ store.destroy(queue);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ BOOST_REQUIRE(!registry.find(name));
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Enqueue)
+{
+ cout << test_filename << ".Enqueue: " << flush;
+
+ //TODO: this is largely copy & paste'd from MessageTest in
+ //qpid tree. ideally need some helper routines for reducing
+ //this to a simpler less duplicated form
+
+ string name("MyDurableQueue");
+ string exchange("MyExchange");
+ string routingKey("MyRoutingKey");
+ Uuid messageId(true);
+ string data1("abcdefg");
+ string data2("hijklmn");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+ queue->create();
+
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 14);
+ MessageUtils::addContent(msg, data1);
+ MessageUtils::addContent(msg, data2);
+
+ msg.addAnnotation("abc", "xyz");
+
+ queue->deliver(msg);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ Message msg = MessageUtils::get(*queue);
+
+ BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+ BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg));
+ BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc"));
+ BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize());
+
+ DummyHandler handler;
+ MessageUtils::deliver(msg, handler, 100);
+ BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size());
+ AMQContentBody* contentBody(dynamic_cast<AMQContentBody*>(handler.frames[1].getBody()));
+ BOOST_REQUIRE(contentBody);
+ BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size());
+ BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Dequeue)
+{
+ cout << test_filename << ".Dequeue: " << flush;
+
+ //TODO: reduce the duplication in these tests
+ string name("MyDurableQueue");
+ {
+ string exchange("MyExchange");
+ string routingKey("MyRoutingKey");
+ Uuid messageId(true);
+ string data("abcdefg");
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+ queue->create();
+
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 7);
+ MessageUtils::addContent(msg, data);
+
+ queue->deliver(msg);
+
+ QueueCursor cursor;
+ MessageUtils::get(*queue, &cursor);
+ queue->dequeue(0, cursor);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ QueueRegistry registry;
+ registry.setStore (&store);
+ recover(store, registry);
+ Queue::shared_ptr queue = registry.find(name);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
+{
+ cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;
+
+ uint64_t id(0);
+ string name("MyDurableExchange");
+ string type("direct");
+ FieldTable args;
+ args.setString("a", "A");
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ ExchangeRegistry registry;
+ Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
+ store.create(*exchange, qpid::framing::FieldTable());
+ id = exchange->getPersistenceId();
+ BOOST_REQUIRE(id);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry registry;
+
+ recover(store, registry);
+
+ Exchange::shared_ptr exchange = registry.get(name);
+ BOOST_CHECK_EQUAL(id, exchange->getPersistenceId());
+ BOOST_CHECK_EQUAL(type, exchange->getType());
+ BOOST_REQUIRE(exchange->isDurable());
+ BOOST_CHECK_EQUAL(*args.get("a"), *exchange->getArgs().get("a"));
+ store.destroy(*exchange);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry registry;
+
+ recover(store, registry);
+
+ try {
+ Exchange::shared_ptr exchange = registry.get(name);
+ BOOST_FAIL("Expected exchange not to be found");
+ } catch (const SessionException& e) {
+ BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+ }
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind)
+{
+ cout << test_filename << ".ExchangeBindAndUnbind: " << flush;
+
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable());
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs)
+{
+ cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush;
+
+ FieldTable args;
+ args.setString("a", "A");
+ args.setString("b", "B");
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args);
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
+{
+ cout << test_filename << ".ExchangeImplicitUnbind: " << flush;
+
+ string exchangeName("MyDurableExchange");
+ string queueName1("MyDurableQueue1");
+ string queueName2("MyDurableQueue2");
+ string key("my-routing-key");
+ FieldTable args;
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1, true); // truncate store
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
+ Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue1, qpid::framing::FieldTable());
+ store.create(*queue2, qpid::framing::FieldTable());
+ store.bind(*exchange, *queue1, key, args);
+ store.bind(*exchange, *queue2, key, args);
+ //delete queue1:
+ store.destroy(*queue1);
+ }//db will be closed
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ //ensure recovery works ok:
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ BOOST_REQUIRE(!queues.find(queueName1).get());
+ BOOST_REQUIRE(queues.find(queueName2).get());
+
+ //delete exchange:
+ store.destroy(*exchange);
+ }
+ {
+ MessageStoreImpl store(&br);
+ store.init(test_dir, 4, 1);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links;
+
+ //ensure recovery works ok:
+ recover(store, queues, exchanges, links);
+
+ try {
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ BOOST_FAIL("Expected exchange not to be found");
+ } catch (const SessionException& e) {
+ BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+ }
+ Queue::shared_ptr queue = queues.find(queueName2);
+ store.destroy(*queue);
+ }
+
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()