From 014f0f39d9cfb6242bea173eadbc0f8229ba5f7f Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 1 Aug 2013 20:27:26 +0000 Subject: QPID-4327: HA TX transactions: basic replication. On primary a PrimaryTxObserver observes a transaction's TxBuffer and generates transaction events on a tx-replication-queue. On the backup a TxReplicator receives the events and constructs a TxBuffer equivalent to the one in the primary. Unfinished: - Primary does not wait for backups to prepare() before committing. - All connected backups are assumed to be in the transaction, there are race conditions around brokers joining/leavinv where this assumption is invalid. - Need more tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1509423 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/test_store.cpp | 101 +++++++++++++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 8 deletions(-) (limited to 'qpid/cpp/src/tests/test_store.cpp') diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index eac4deda2d..fc44889f33 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -40,14 +40,19 @@ #include "qpid/sys/Thread.h" #include "qpid/Plugin.h" #include "qpid/Options.h" +#include "qpid/RefCounted.h" +#include "qpid/Msg.h" #include #include #include +#include #include +#include -using namespace qpid; -using namespace broker; using namespace std; +using namespace boost; +using namespace qpid; +using namespace qpid::broker; using namespace qpid::sys; namespace qpid { @@ -57,11 +62,13 @@ struct TestStoreOptions : public Options { string name; string dump; + string events; TestStoreOptions() : Options("Test Store Options") { addOptions() ("test-store-name", optValue(name, "NAME"), "Name of test store instance.") ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.") + ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.") ; } }; @@ -82,24 +89,74 @@ class TestStore : public NullMessageStore { TestStore(const TestStoreOptions& opts, Broker& broker_) : options(opts), name(opts.name), broker(broker_) { - QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump); - if (!options.dump.empty()) + QPID_LOG(info, "TestStore name=" << name + << " dump=" << options.dump + << " events=" << options.events); + + if (!options.dump.empty()) dump.reset(new ofstream(options.dump.c_str())); + if (!options.events.empty()) + events.reset(new ofstream(options.events.c_str())); } ~TestStore() { for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); } - virtual bool isNull() const { return false; } - - void enqueue(TransactionContext* , + // Dummy transaction context. + struct TxContext : public TPCTransactionContext { + static int nextId; + string id; + TxContext() : id(lexical_cast(nextId++)) {} + TxContext(string xid) : id(xid) {} + }; + + static string getId(const TransactionContext& tx) { + const TxContext* tc = dynamic_cast(&tx); + assert(tc); + return tc->id; + } + + + bool isNull() const { return false; } + + void log(const string& msg) { + QPID_LOG(info, "test_store: " << msg); + if (events.get()) *events << msg << endl << std::flush; + } + + auto_ptr begin() { + auto_ptr tx(new TxContext()); + log(Msg() << "id << ">"); + return auto_ptr(tx); + } + + auto_ptr begin(const std::string& xid) { + auto_ptr tx(new TxContext(xid)); + log(Msg() << "id << ">"); + return auto_ptr(tx); + } + + string getContent(const intrusive_ptr& msg) { + intrusive_ptr enc( + dynamic_pointer_cast(msg)); + return enc->getContent(); + } + + void enqueue(TransactionContext* tx, const boost::intrusive_ptr& pmsg, - const PersistableQueue& ) + const PersistableQueue& queue) { + QPID_LOG(debug, "TestStore enqueue " << queue.getName()); qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast(pmsg.get()); assert(msg); + ostringstream o; + o << ""; + log(o.str()); + // Dump the message if there is a dump file. if (dump.get()) { msg->getFrames().getMethod()->print(*dump); @@ -144,6 +201,31 @@ class TestStore : public NullMessageStore { msg->enqueueComplete(); } + void dequeue(TransactionContext* tx, + const boost::intrusive_ptr& msg, + const PersistableQueue& queue) + { + QPID_LOG(debug, "TestStore dequeue " << queue.getName()); + ostringstream o; + o<< ""; + log(o.str()); + } + + void prepare(TPCTransactionContext& txn) { + log(Msg() << ""); + } + + void commit(TransactionContext& txn) { + log(Msg() << ""); + } + + void abort(TransactionContext& txn) { + log(Msg() << ""); + } + + private: static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; TestStoreOptions options; @@ -151,8 +233,11 @@ class TestStore : public NullMessageStore { Broker& broker; vector threads; std::auto_ptr dump; + std::auto_ptr events; }; +int TestStore::TxContext::nextId(1); + const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; const string TestStore::EXCEPTION = "exception"; const string TestStore::EXIT_PROCESS = "exit_process"; -- cgit v1.2.1