diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 18:59:30 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-09-20 18:59:30 +0000 |
| commit | c70bf3ea28cdf6bafd8571690d3e5c466a0658a2 (patch) | |
| tree | 68b24940e433f3f9c278b054d9ea1622389bd332 /qpid/cpp/src/tests/test_store.cpp | |
| parent | fcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (diff) | |
| download | qpid-python-c70bf3ea28cdf6bafd8571690d3e5c466a0658a2.tar.gz | |
QPID-4984: WIP - Merge from trunk r.1525056
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/test_store.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/test_store.cpp | 121 |
1 files changed, 109 insertions, 12 deletions
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index eac4deda2d..e299161c68 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -40,14 +40,19 @@ #include "qpid/sys/Thread.h" #include "qpid/Plugin.h" #include "qpid/Options.h" +#include "qpid/RefCounted.h" +#include "qpid/Msg.h" #include <boost/cast.hpp> #include <boost/lexical_cast.hpp> #include <memory> +#include <ostream> #include <fstream> +#include <sstream> -using namespace qpid; -using namespace broker; using namespace std; +using namespace boost; +using namespace qpid; +using namespace qpid::broker; using namespace qpid::sys; namespace qpid { @@ -57,11 +62,19 @@ struct TestStoreOptions : public Options { string name; string dump; + string events; + vector<string> throwMsg; // Throw exception if message content matches. TestStoreOptions() : Options("Test Store Options") { addOptions() - ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") - ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ("test-store-name", optValue(name, "NAME"), + "Name of test store instance.") + ("test-store-dump", optValue(dump, "FILE"), + "File to dump enqueued messages.") + ("test-store-events", optValue(events, "FILE"), + "File to log events, 1 line per event.") + ("test-store-throw", optValue(throwMsg, "CONTENT"), + "Throw exception if message content matches.") ; } }; @@ -82,24 +95,76 @@ class TestStore : public NullMessageStore { TestStore(const TestStoreOptions& opts, Broker& broker_) : options(opts), name(opts.name), broker(broker_) { - QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); - if (!options.dump.empty()) + QPID_LOG(info, "TestStore name=" << name + << " dump=" << options.dump + << " events=" << options.events + << " throw messages =" << options.throwMsg.size()); + + if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); + if (!options.events.empty()) + events.reset(new ofstream(options.events.c_str())); } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } - virtual bool isNull() const { return false; } - - void enqueue(TransactionContext* , + // Dummy transaction context. + struct TxContext : public TPCTransactionContext { + static int nextId; + string id; + TxContext() : id(lexical_cast<string>(nextId++)) {} + TxContext(string xid) : id(xid) {} + }; + + static string getId(const TransactionContext& tx) { + const TxContext* tc = dynamic_cast<const TxContext*>(&tx); + assert(tc); + return tc->id; + } + + + bool isNull() const { return false; } + + void log(const string& msg) { + QPID_LOG(info, "test_store: " << msg); + if (events.get()) *events << msg << endl << std::flush; + } + + auto_ptr<TransactionContext> begin() { + auto_ptr<TxContext> tx(new TxContext()); + log(Msg() << "<begin tx " << tx->id << ">"); + return auto_ptr<TransactionContext>(tx); + } + + auto_ptr<TPCTransactionContext> begin(const std::string& xid) { + auto_ptr<TxContext> tx(new TxContext(xid)); + log(Msg() << "<begin tx " << tx->id << ">"); + return auto_ptr<TPCTransactionContext>(tx); + } + + string getContent(const intrusive_ptr<PersistableMessage>& msg) { + intrusive_ptr<broker::Message::Encoding> enc( + dynamic_pointer_cast<broker::Message::Encoding>(msg)); + return enc->getContent(); + } + + void enqueue(TransactionContext* tx, const boost::intrusive_ptr<PersistableMessage>& pmsg, - const PersistableQueue& ) + const PersistableQueue& queue) { - qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); + QPID_LOG(debug, "TestStore enqueue " << queue.getName()); + qpid::broker::amqp_0_10::MessageTransfer* msg = + dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); assert(msg); + ostringstream o; + o << "<enqueue " << queue.getName() << " " << getContent(msg); + if (tx) o << " tx=" << getId(*tx); + o << ">"; + log(o.str()); + // Dump the message if there is a dump file. if (dump.get()) { msg->getFrames().getMethod()->print(*dump); @@ -113,7 +178,11 @@ class TestStore : public NullMessageStore { string data = msg->getFrames().getContent(); size_t i = string::npos; size_t j = string::npos; - if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 + const vector<string>& throwMsg(options.throwMsg); + if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) { + throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); + } + else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0 && (i = data.find(name+"[")) != string::npos && (j = data.find("]", i)) != string::npos) { @@ -144,6 +213,31 @@ class TestStore : public NullMessageStore { msg->enqueueComplete(); } + void dequeue(TransactionContext* tx, + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) + { + QPID_LOG(debug, "TestStore dequeue " << queue.getName()); + ostringstream o; + o<< "<dequeue " << queue.getName() << " " << getContent(msg); + if (tx) o << " tx=" << getId(*tx); + o << ">"; + log(o.str()); + } + + void prepare(TPCTransactionContext& txn) { + log(Msg() << "<prepare tx=" << getId(txn) << ">"); + } + + void commit(TransactionContext& txn) { + log(Msg() << "<commit tx=" << getId(txn) << ">"); + } + + void abort(TransactionContext& txn) { + log(Msg() << "<abort tx=" << getId(txn) << ">"); + } + + private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; TestStoreOptions options; @@ -151,8 +245,11 @@ class TestStore : public NullMessageStore { Broker& broker; vector<Thread> threads; std::auto_ptr<ofstream> dump; + std::auto_ptr<ofstream> events; }; +int TestStore::TxContext::nextId(1); + const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; const string TestStore::EXCEPTION = "exception"; const string TestStore::EXIT_PROCESS = "exit_process"; |
