diff options
Diffstat (limited to 'qpid/cpp/src/tests/msg_group_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/msg_group_test.cpp | 641 |
1 files changed, 0 insertions, 641 deletions
diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp deleted file mode 100644 index ca87197ff3..0000000000 --- a/qpid/cpp/src/tests/msg_group_test.cpp +++ /dev/null @@ -1,641 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/messaging/Address.h> -#include <qpid/messaging/Connection.h> -#include <qpid/messaging/Receiver.h> -#include <qpid/messaging/Sender.h> -#include <qpid/messaging/Session.h> -#include <qpid/messaging/Message.h> -#include <qpid/messaging/FailoverUpdates.h> -#include <qpid/Options.h> -#include <qpid/log/Logger.h> -#include <qpid/log/Options.h> -#include "qpid/log/Statement.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/SystemInfo.h" - -#include <iostream> -#include <memory> -#include <stdlib.h> - -using namespace qpid::messaging; -using namespace qpid::types; -using namespace std; - -namespace qpid { -namespace tests { - -struct Options : public qpid::Options -{ - bool help; - std::string url; - std::string address; - std::string connectionOptions; - uint messages; - uint capacity; - uint ackFrequency; - bool failoverUpdates; - qpid::log::Options log; - uint senders; - uint receivers; - uint groupSize; - bool printReport; - std::string groupKey; - bool durable; - bool allowDuplicates; - bool randomizeSize; - bool stickyConsumer; - uint timeout; - uint interleave; - std::string prefix; - uint sendRate; - - Options(const std::string& argv0=std::string()) - : qpid::Options("Options"), - help(false), - url("amqp:tcp:127.0.0.1"), - messages(10000), - capacity(1000), - ackFrequency(100), - failoverUpdates(false), - log(argv0), - senders(2), - receivers(2), - groupSize(10), - printReport(false), - groupKey("qpid.no_group"), - durable(false), - allowDuplicates(false), - randomizeSize(false), - stickyConsumer(false), - timeout(10), - interleave(1), - sendRate(0) - { - addOptions() - ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") - ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from") - ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages") - ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") - ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") - ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") - ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") - ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") - ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.") - ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.") - ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.") - ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.") - ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.") - ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.") - ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].") - ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") - ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.") - ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].") - ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.") - ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.") - ("help", qpid::optValue(help), "print this usage statement"); - add(log); - //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") - //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") - //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") - } - - bool parse(int argc, char** argv) - { - try { - qpid::Options::parse(argc, argv); - if (address.empty()) throw qpid::Exception("Address must be specified!"); - if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?"); - if (messages == 0) throw qpid::Exception("The message count cannot be zero."); - qpid::log::Logger::instance().configure(log); - if (help) { - std::cout << *this << std::endl << std::endl - << "Verifies the behavior of grouped messages." << std::endl; - return false; - } else { - return true; - } - } catch (const std::exception& e) { - std::cerr << *this << std::endl << std::endl << e.what() << std::endl; - return false; - } - } -}; - -const string EOS("eos"); -const string SN("sn"); - - -// class that monitors group state across all publishers and consumers. tracks the next -// expected sequence for each group, and total messages consumed. -class GroupChecker -{ - qpid::sys::Mutex lock; - - uint consumerCt; - uint producerCt; - uint totalMsgs; - uint totalMsgsConsumed; - uint totalMsgsPublished; - bool allowDuplicates; - uint duplicateMsgs; - - typedef std::map<std::string, uint> SequenceMap; - SequenceMap sequenceMap; - - // Statistics - for each group, store the names of all clients that consumed messages - // from that group, and the number of messages consumed per client. - typedef std::map<std::string, uint> ClientCounter; - typedef std::map<std::string, ClientCounter> GroupStatistics; - GroupStatistics statistics; - -public: - - GroupChecker( uint messages, uint consumers, uint producers, bool d) : - consumerCt(consumers), producerCt(producers), - totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), - duplicateMsgs(0) - { - // if consumering only - we a draining a queue of 'messages' queued messages. - if (producerCt != 0) { - totalMsgs = producers * messages; - } else { - totalMsgs = messages; - } - } - - bool checkSequence( const std::string& groupId, - uint sequence, const std::string& client ) - { - qpid::sys::Mutex::ScopedLock l(lock); - - QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence); - - GroupStatistics::iterator gs = statistics.find(groupId); - if (gs == statistics.end()) { - statistics[groupId][client] = 1; - } else { - gs->second[client]++; - } - // now verify - SequenceMap::iterator s = sequenceMap.find(groupId); - if (s == sequenceMap.end()) { - QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence); - // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect. - // in this case, set the sequence from the first msg. - sequenceMap[groupId] = (allowDuplicates) ? sequence : 0; - s = sequenceMap.find(groupId); - } else if (sequence < s->second) { - duplicateMsgs++; - QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence); - return allowDuplicates; - } - totalMsgsConsumed++; - return sequence == s->second++; - } - - void sendingSequence( const std::string& groupId, - uint sequence, bool eos, - const std::string& client ) - { - qpid::sys::Mutex::ScopedLock l(lock); - ++totalMsgsPublished; - - QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence << - ((eos) ? " (last)" : "")); - } - - bool eraseGroup( const std::string& groupId, const std::string& name ) - { - qpid::sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")"); - return sequenceMap.erase( groupId ) == 1; - } - - uint getNextExpectedSequence( const std::string& groupId ) - { - qpid::sys::Mutex::ScopedLock l(lock); - return sequenceMap[groupId]; - } - - bool allMsgsPublished() // true when done publishing msgs - { - qpid::sys::Mutex::ScopedLock l(lock); - return (producerCt == 0 || totalMsgsPublished >= totalMsgs); - } - - bool allMsgsConsumed() // true when done consuming msgs - { - qpid::sys::Mutex::ScopedLock l(lock); - return (consumerCt == 0 || - (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0)); - } - - uint getTotalMessages() - { - return totalMsgs; - } - - uint getConsumedTotal() - { - qpid::sys::Mutex::ScopedLock l(lock); - return totalMsgsConsumed; - } - - uint getPublishedTotal() - { - qpid::sys::Mutex::ScopedLock l(lock); - return totalMsgsPublished; - } - - ostream& print(ostream& out) - { - qpid::sys::Mutex::ScopedLock l(lock); - out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed << - ", Duplicates detected: " << duplicateMsgs << std::endl; - out << "Total Groups: " << statistics.size() << std::endl; - unsigned long consumers = 0; - for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) { - out << " GroupId: " << gs->first; - consumers += gs->second.size(); // # of consumers that processed this group - if (gs->second.size() == 1) - out << " completely consumed by a single client." << std::endl; - else - out << " consumed by " << gs->second.size() << " different clients." << std::endl; - - for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) { - out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl; - } - } - out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl; - return out; - } -}; - - -namespace { - // rand() is not thread safe. Create a singleton obj to hold a lock while calling - // rand() so it can be called safely by multiple concurrent clients. - class Randomizer { - qpid::sys::Mutex lock; - public: - uint operator()(uint max) { - qpid::sys::Mutex::ScopedLock l(lock); - return (rand() % max) + 1; - } - }; - - static Randomizer randomizer; -} - - -// tag each generated message with a group identifer -// -class GroupGenerator { - - const std::string groupPrefix; - const uint groupSize; - const bool randomizeSize; - const uint interleave; - - uint groupSuffix; - uint total; - - struct GroupState { - std::string id; - const uint size; - uint count; - GroupState( const std::string& i, const uint s ) - : id(i), size(s), count(0) {} - }; - typedef std::list<GroupState> GroupList; - GroupList groups; - GroupList::iterator current; - - // add a new group identifier to the list - void newGroup() { - std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate); - groupId << std::string(":") << groupSuffix++; - uint size = (randomizeSize) ? randomizer(groupSize) : groupSize; - QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this); - GroupState group( groupId.str(), size ); - groups.push_back( group ); - } - -public: - GroupGenerator( const std::string& prefix, - const uint t, - const uint size, - const bool randomize, - const uint i) - : groupPrefix(prefix), groupSize(size), - randomizeSize(randomize), interleave(i), groupSuffix(0), total(t) - { - QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this); - for (uint i = 0; i < 1 || i < interleave; ++i) { - newGroup(); - } - current = groups.begin(); - } - - bool genGroup(std::string& groupId, uint& seq, bool& eos) - { - if (!total) return false; - --total; - if (current == groups.end()) - current = groups.begin(); - groupId = current->id; - seq = current->count++; - if (current->count == current->size) { - QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this); - eos = true; - if (total >= interleave) { // need a new group to replace this one - newGroup(); - groups.erase(current++); - } else ++current; - } else { - ++current; - eos = total < interleave; // mark eos on the last message of each group - } - QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this); - return true; - } -}; - - - -class Client : public qpid::sys::Runnable -{ -public: - typedef boost::shared_ptr<Client> shared_ptr; - enum State {ACTIVE, DONE, FAILURE}; - Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {} - virtual ~Client() {} - State getState() { return state; } - void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; } - void clientDone() { if (state == ACTIVE) state = DONE; } - qpid::sys::Thread& getThread() { return thread; } - const std::string getErrorMsg() { return error.str(); } - void stop() {stopped = true;} - const std::string& getName() { return name; } - -protected: - const std::string name; - const Options& opts; - qpid::sys::Thread thread; - ostringstream error; - State state; - bool stopped; -}; - - -class Consumer : public Client -{ - GroupChecker& checker; - -public: - Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {}; - virtual ~Consumer() {}; - - void run() - { - Connection connection; - try { - connection = Connection(opts.url, opts.connectionOptions); - connection.open(); - std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); - Session session = connection.createSession(); - Receiver receiver = session.createReceiver(opts.address); - receiver.setCapacity(opts.capacity); - Message msg; - uint count = 0; - - while (!stopped) { - if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved - qpid::types::Variant::Map& properties = msg.getProperties(); - std::string groupId = properties[opts.groupKey]; - uint groupSeq = properties[SN]; - bool eof = properties[EOS]; - - QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name); - - qpid::sys::usleep(10); - - if (!checker.checkSequence( groupId, groupSeq, name )) { - ostringstream msg; - msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId ); - testFailed( msg.str() ); - break; - } else if (eof) { - if (!checker.eraseGroup( groupId, name )) { - ostringstream msg; - msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq; - testFailed( msg.str() ); - break; - } - } - - ++count; - if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { - session.acknowledge(); - } - // Clear out message properties & content for next iteration. - msg = Message(); // TODO aconway 2010-12-01: should be done by fetch - } else if (checker.allMsgsConsumed()) // timed out, nothing else to do? - break; - } - session.acknowledge(); - session.close(); - connection.close(); - } catch(const std::exception& error) { - ostringstream msg; - msg << "consumer error: " << error.what(); - testFailed( msg.str() ); - connection.close(); - } - clientDone(); - QPID_LOG(trace, "Consuming client " << name << " completed."); - } -}; - - - -class Producer : public Client -{ - GroupChecker& checker; - GroupGenerator generator; - -public: - Producer(const std::string& n, const Options& o, GroupChecker& c) - : Client(n, o), checker(c), - generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave ) - {}; - virtual ~Producer() {}; - - void run() - { - Connection connection; - try { - connection = Connection(opts.url, opts.connectionOptions); - connection.open(); - std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); - Session session = connection.createSession(); - Sender sender = session.createSender(opts.address); - if (opts.capacity) sender.setCapacity(opts.capacity); - Message msg; - msg.setDurable(opts.durable); - std::string groupId; - uint seq; - bool eos; - uint sent = 0; - - qpid::sys::AbsTime start = qpid::sys::now(); - int64_t interval = 0; - if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; - - while (!stopped && generator.genGroup(groupId, seq, eos)) { - msg.getProperties()[opts.groupKey] = groupId; - msg.getProperties()[SN] = seq; - msg.getProperties()[EOS] = eos; - checker.sendingSequence( groupId, seq, eos, name ); - - sender.send(msg); - ++sent; - - if (opts.sendRate) { - qpid::sys::AbsTime waitTill(start, sent*interval); - int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); - if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); - } - } - session.sync(); - session.close(); - connection.close(); - } catch(const std::exception& error) { - ostringstream msg; - msg << "producer '" << name << "' error: " << error.what(); - testFailed(msg.str()); - connection.close(); - } - clientDone(); - QPID_LOG(trace, "Producing client " << name << " completed."); - } -}; - - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char ** argv) -{ - int status = 0; - try { - Options opts; - if (opts.parse(argc, argv)) { - - GroupChecker state( opts.messages, - opts.receivers, - opts.senders, - opts.allowDuplicates); - std::vector<Client::shared_ptr> clients; - - if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId()); - - // fire off the producers && consumers - for (size_t j = 0; j < opts.senders; ++j) { - ostringstream name; - name << opts.prefix << "P_" << j; - clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state ))); - clients.back()->getThread() = qpid::sys::Thread(*clients.back()); - } - for (size_t j = 0; j < opts.receivers; ++j) { - ostringstream name; - name << opts.prefix << "C_" << j; - clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state ))); - clients.back()->getThread() = qpid::sys::Thread(*clients.back()); - } - - // wait for all pubs/subs to finish.... or for consumers to fail or stall. - uint stalledTime = 0; - bool clientFailed = false; - while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) { - uint lastCount; - - lastCount = state.getConsumedTotal(); - qpid::sys::usleep( 1000000 ); - - // check each client for failures - for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); - i != clients.end(); ++i) { - QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState()); - if ((*i)->getState() == Client::FAILURE) { - QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg()); - clientFailed = true; - break; // exit test. - } - } - - // check for stalled consumers - if (!clientFailed && !state.allMsgsConsumed()) { - if (lastCount == state.getConsumedTotal()) { - if (++stalledTime >= opts.timeout) { - clientFailed = true; - break; // exit test - } - } else { - stalledTime = 0; - } - } - QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() << - " Published to date = " << state.getPublishedTotal() << - " total=" << state.getTotalMessages()); - } - - if (clientFailed) { - if (stalledTime >= opts.timeout) { - QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); - status = 2; - } else { - status = 1; - } - } - - // Wait for started threads. - for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); - i != clients.end(); ++i) { - (*i)->stop(); - (*i)->getThread().join(); - } - - if (opts.printReport && !status) state.print(std::cout); - } else status = 4; - } catch(const std::exception& error) { - QPID_LOG(error, argv[0] << ": " << error.what()); - status = 3; - } - QPID_LOG(trace, "TEST DONE [" << status << "]"); - - return status; -} |