From c70bf3ea28cdf6bafd8571690d3e5c466a0658a2 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 20 Sep 2013 18:59:30 +0000 Subject: QPID-4984: WIP - Merge from trunk r.1525056 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525101 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/test_store.cpp | 121 ++++++++++++++++++++++++++++++++++---- 1 file changed, 109 insertions(+), 12 deletions(-) (limited to 'qpid/cpp/src/tests/test_store.cpp') diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index eac4deda2d..e299161c68 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -40,14 +40,19 @@ #include "qpid/sys/Thread.h" #include "qpid/Plugin.h" #include "qpid/Options.h" +#include "qpid/RefCounted.h" +#include "qpid/Msg.h" #include #include #include +#include #include +#include -using namespace qpid; -using namespace broker; using namespace std; +using namespace boost; +using namespace qpid; +using namespace qpid::broker; using namespace qpid::sys; namespace qpid { @@ -57,11 +62,19 @@ struct TestStoreOptions : public Options { string name; string dump; + string events; + vector throwMsg; // Throw exception if message content matches. TestStoreOptions() : Options("Test Store Options") { addOptions() - ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") - ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ("test-store-name", optValue(name, "NAME"), + "Name of test store instance.") + ("test-store-dump", optValue(dump, "FILE"), + "File to dump enqueued messages.") + ("test-store-events", optValue(events, "FILE"), + "File to log events, 1 line per event.") + ("test-store-throw", optValue(throwMsg, "CONTENT"), + "Throw exception if message content matches.") ; } }; @@ -82,24 +95,76 @@ class TestStore : public NullMessageStore { TestStore(const TestStoreOptions& opts, Broker& broker_) : options(opts), name(opts.name), broker(broker_) { - QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); - if (!options.dump.empty()) + QPID_LOG(info, "TestStore name=" << name + << " dump=" << options.dump + << " events=" << options.events + << " throw messages =" << options.throwMsg.size()); + + if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); + if (!options.events.empty()) + events.reset(new ofstream(options.events.c_str())); } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } - virtual bool isNull() const { return false; } - - void enqueue(TransactionContext* , + // Dummy transaction context. + struct TxContext : public TPCTransactionContext { + static int nextId; + string id; + TxContext() : id(lexical_cast(nextId++)) {} + TxContext(string xid) : id(xid) {} + }; + + static string getId(const TransactionContext& tx) { + const TxContext* tc = dynamic_cast(&tx); + assert(tc); + return tc->id; + } + + + bool isNull() const { return false; } + + void log(const string& msg) { + QPID_LOG(info, "test_store: " << msg); + if (events.get()) *events << msg << endl << std::flush; + } + + auto_ptr begin() { + auto_ptr tx(new TxContext()); + log(Msg() << "id << ">"); + return auto_ptr(tx); + } + + auto_ptr begin(const std::string& xid) { + auto_ptr tx(new TxContext(xid)); + log(Msg() << "id << ">"); + return auto_ptr(tx); + } + + string getContent(const intrusive_ptr& msg) { + intrusive_ptr enc( + dynamic_pointer_cast(msg)); + return enc->getContent(); + } + + void enqueue(TransactionContext* tx, const boost::intrusive_ptr& pmsg, - const PersistableQueue& ) + const PersistableQueue& queue) { - qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast(pmsg.get()); + QPID_LOG(debug, "TestStore enqueue " << queue.getName()); + qpid::broker::amqp_0_10::MessageTransfer* msg = + dynamic_cast(pmsg.get()); assert(msg); + ostringstream o; + o << ""; + log(o.str()); + // Dump the message if there is a dump file. if (dump.get()) { msg->getFrames().getMethod()->print(*dump); @@ -113,7 +178,11 @@ class TestStore : public NullMessageStore { string data = msg->getFrames().getContent(); size_t i = string::npos; size_t j = string::npos; - if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 + const vector& throwMsg(options.throwMsg); + if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) { + throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); + } + else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 && (i = data.find(name+"[")) != string::npos && (j = data.find("]", i)) != string::npos) { @@ -144,6 +213,31 @@ class TestStore : public NullMessageStore { msg->enqueueComplete(); } + void dequeue(TransactionContext* tx, + const boost::intrusive_ptr& msg, + const PersistableQueue& queue) + { + QPID_LOG(debug, "TestStore dequeue " << queue.getName()); + ostringstream o; + o<< ""; + log(o.str()); + } + + void prepare(TPCTransactionContext& txn) { + log(Msg() << ""); + } + + void commit(TransactionContext& txn) { + log(Msg() << ""); + } + + void abort(TransactionContext& txn) { + log(Msg() << ""); + } + + private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; TestStoreOptions options; @@ -151,8 +245,11 @@ class TestStore : public NullMessageStore { Broker& broker; vector threads; std::auto_ptr dump; + std::auto_ptr events; }; +int TestStore::TxContext::nextId(1); + const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; const string TestStore::EXCEPTION = "exception"; const string TestStore::EXIT_PROCESS = "exit_process"; -- cgit v1.2.1