summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/msg_group_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/msg_group_test.cpp')
-rw-r--r--qpid/cpp/src/tests/msg_group_test.cpp641
1 files changed, 0 insertions, 641 deletions
diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp
deleted file mode 100644
index ca87197ff3..0000000000
--- a/qpid/cpp/src/tests/msg_group_test.cpp
+++ /dev/null
@@ -1,641 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/messaging/Address.h>
-#include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Receiver.h>
-#include <qpid/messaging/Sender.h>
-#include <qpid/messaging/Session.h>
-#include <qpid/messaging/Message.h>
-#include <qpid/messaging/FailoverUpdates.h>
-#include <qpid/Options.h>
-#include <qpid/log/Logger.h>
-#include <qpid/log/Options.h>
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/SystemInfo.h"
-
-#include <iostream>
-#include <memory>
-#include <stdlib.h>
-
-using namespace qpid::messaging;
-using namespace qpid::types;
-using namespace std;
-
-namespace qpid {
-namespace tests {
-
-struct Options : public qpid::Options
-{
- bool help;
- std::string url;
- std::string address;
- std::string connectionOptions;
- uint messages;
- uint capacity;
- uint ackFrequency;
- bool failoverUpdates;
- qpid::log::Options log;
- uint senders;
- uint receivers;
- uint groupSize;
- bool printReport;
- std::string groupKey;
- bool durable;
- bool allowDuplicates;
- bool randomizeSize;
- bool stickyConsumer;
- uint timeout;
- uint interleave;
- std::string prefix;
- uint sendRate;
-
- Options(const std::string& argv0=std::string())
- : qpid::Options("Options"),
- help(false),
- url("amqp:tcp:127.0.0.1"),
- messages(10000),
- capacity(1000),
- ackFrequency(100),
- failoverUpdates(false),
- log(argv0),
- senders(2),
- receivers(2),
- groupSize(10),
- printReport(false),
- groupKey("qpid.no_group"),
- durable(false),
- allowDuplicates(false),
- randomizeSize(false),
- stickyConsumer(false),
- timeout(10),
- interleave(1),
- sendRate(0)
- {
- addOptions()
- ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
- ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from")
- ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages")
- ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
- ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
- ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
- ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
- ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
- ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.")
- ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.")
- ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.")
- ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.")
- ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.")
- ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.")
- ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].")
- ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
- ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
- ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].")
- ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.")
- ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.")
- ("help", qpid::optValue(help), "print this usage statement");
- add(log);
- //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
- //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
- //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
- }
-
- bool parse(int argc, char** argv)
- {
- try {
- qpid::Options::parse(argc, argv);
- if (address.empty()) throw qpid::Exception("Address must be specified!");
- if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?");
- if (messages == 0) throw qpid::Exception("The message count cannot be zero.");
- qpid::log::Logger::instance().configure(log);
- if (help) {
- std::cout << *this << std::endl << std::endl
- << "Verifies the behavior of grouped messages." << std::endl;
- return false;
- } else {
- return true;
- }
- } catch (const std::exception& e) {
- std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
- return false;
- }
- }
-};
-
-const string EOS("eos");
-const string SN("sn");
-
-
-// class that monitors group state across all publishers and consumers. tracks the next
-// expected sequence for each group, and total messages consumed.
-class GroupChecker
-{
- qpid::sys::Mutex lock;
-
- uint consumerCt;
- uint producerCt;
- uint totalMsgs;
- uint totalMsgsConsumed;
- uint totalMsgsPublished;
- bool allowDuplicates;
- uint duplicateMsgs;
-
- typedef std::map<std::string, uint> SequenceMap;
- SequenceMap sequenceMap;
-
- // Statistics - for each group, store the names of all clients that consumed messages
- // from that group, and the number of messages consumed per client.
- typedef std::map<std::string, uint> ClientCounter;
- typedef std::map<std::string, ClientCounter> GroupStatistics;
- GroupStatistics statistics;
-
-public:
-
- GroupChecker( uint messages, uint consumers, uint producers, bool d) :
- consumerCt(consumers), producerCt(producers),
- totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
- duplicateMsgs(0)
- {
- // if consumering only - we a draining a queue of 'messages' queued messages.
- if (producerCt != 0) {
- totalMsgs = producers * messages;
- } else {
- totalMsgs = messages;
- }
- }
-
- bool checkSequence( const std::string& groupId,
- uint sequence, const std::string& client )
- {
- qpid::sys::Mutex::ScopedLock l(lock);
-
- QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence);
-
- GroupStatistics::iterator gs = statistics.find(groupId);
- if (gs == statistics.end()) {
- statistics[groupId][client] = 1;
- } else {
- gs->second[client]++;
- }
- // now verify
- SequenceMap::iterator s = sequenceMap.find(groupId);
- if (s == sequenceMap.end()) {
- QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence);
- // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect.
- // in this case, set the sequence from the first msg.
- sequenceMap[groupId] = (allowDuplicates) ? sequence : 0;
- s = sequenceMap.find(groupId);
- } else if (sequence < s->second) {
- duplicateMsgs++;
- QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence);
- return allowDuplicates;
- }
- totalMsgsConsumed++;
- return sequence == s->second++;
- }
-
- void sendingSequence( const std::string& groupId,
- uint sequence, bool eos,
- const std::string& client )
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- ++totalMsgsPublished;
-
- QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence <<
- ((eos) ? " (last)" : ""));
- }
-
- bool eraseGroup( const std::string& groupId, const std::string& name )
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")");
- return sequenceMap.erase( groupId ) == 1;
- }
-
- uint getNextExpectedSequence( const std::string& groupId )
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- return sequenceMap[groupId];
- }
-
- bool allMsgsPublished() // true when done publishing msgs
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- return (producerCt == 0 || totalMsgsPublished >= totalMsgs);
- }
-
- bool allMsgsConsumed() // true when done consuming msgs
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- return (consumerCt == 0 ||
- (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0));
- }
-
- uint getTotalMessages()
- {
- return totalMsgs;
- }
-
- uint getConsumedTotal()
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- return totalMsgsConsumed;
- }
-
- uint getPublishedTotal()
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- return totalMsgsPublished;
- }
-
- ostream& print(ostream& out)
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed <<
- ", Duplicates detected: " << duplicateMsgs << std::endl;
- out << "Total Groups: " << statistics.size() << std::endl;
- unsigned long consumers = 0;
- for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) {
- out << " GroupId: " << gs->first;
- consumers += gs->second.size(); // # of consumers that processed this group
- if (gs->second.size() == 1)
- out << " completely consumed by a single client." << std::endl;
- else
- out << " consumed by " << gs->second.size() << " different clients." << std::endl;
-
- for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) {
- out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl;
- }
- }
- out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl;
- return out;
- }
-};
-
-
-namespace {
- // rand() is not thread safe. Create a singleton obj to hold a lock while calling
- // rand() so it can be called safely by multiple concurrent clients.
- class Randomizer {
- qpid::sys::Mutex lock;
- public:
- uint operator()(uint max) {
- qpid::sys::Mutex::ScopedLock l(lock);
- return (rand() % max) + 1;
- }
- };
-
- static Randomizer randomizer;
-}
-
-
-// tag each generated message with a group identifer
-//
-class GroupGenerator {
-
- const std::string groupPrefix;
- const uint groupSize;
- const bool randomizeSize;
- const uint interleave;
-
- uint groupSuffix;
- uint total;
-
- struct GroupState {
- std::string id;
- const uint size;
- uint count;
- GroupState( const std::string& i, const uint s )
- : id(i), size(s), count(0) {}
- };
- typedef std::list<GroupState> GroupList;
- GroupList groups;
- GroupList::iterator current;
-
- // add a new group identifier to the list
- void newGroup() {
- std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
- groupId << std::string(":") << groupSuffix++;
- uint size = (randomizeSize) ? randomizer(groupSize) : groupSize;
- QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this);
- GroupState group( groupId.str(), size );
- groups.push_back( group );
- }
-
-public:
- GroupGenerator( const std::string& prefix,
- const uint t,
- const uint size,
- const bool randomize,
- const uint i)
- : groupPrefix(prefix), groupSize(size),
- randomizeSize(randomize), interleave(i), groupSuffix(0), total(t)
- {
- QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this);
- for (uint i = 0; i < 1 || i < interleave; ++i) {
- newGroup();
- }
- current = groups.begin();
- }
-
- bool genGroup(std::string& groupId, uint& seq, bool& eos)
- {
- if (!total) return false;
- --total;
- if (current == groups.end())
- current = groups.begin();
- groupId = current->id;
- seq = current->count++;
- if (current->count == current->size) {
- QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this);
- eos = true;
- if (total >= interleave) { // need a new group to replace this one
- newGroup();
- groups.erase(current++);
- } else ++current;
- } else {
- ++current;
- eos = total < interleave; // mark eos on the last message of each group
- }
- QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this);
- return true;
- }
-};
-
-
-
-class Client : public qpid::sys::Runnable
-{
-public:
- typedef boost::shared_ptr<Client> shared_ptr;
- enum State {ACTIVE, DONE, FAILURE};
- Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {}
- virtual ~Client() {}
- State getState() { return state; }
- void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; }
- void clientDone() { if (state == ACTIVE) state = DONE; }
- qpid::sys::Thread& getThread() { return thread; }
- const std::string getErrorMsg() { return error.str(); }
- void stop() {stopped = true;}
- const std::string& getName() { return name; }
-
-protected:
- const std::string name;
- const Options& opts;
- qpid::sys::Thread thread;
- ostringstream error;
- State state;
- bool stopped;
-};
-
-
-class Consumer : public Client
-{
- GroupChecker& checker;
-
-public:
- Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {};
- virtual ~Consumer() {};
-
- void run()
- {
- Connection connection;
- try {
- connection = Connection(opts.url, opts.connectionOptions);
- connection.open();
- std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
- Session session = connection.createSession();
- Receiver receiver = session.createReceiver(opts.address);
- receiver.setCapacity(opts.capacity);
- Message msg;
- uint count = 0;
-
- while (!stopped) {
- if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved
- qpid::types::Variant::Map& properties = msg.getProperties();
- std::string groupId = properties[opts.groupKey];
- uint groupSeq = properties[SN];
- bool eof = properties[EOS];
-
- QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name);
-
- qpid::sys::usleep(10);
-
- if (!checker.checkSequence( groupId, groupSeq, name )) {
- ostringstream msg;
- msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId );
- testFailed( msg.str() );
- break;
- } else if (eof) {
- if (!checker.eraseGroup( groupId, name )) {
- ostringstream msg;
- msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq;
- testFailed( msg.str() );
- break;
- }
- }
-
- ++count;
- if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
- session.acknowledge();
- }
- // Clear out message properties & content for next iteration.
- msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
- } else if (checker.allMsgsConsumed()) // timed out, nothing else to do?
- break;
- }
- session.acknowledge();
- session.close();
- connection.close();
- } catch(const std::exception& error) {
- ostringstream msg;
- msg << "consumer error: " << error.what();
- testFailed( msg.str() );
- connection.close();
- }
- clientDone();
- QPID_LOG(trace, "Consuming client " << name << " completed.");
- }
-};
-
-
-
-class Producer : public Client
-{
- GroupChecker& checker;
- GroupGenerator generator;
-
-public:
- Producer(const std::string& n, const Options& o, GroupChecker& c)
- : Client(n, o), checker(c),
- generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave )
- {};
- virtual ~Producer() {};
-
- void run()
- {
- Connection connection;
- try {
- connection = Connection(opts.url, opts.connectionOptions);
- connection.open();
- std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
- Session session = connection.createSession();
- Sender sender = session.createSender(opts.address);
- if (opts.capacity) sender.setCapacity(opts.capacity);
- Message msg;
- msg.setDurable(opts.durable);
- std::string groupId;
- uint seq;
- bool eos;
- uint sent = 0;
-
- qpid::sys::AbsTime start = qpid::sys::now();
- int64_t interval = 0;
- if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
-
- while (!stopped && generator.genGroup(groupId, seq, eos)) {
- msg.getProperties()[opts.groupKey] = groupId;
- msg.getProperties()[SN] = seq;
- msg.getProperties()[EOS] = eos;
- checker.sendingSequence( groupId, seq, eos, name );
-
- sender.send(msg);
- ++sent;
-
- if (opts.sendRate) {
- qpid::sys::AbsTime waitTill(start, sent*interval);
- int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
- if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
- }
- }
- session.sync();
- session.close();
- connection.close();
- } catch(const std::exception& error) {
- ostringstream msg;
- msg << "producer '" << name << "' error: " << error.what();
- testFailed(msg.str());
- connection.close();
- }
- clientDone();
- QPID_LOG(trace, "Producing client " << name << " completed.");
- }
-};
-
-
-}} // namespace qpid::tests
-
-using namespace qpid::tests;
-
-int main(int argc, char ** argv)
-{
- int status = 0;
- try {
- Options opts;
- if (opts.parse(argc, argv)) {
-
- GroupChecker state( opts.messages,
- opts.receivers,
- opts.senders,
- opts.allowDuplicates);
- std::vector<Client::shared_ptr> clients;
-
- if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
-
- // fire off the producers && consumers
- for (size_t j = 0; j < opts.senders; ++j) {
- ostringstream name;
- name << opts.prefix << "P_" << j;
- clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state )));
- clients.back()->getThread() = qpid::sys::Thread(*clients.back());
- }
- for (size_t j = 0; j < opts.receivers; ++j) {
- ostringstream name;
- name << opts.prefix << "C_" << j;
- clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state )));
- clients.back()->getThread() = qpid::sys::Thread(*clients.back());
- }
-
- // wait for all pubs/subs to finish.... or for consumers to fail or stall.
- uint stalledTime = 0;
- bool clientFailed = false;
- while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) {
- uint lastCount;
-
- lastCount = state.getConsumedTotal();
- qpid::sys::usleep( 1000000 );
-
- // check each client for failures
- for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
- i != clients.end(); ++i) {
- QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
- if ((*i)->getState() == Client::FAILURE) {
- QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
- clientFailed = true;
- break; // exit test.
- }
- }
-
- // check for stalled consumers
- if (!clientFailed && !state.allMsgsConsumed()) {
- if (lastCount == state.getConsumedTotal()) {
- if (++stalledTime >= opts.timeout) {
- clientFailed = true;
- break; // exit test
- }
- } else {
- stalledTime = 0;
- }
- }
- QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
- " Published to date = " << state.getPublishedTotal() <<
- " total=" << state.getTotalMessages());
- }
-
- if (clientFailed) {
- if (stalledTime >= opts.timeout) {
- QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
- status = 2;
- } else {
- status = 1;
- }
- }
-
- // Wait for started threads.
- for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
- i != clients.end(); ++i) {
- (*i)->stop();
- (*i)->getThread().join();
- }
-
- if (opts.printReport && !status) state.print(std::cout);
- } else status = 4;
- } catch(const std::exception& error) {
- QPID_LOG(error, argv[0] << ": " << error.what());
- status = 3;
- }
- QPID_LOG(trace, "TEST DONE [" << status << "]");
-
- return status;
-}