summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/test_store.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-09-20 18:59:30 +0000
committerKim van der Riet <kpvdr@apache.org>2013-09-20 18:59:30 +0000
commitc70bf3ea28cdf6bafd8571690d3e5c466a0658a2 (patch)
tree68b24940e433f3f9c278b054d9ea1622389bd332 /qpid/cpp/src/tests/test_store.cpp
parentfcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (diff)
downloadqpid-python-c70bf3ea28cdf6bafd8571690d3e5c466a0658a2.tar.gz
QPID-4984: WIP - Merge from trunk r.1525056
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/test_store.cpp')
-rw-r--r--qpid/cpp/src/tests/test_store.cpp121
1 files changed, 109 insertions, 12 deletions
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp
index eac4deda2d..e299161c68 100644
--- a/qpid/cpp/src/tests/test_store.cpp
+++ b/qpid/cpp/src/tests/test_store.cpp
@@ -40,14 +40,19 @@
#include "qpid/sys/Thread.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
+#include "qpid/RefCounted.h"
+#include "qpid/Msg.h"
#include <boost/cast.hpp>
#include <boost/lexical_cast.hpp>
#include <memory>
+#include <ostream>
#include <fstream>
+#include <sstream>
-using namespace qpid;
-using namespace broker;
using namespace std;
+using namespace boost;
+using namespace qpid;
+using namespace qpid::broker;
using namespace qpid::sys;
namespace qpid {
@@ -57,11 +62,19 @@ struct TestStoreOptions : public Options {
string name;
string dump;
+ string events;
+ vector<string> throwMsg; // Throw exception if message content matches.
TestStoreOptions() : Options("Test Store Options") {
addOptions()
- ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
- ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+ ("test-store-name", optValue(name, "NAME"),
+ "Name of test store instance.")
+ ("test-store-dump", optValue(dump, "FILE"),
+ "File to dump enqueued messages.")
+ ("test-store-events", optValue(events, "FILE"),
+ "File to log events, 1 line per event.")
+ ("test-store-throw", optValue(throwMsg, "CONTENT"),
+ "Throw exception if message content matches.")
;
}
};
@@ -82,24 +95,76 @@ class TestStore : public NullMessageStore {
TestStore(const TestStoreOptions& opts, Broker& broker_)
: options(opts), name(opts.name), broker(broker_)
{
- QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
- if (!options.dump.empty())
+ QPID_LOG(info, "TestStore name=" << name
+ << " dump=" << options.dump
+ << " events=" << options.events
+ << " throw messages =" << options.throwMsg.size());
+
+ if (!options.dump.empty())
dump.reset(new ofstream(options.dump.c_str()));
+ if (!options.events.empty())
+ events.reset(new ofstream(options.events.c_str()));
}
~TestStore() {
for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
}
- virtual bool isNull() const { return false; }
-
- void enqueue(TransactionContext* ,
+ // Dummy transaction context.
+ struct TxContext : public TPCTransactionContext {
+ static int nextId;
+ string id;
+ TxContext() : id(lexical_cast<string>(nextId++)) {}
+ TxContext(string xid) : id(xid) {}
+ };
+
+ static string getId(const TransactionContext& tx) {
+ const TxContext* tc = dynamic_cast<const TxContext*>(&tx);
+ assert(tc);
+ return tc->id;
+ }
+
+
+ bool isNull() const { return false; }
+
+ void log(const string& msg) {
+ QPID_LOG(info, "test_store: " << msg);
+ if (events.get()) *events << msg << endl << std::flush;
+ }
+
+ auto_ptr<TransactionContext> begin() {
+ auto_ptr<TxContext> tx(new TxContext());
+ log(Msg() << "<begin tx " << tx->id << ">");
+ return auto_ptr<TransactionContext>(tx);
+ }
+
+ auto_ptr<TPCTransactionContext> begin(const std::string& xid) {
+ auto_ptr<TxContext> tx(new TxContext(xid));
+ log(Msg() << "<begin tx " << tx->id << ">");
+ return auto_ptr<TPCTransactionContext>(tx);
+ }
+
+ string getContent(const intrusive_ptr<PersistableMessage>& msg) {
+ intrusive_ptr<broker::Message::Encoding> enc(
+ dynamic_pointer_cast<broker::Message::Encoding>(msg));
+ return enc->getContent();
+ }
+
+ void enqueue(TransactionContext* tx,
const boost::intrusive_ptr<PersistableMessage>& pmsg,
- const PersistableQueue& )
+ const PersistableQueue& queue)
{
- qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
+ QPID_LOG(debug, "TestStore enqueue " << queue.getName());
+ qpid::broker::amqp_0_10::MessageTransfer* msg =
+ dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
assert(msg);
+ ostringstream o;
+ o << "<enqueue " << queue.getName() << " " << getContent(msg);
+ if (tx) o << " tx=" << getId(*tx);
+ o << ">";
+ log(o.str());
+
// Dump the message if there is a dump file.
if (dump.get()) {
msg->getFrames().getMethod()->print(*dump);
@@ -113,7 +178,11 @@ class TestStore : public NullMessageStore {
string data = msg->getFrames().getContent();
size_t i = string::npos;
size_t j = string::npos;
- if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
+ const vector<string>& throwMsg(options.throwMsg);
+ if (find(throwMsg.begin(), throwMsg.end(), data) != throwMsg.end()) {
+ throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+ }
+ else if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) == 0
&& (i = data.find(name+"[")) != string::npos
&& (j = data.find("]", i)) != string::npos)
{
@@ -144,6 +213,31 @@ class TestStore : public NullMessageStore {
msg->enqueueComplete();
}
+ void dequeue(TransactionContext* tx,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
+ {
+ QPID_LOG(debug, "TestStore dequeue " << queue.getName());
+ ostringstream o;
+ o<< "<dequeue " << queue.getName() << " " << getContent(msg);
+ if (tx) o << " tx=" << getId(*tx);
+ o << ">";
+ log(o.str());
+ }
+
+ void prepare(TPCTransactionContext& txn) {
+ log(Msg() << "<prepare tx=" << getId(txn) << ">");
+ }
+
+ void commit(TransactionContext& txn) {
+ log(Msg() << "<commit tx=" << getId(txn) << ">");
+ }
+
+ void abort(TransactionContext& txn) {
+ log(Msg() << "<abort tx=" << getId(txn) << ">");
+ }
+
+
private:
static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
TestStoreOptions options;
@@ -151,8 +245,11 @@ class TestStore : public NullMessageStore {
Broker& broker;
vector<Thread> threads;
std::auto_ptr<ofstream> dump;
+ std::auto_ptr<ofstream> events;
};
+int TestStore::TxContext::nextId(1);
+
const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
const string TestStore::EXCEPTION = "exception";
const string TestStore::EXIT_PROCESS = "exit_process";