From c2efe8deabde635f07e78d438fc529ca67d34133 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 24 Jul 2007 19:39:27 +0000 Subject: * Summary: - Wiring (declare/delete/bind) is replicated via AIS. - TestOptions includes all logging options. - Logger automatically parses env vars so logging can be enabled for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1. * src/qpid/cluster/SessionManager.cpp: Handle frames from cluster - Forward to BrokerAdapter for execution. - Suppress responses in proxy. * src/tests/TestOptions.h (Options): Logging options, --help option. * src/qpid/client/ClientConnection.cpp: Removed log initialization. Logs are initialized either in TestOptions or automatically from env vars, e.g. QPID_TRACE, * src/qpid/QpidError.h (class QpidError): Initialize Exception in constructor so messages can be logged. * src/qpid/framing/ChannelAdapter.h: Made send() virtual. * src/tests/Cluster_child.cpp: UUID corrected. * src/qpid/broker/Broker.cpp: Pass chains to updater by ref. * src/qpid/Options.cpp (parse): Fix log settings from environment. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@559171 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/Options.cpp | 33 ++++++---- qpid/cpp/src/qpid/QpidError.cpp | 5 +- qpid/cpp/src/qpid/QpidError.h | 9 ++- qpid/cpp/src/qpid/broker/Broker.cpp | 2 +- qpid/cpp/src/qpid/client/ClientConnection.cpp | 3 - qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp | 3 + qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp | 5 +- qpid/cpp/src/qpid/cluster/SessionManager.cpp | 81 ++++++++++++++++--------- qpid/cpp/src/qpid/cluster/SessionManager.h | 17 ++++-- qpid/cpp/src/qpid/framing/ChannelAdapter.h | 4 +- qpid/cpp/src/tests/Cluster.h | 4 +- qpid/cpp/src/tests/Cluster_child.cpp | 5 +- qpid/cpp/src/tests/TestOptions.h | 26 +++++++- qpid/cpp/src/tests/cluster_client.cpp | 19 +++--- qpid/cpp/src/tests/start_cluster | 8 +-- qpid/cpp/src/tests/stop_cluster | 4 +- 16 files changed, 149 insertions(+), 79 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/Options.cpp b/qpid/cpp/src/qpid/Options.cpp index 7f4536ff7b..2b6cff44f6 100644 --- a/qpid/cpp/src/qpid/Options.cpp +++ b/qpid/cpp/src/qpid/Options.cpp @@ -18,6 +18,9 @@ #include "Options.h" #include "qpid/Exception.h" + +#include + #include #include #include @@ -28,18 +31,26 @@ using namespace std; namespace { -char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); } +struct EnvOptMapper { + static bool matchChar(char env, char opt) { + return (env==toupper(opt)) || (strchr("-.", opt) && env=='_'); + } -struct Mapper { - Mapper(const Options& o) : opts(o) {} - string operator()(const string& env) { + static bool matchStr(const string& env, boost::shared_ptr desc) { + return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar); + } + + EnvOptMapper(const Options& o) : opts(o) {} + + string operator()(const string& envVar) { static const std::string prefix("QPID_"); - if (env.substr(0, prefix.size()) == prefix) { - string opt = env.substr(prefix.size()); - transform(opt.begin(), opt.end(), opt.begin(), env2optchar); - // Ignore env vars that don't match to known options. - if (opts.find_nothrow(opt, false)) - return opt; + if (envVar.substr(0, prefix.size()) == prefix) { + string env = envVar.substr(prefix.size()); + typedef const std::vector< boost::shared_ptr > OptDescs; + OptDescs::const_iterator i = + find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1)); + if (i != opts.options().end()) + return (*i)->long_name(); } return string(); } @@ -62,7 +73,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile) if (argc > 0 && argv != 0) po::store(po::parse_command_line(argc, argv, *this), vm); parsing="environment variables"; - po::store(po::parse_environment(*this, Mapper(*this)), vm); + po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm); po::notify(vm); // configFile may be updated from arg/env options. if (!configFile.empty()) { parsing="configuration file "+configFile; diff --git a/qpid/cpp/src/qpid/QpidError.cpp b/qpid/cpp/src/qpid/QpidError.cpp index fcd5af47a5..740ec24e54 100644 --- a/qpid/cpp/src/qpid/QpidError.cpp +++ b/qpid/cpp/src/qpid/QpidError.cpp @@ -34,9 +34,8 @@ Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_pt void QpidError::throwSelf() const { throw *this; } -void QpidError::init() { - whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)") - % code % msg % loc.file % loc.line); +std::string QpidError::message(int code, const std::string& msg, const char* file, int line) { + return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str(); } diff --git a/qpid/cpp/src/qpid/QpidError.h b/qpid/cpp/src/qpid/QpidError.h index dea0902a0e..2ff6571365 100644 --- a/qpid/cpp/src/qpid/QpidError.h +++ b/qpid/cpp/src/qpid/QpidError.h @@ -48,16 +48,15 @@ class QpidError : public Exception template QpidError(int code_, const T& msg_, const SrcLine& loc_) throw() - : code(code_), loc(loc_), msg(boost::lexical_cast(msg_)) - { init(); } + : Exception(message(code_, boost::lexical_cast(msg_), loc_.file.c_str(), loc_.line)), + code(code_), loc(loc_), msg(boost::lexical_cast(msg_)) {} ~QpidError() throw(); Exception::auto_ptr clone() const throw(); void throwSelf() const; - private: - - void init(); + /** Format message for exception. */ + static std::string message(int code, const std::string& msg, const char* file, int line); }; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 86342b3c43..26ec55ac44 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -164,7 +164,7 @@ void Broker::add(const shared_ptr& updater) { void Broker::update(FrameHandler::Chains& chains) { for_each(handlerUpdaters.begin(), handlerUpdaters.end(), - boost::bind(&HandlerUpdater::update, _1, chains)); + boost::bind(&HandlerUpdater::update, _1, boost::ref(chains))); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/client/ClientConnection.cpp b/qpid/cpp/src/qpid/client/ClientConnection.cpp index 102de555fd..4b8f32a26f 100644 --- a/qpid/cpp/src/qpid/client/ClientConnection.cpp +++ b/qpid/cpp/src/qpid/client/ClientConnection.cpp @@ -51,9 +51,6 @@ Connection::Connection( isOpen(false), debug(_debug) { setConnector(defaultConnector); - qpid::log::Options o; - o.trace = debug; - qpid::log::Logger::instance().configure(o, "qpid-c++-client"); } Connection::~Connection(){} diff --git a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp index 0d0465c89e..1cce126800 100644 --- a/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp @@ -61,6 +61,9 @@ void ClassifierHandler::handle(AMQFrame& frame) { Chain chosen; shared_ptr method = dynamic_pointer_cast(frame.getBody()); + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // if (method) chosen=map[fullId(method)]; if (chosen) diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 10b1c44f40..b00152cbcd 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -50,9 +50,10 @@ struct ClusterPlugin : public Plugin { // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. - sessions.reset(new SessionManager()); + + sessions.reset(new SessionManager(*broker)); cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions)); - sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10: + sessions->setClusterSend(cluster); broker->add(sessions); } } diff --git a/qpid/cpp/src/qpid/cluster/SessionManager.cpp b/qpid/cpp/src/qpid/cluster/SessionManager.cpp index 24f201535d..9f6438cf92 100644 --- a/qpid/cpp/src/qpid/cluster/SessionManager.cpp +++ b/qpid/cpp/src/qpid/cluster/SessionManager.cpp @@ -16,17 +16,59 @@ * */ +#include "SessionManager.h" +#include "ClassifierHandler.h" + #include "qpid/log/Statement.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" -#include "SessionManager.h" -#include "ClassifierHandler.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/broker/BrokerAdapter.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/BrokerChannel.h" +#include "qpid/framing/ChannelAdapter.h" namespace qpid { namespace cluster { using namespace framing; using namespace sys; +using namespace broker; + +/** Handler to send frames direct to local broker (bypass correlation etc.) */ +struct BrokerHandler : public FrameHandler, private ChannelAdapter { + Connection connection; + Channel channel; + BrokerAdapter adapter; + + // TODO aconway 2007-07-23: Lots of needless flab here (Channel, + // Connection, ChannelAdapter) As these classes are untangled the + // flab can be reduced. The real requirements are: + // - Dispatch methods direct to broker bypassing all the correlation muck + // - Efficiently suppress responses + // For the latter we are now using a ChannelAdapter with noop send() + // A more efficient solution would be a no-op proxy. + // + BrokerHandler(Broker& broker) : + connection(0, broker), + channel(connection, 1, 0), + adapter(channel, connection, broker, *this) {} + + void handle(AMQFrame& frame) { + AMQMethodBody* body=dynamic_cast(frame.body.get()); + assert(body); + body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext + } + + // Dummy methods. + virtual void handleHeader(boost::shared_ptr){} + virtual void handleContent(boost::shared_ptr){} + virtual void handleHeartbeat(boost::shared_ptr){} + virtual bool isOpen() const{ return true; } + virtual void handleMethodInContext(shared_ptr, const MethodContext&){} + // No-op send. + virtual RequestId send(shared_ptr, Correlator::Action) { return 0; } +}; /** Wrap plain AMQFrames in SessionFrames */ struct FrameWrapperHandler : public FrameHandler { @@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler { SessionFrameHandler::Chain next; }; -SessionManager::SessionManager() {} +SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {} -void SessionManager::update(FrameHandler::Chains& chains) -{ +void SessionManager::update(FrameHandler::Chains& chains) { Mutex::ScopedLock l(lock); // Create a new local session, store local chains. Uuid uuid(true); sessions[uuid] = chains; - // Replace local incoming chain. Build from the back. - // + // Replace local in chain. Build from the back. // TODO aconway 2007-07-05: Currently mcast wiring, bypass // everythign else. assert(clusterSend); @@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains) FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); chains.in = classify; - // FIXME aconway 2007-07-05: Need to stop bypassed frames - // from overtaking mcast frames. - // - - // Leave outgoing chain unmodified. + // Leave out chain unmodified. // TODO aconway 2007-07-05: Failover will require replication of // outgoing frames to session replicas. - } void SessionManager::handle(SessionFrame& frame) { - // Incoming from frame. - FrameHandler::Chains chains; + // Incoming from cluster. { Mutex::ScopedLock l(lock); + assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming? SessionMap::iterator i = sessions.find(frame.uuid); if (i == sessions.end()) { - QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); - chains = nonLocal; + // Non local method frame, invoke. + localBroker->handle(frame.frame); } else { - QPID_LOG(trace, "Local frame from cluster: " << frame.frame); - chains = i->second; + // Local frame, continue on local chain + i->second.in->handle(frame.frame); } } - FrameHandler::Chain chain = - chain = frame.isIncoming ? chains.in : chains.out; - // TODO aconway 2007-07-11: Should this be assert(chain) - if (chain) - chain->handle(frame.frame); - - // TODO aconway 2007-07-05: Here's where we should unblock frame - // dispatch for the channel. } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/SessionManager.h b/qpid/cpp/src/qpid/cluster/SessionManager.h index c23efde18e..77fc71733b 100644 --- a/qpid/cpp/src/qpid/cluster/SessionManager.h +++ b/qpid/cpp/src/qpid/cluster/SessionManager.h @@ -19,25 +19,33 @@ * */ -#include "qpid/broker/BrokerChannel.h" #include "qpid/cluster/SessionFrame.h" #include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" +#include + #include namespace qpid { + +namespace broker { +class Broker; +} + namespace cluster { /** * Manage sessions and handler chains for the cluster. * */ -class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler +class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler, + private boost::noncopyable { public: - SessionManager(); + SessionManager(broker::Broker& broker); /** Set the handler to send to the cluster */ void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } @@ -52,12 +60,13 @@ class SessionManager : public framing::HandlerUpdater, public SessionFrameHandle framing::ChannelId getChannelId(const framing::Uuid&) const; private: + class SessionOperations; typedef std::map SessionMap; sys::Mutex lock; SessionFrameHandler::Chain clusterSend; + framing::FrameHandler::Chain localBroker; SessionMap sessions; - framing::FrameHandler::Chains nonLocal; }; diff --git a/qpid/cpp/src/qpid/framing/ChannelAdapter.h b/qpid/cpp/src/qpid/framing/ChannelAdapter.h index 50b1c9ff7e..a7c9c61640 100644 --- a/qpid/cpp/src/qpid/framing/ChannelAdapter.h +++ b/qpid/cpp/src/qpid/framing/ChannelAdapter.h @@ -75,8 +75,8 @@ class ChannelAdapter : protected BodyHandler { *response to this frame. Ignored if body is not a Request. *@return If body is a request, the ID assigned else 0. */ - RequestId send(shared_ptr body, - Correlator::Action action=Correlator::Action()); + virtual RequestId send(shared_ptr body, + Correlator::Action action=Correlator::Action()); virtual bool isOpen() const = 0; diff --git a/qpid/cpp/src/tests/Cluster.h b/qpid/cpp/src/tests/Cluster.h index c2909e0c1b..e896fccafe 100644 --- a/qpid/cpp/src/tests/Cluster.h +++ b/qpid/cpp/src/tests/Cluster.h @@ -64,9 +64,9 @@ class TestHandler : public Handler, public vector Mutex::ScopedLock l(lock); BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<size()); AbsTime deadline(now(), 2*TIME_SEC); - while (vector::size() < n && lock.wait(deadline)) + while (this->size() < n && lock.wait(deadline)) ; - return vector::size() >= n; + return this->size() >= n; } }; diff --git a/qpid/cpp/src/tests/Cluster_child.cpp b/qpid/cpp/src/tests/Cluster_child.cpp index 9c119e5238..c509dc1950 100644 --- a/qpid/cpp/src/tests/Cluster_child.cpp +++ b/qpid/cpp/src/tests/Cluster_child.cpp @@ -32,15 +32,14 @@ static const ProtocolVersion VER; /** Chlid part of Cluster::clusterTwo test */ void clusterTwo() { - TestCluster cluster("clusterTwo", "amqp::2"); + TestCluster cluster("clusterTwo", "amqp:child:2"); BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent. BOOST_CHECK(cluster.received[0].isIncoming); BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - Uuid id(true); - SessionFrame sf(id, frame, false); + SessionFrame sf(cluster.received[0].uuid, frame, false); cluster.handle(sf); BOOST_REQUIRE(cluster.received.waitFor(2)); BOOST_CHECK(!cluster.received[1].isIncoming); diff --git a/qpid/cpp/src/tests/TestOptions.h b/qpid/cpp/src/tests/TestOptions.h index ee3af0873a..5b3c0958f5 100644 --- a/qpid/cpp/src/tests/TestOptions.h +++ b/qpid/cpp/src/tests/TestOptions.h @@ -22,13 +22,18 @@ */ #include "qpid/Options.h" +#include "qpid/log/Options.h" #include "qpid/Url.h" +#include "qpid/log/Logger.h" + +#include +#include namespace qpid { struct TestOptions : public qpid::Options { - TestOptions() : Options("Test Options"), host("localhost"), port(TcpAddress::DEFAULT_PORT), clientid("cpp"), trace(false), help(false) + TestOptions() : Options("Test Options"), host("localhost"), port(TcpAddress::DEFAULT_PORT), clientid("cpp"), help(false) { addOptions() ("host,h", optValue(host, "HOST"), "Broker host to connect to") @@ -39,10 +44,26 @@ struct TestOptions : public qpid::Options ("clientname,n", optValue(clientid, "ID"), "unique client identifier") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "USER"), "password for broker log in.") - ("trace,t", optValue(trace), "Turn on debug tracing.") ("help", optValue(help), "print this usage statement"); + add(log); } + /** As well as parsing, print help & exit if required */ + void parse(int argc, char** argv) { + try { + qpid::Options::parse(argc, argv); + } catch (const std::exception& e) { + std::cout << e.what() << std::endl << *this << std::endl; + exit(1); + } + if (help) { + std::cout << *this << std::endl; + exit(0); + } + trace = log.trace; + qpid::log::Logger::instance().configure(log, argv[0]); + } + std::string host; uint16_t port; std::string virtualhost; @@ -51,6 +72,7 @@ struct TestOptions : public qpid::Options std::string password; bool trace; bool help; + log::Options log; }; } diff --git a/qpid/cpp/src/tests/cluster_client.cpp b/qpid/cpp/src/tests/cluster_client.cpp index 421a33a40a..7620faa9fa 100644 --- a/qpid/cpp/src/tests/cluster_client.cpp +++ b/qpid/cpp/src/tests/cluster_client.cpp @@ -52,6 +52,7 @@ struct ClusterConnections : public vector > { }; BOOST_AUTO_TEST_CASE(testWiringReplication) { + // Declare on one broker, use on others. ClusterConnections cluster; BOOST_REQUIRE(cluster.size() > 1); @@ -63,13 +64,17 @@ BOOST_AUTO_TEST_CASE(testWiringReplication) { broker0.declareExchange(fooEx); broker0.declareQueue(fooQ); broker0.bind(fooEx, fooQ, "FooKey"); - - Channel broker1; - cluster[1]->openChannel(broker1); - broker1.publish(Message("hello"), fooEx, "FooKey"); - Message m; - BOOST_REQUIRE(broker1.get(m, fooQ)); - BOOST_REQUIRE_EQUAL(m.getData(), "hello"); + broker0.close(); + + for (size_t i = 1; i < cluster.size(); ++i) { + Channel ch; + cluster[i]->openChannel(ch); + ch.publish(Message("hello"), fooEx, "FooKey"); + Message m; + BOOST_REQUIRE(ch.get(m, fooQ)); + BOOST_REQUIRE_EQUAL(m.getData(), "hello"); + ch.close(); + } } diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster index c2806bb225..8f44854978 100755 --- a/qpid/cpp/src/tests/start_cluster +++ b/qpid/cpp/src/tests/start_cluster @@ -6,14 +6,14 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; } +rm -f cluster*.log cluster.ports SIZE=$1 shift OPTS=$* - +CLUSTER=`whoami` # Cluster name=user name, avoid clashes. for (( i=0; i> cluster.ports done -echo $PORTS > cluster.ports diff --git a/qpid/cpp/src/tests/stop_cluster b/qpid/cpp/src/tests/stop_cluster index f5db5a4488..6afcb527e5 100755 --- a/qpid/cpp/src/tests/stop_cluster +++ b/qpid/cpp/src/tests/stop_cluster @@ -6,11 +6,9 @@ PORTS=`cat cluster.ports` for PORT in $PORTS ; do ../qpidd -qp $PORT || ERROR="$ERROR $PORT" done +rm -f cluster.ports if [ -n "$ERROR" ]; then echo "Errors stopping brokers on ports: $ERROR" - echo $ERROR > cluster.ports exit 1 -else - rm cluster.ports fi -- cgit v1.2.1