From b7c528b027bff7585481c9ce3a01144040c6de5a Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 24 Jul 2007 19:39:27 +0000 Subject: * Summary: - Wiring (declare/delete/bind) is replicated via AIS. - TestOptions includes all logging options. - Logger automatically parses env vars so logging can be enabled for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1. * src/qpid/cluster/SessionManager.cpp: Handle frames from cluster - Forward to BrokerAdapter for execution. - Suppress responses in proxy. * src/tests/TestOptions.h (Options): Logging options, --help option. * src/qpid/client/ClientConnection.cpp: Removed log initialization. Logs are initialized either in TestOptions or automatically from env vars, e.g. QPID_TRACE, * src/qpid/QpidError.h (class QpidError): Initialize Exception in constructor so messages can be logged. * src/qpid/framing/ChannelAdapter.h: Made send() virtual. * src/tests/Cluster_child.cpp: UUID corrected. * src/qpid/broker/Broker.cpp: Pass chains to updater by ref. * src/qpid/Options.cpp (parse): Fix log settings from environment. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559171 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/Options.cpp | 33 ++++++++---- cpp/src/qpid/QpidError.cpp | 5 +- cpp/src/qpid/QpidError.h | 9 ++-- cpp/src/qpid/broker/Broker.cpp | 2 +- cpp/src/qpid/client/ClientConnection.cpp | 3 -- cpp/src/qpid/cluster/ClassifierHandler.cpp | 3 ++ cpp/src/qpid/cluster/ClusterPlugin.cpp | 5 +- cpp/src/qpid/cluster/SessionManager.cpp | 81 ++++++++++++++++++++---------- cpp/src/qpid/cluster/SessionManager.h | 17 +++++-- cpp/src/qpid/framing/ChannelAdapter.h | 4 +- 10 files changed, 104 insertions(+), 58 deletions(-) (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp index 7f4536ff7b..2b6cff44f6 100644 --- a/cpp/src/qpid/Options.cpp +++ b/cpp/src/qpid/Options.cpp @@ -18,6 +18,9 @@ #include "Options.h" #include "qpid/Exception.h" + +#include + #include #include #include @@ -28,18 +31,26 @@ using namespace std; namespace { -char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); } +struct EnvOptMapper { + static bool matchChar(char env, char opt) { + return (env==toupper(opt)) || (strchr("-.", opt) && env=='_'); + } -struct Mapper { - Mapper(const Options& o) : opts(o) {} - string operator()(const string& env) { + static bool matchStr(const string& env, boost::shared_ptr desc) { + return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar); + } + + EnvOptMapper(const Options& o) : opts(o) {} + + string operator()(const string& envVar) { static const std::string prefix("QPID_"); - if (env.substr(0, prefix.size()) == prefix) { - string opt = env.substr(prefix.size()); - transform(opt.begin(), opt.end(), opt.begin(), env2optchar); - // Ignore env vars that don't match to known options. - if (opts.find_nothrow(opt, false)) - return opt; + if (envVar.substr(0, prefix.size()) == prefix) { + string env = envVar.substr(prefix.size()); + typedef const std::vector< boost::shared_ptr > OptDescs; + OptDescs::const_iterator i = + find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1)); + if (i != opts.options().end()) + return (*i)->long_name(); } return string(); } @@ -62,7 +73,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile) if (argc > 0 && argv != 0) po::store(po::parse_command_line(argc, argv, *this), vm); parsing="environment variables"; - po::store(po::parse_environment(*this, Mapper(*this)), vm); + po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm); po::notify(vm); // configFile may be updated from arg/env options. if (!configFile.empty()) { parsing="configuration file "+configFile; diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp index fcd5af47a5..740ec24e54 100644 --- a/cpp/src/qpid/QpidError.cpp +++ b/cpp/src/qpid/QpidError.cpp @@ -34,9 +34,8 @@ Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_pt void QpidError::throwSelf() const { throw *this; } -void QpidError::init() { - whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)") - % code % msg % loc.file % loc.line); +std::string QpidError::message(int code, const std::string& msg, const char* file, int line) { + return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str(); } diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h index dea0902a0e..2ff6571365 100644 --- a/cpp/src/qpid/QpidError.h +++ b/cpp/src/qpid/QpidError.h @@ -48,16 +48,15 @@ class QpidError : public Exception template QpidError(int code_, const T& msg_, const SrcLine& loc_) throw() - : code(code_), loc(loc_), msg(boost::lexical_cast(msg_)) - { init(); } + : Exception(message(code_, boost::lexical_cast(msg_), loc_.file.c_str(), loc_.line)), + code(code_), loc(loc_), msg(boost::lexical_cast(msg_)) {} ~QpidError() throw(); Exception::auto_ptr clone() const throw(); void throwSelf() const; - private: - - void init(); + /** Format message for exception. */ + static std::string message(int code, const std::string& msg, const char* file, int line); }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 86342b3c43..26ec55ac44 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -164,7 +164,7 @@ void Broker::add(const shared_ptr& updater) { void Broker::update(FrameHandler::Chains& chains) { for_each(handlerUpdaters.begin(), handlerUpdaters.end(), - boost::bind(&HandlerUpdater::update, _1, chains)); + boost::bind(&HandlerUpdater::update, _1, boost::ref(chains))); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index 102de555fd..4b8f32a26f 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -51,9 +51,6 @@ Connection::Connection( isOpen(false), debug(_debug) { setConnector(defaultConnector); - qpid::log::Options o; - o.trace = debug; - qpid::log::Logger::instance().configure(o, "qpid-c++-client"); } Connection::~Connection(){} diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp index 0d0465c89e..1cce126800 100644 --- a/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp @@ -61,6 +61,9 @@ void ClassifierHandler::handle(AMQFrame& frame) { Chain chosen; shared_ptr method = dynamic_pointer_cast(frame.getBody()); + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // if (method) chosen=map[fullId(method)]; if (chosen) diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 10b1c44f40..b00152cbcd 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -50,9 +50,10 @@ struct ClusterPlugin : public Plugin { // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. - sessions.reset(new SessionManager()); + + sessions.reset(new SessionManager(*broker)); cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions)); - sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10: + sessions->setClusterSend(cluster); broker->add(sessions); } } diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index 24f201535d..9f6438cf92 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -16,17 +16,59 @@ * */ +#include "SessionManager.h" +#include "ClassifierHandler.h" + #include "qpid/log/Statement.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" -#include "SessionManager.h" -#include "ClassifierHandler.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/broker/BrokerAdapter.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/BrokerChannel.h" +#include "qpid/framing/ChannelAdapter.h" namespace qpid { namespace cluster { using namespace framing; using namespace sys; +using namespace broker; + +/** Handler to send frames direct to local broker (bypass correlation etc.) */ +struct BrokerHandler : public FrameHandler, private ChannelAdapter { + Connection connection; + Channel channel; + BrokerAdapter adapter; + + // TODO aconway 2007-07-23: Lots of needless flab here (Channel, + // Connection, ChannelAdapter) As these classes are untangled the + // flab can be reduced. The real requirements are: + // - Dispatch methods direct to broker bypassing all the correlation muck + // - Efficiently suppress responses + // For the latter we are now using a ChannelAdapter with noop send() + // A more efficient solution would be a no-op proxy. + // + BrokerHandler(Broker& broker) : + connection(0, broker), + channel(connection, 1, 0), + adapter(channel, connection, broker, *this) {} + + void handle(AMQFrame& frame) { + AMQMethodBody* body=dynamic_cast(frame.body.get()); + assert(body); + body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext + } + + // Dummy methods. + virtual void handleHeader(boost::shared_ptr){} + virtual void handleContent(boost::shared_ptr){} + virtual void handleHeartbeat(boost::shared_ptr){} + virtual bool isOpen() const{ return true; } + virtual void handleMethodInContext(shared_ptr, const MethodContext&){} + // No-op send. + virtual RequestId send(shared_ptr, Correlator::Action) { return 0; } +}; /** Wrap plain AMQFrames in SessionFrames */ struct FrameWrapperHandler : public FrameHandler { @@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler { SessionFrameHandler::Chain next; }; -SessionManager::SessionManager() {} +SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {} -void SessionManager::update(FrameHandler::Chains& chains) -{ +void SessionManager::update(FrameHandler::Chains& chains) { Mutex::ScopedLock l(lock); // Create a new local session, store local chains. Uuid uuid(true); sessions[uuid] = chains; - // Replace local incoming chain. Build from the back. - // + // Replace local in chain. Build from the back. // TODO aconway 2007-07-05: Currently mcast wiring, bypass // everythign else. assert(clusterSend); @@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains) FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); chains.in = classify; - // FIXME aconway 2007-07-05: Need to stop bypassed frames - // from overtaking mcast frames. - // - - // Leave outgoing chain unmodified. + // Leave out chain unmodified. // TODO aconway 2007-07-05: Failover will require replication of // outgoing frames to session replicas. - } void SessionManager::handle(SessionFrame& frame) { - // Incoming from frame. - FrameHandler::Chains chains; + // Incoming from cluster. { Mutex::ScopedLock l(lock); + assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming? SessionMap::iterator i = sessions.find(frame.uuid); if (i == sessions.end()) { - QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); - chains = nonLocal; + // Non local method frame, invoke. + localBroker->handle(frame.frame); } else { - QPID_LOG(trace, "Local frame from cluster: " << frame.frame); - chains = i->second; + // Local frame, continue on local chain + i->second.in->handle(frame.frame); } } - FrameHandler::Chain chain = - chain = frame.isIncoming ? chains.in : chains.out; - // TODO aconway 2007-07-11: Should this be assert(chain) - if (chain) - chain->handle(frame.frame); - - // TODO aconway 2007-07-05: Here's where we should unblock frame - // dispatch for the channel. } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h index c23efde18e..77fc71733b 100644 --- a/cpp/src/qpid/cluster/SessionManager.h +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -19,25 +19,33 @@ * */ -#include "qpid/broker/BrokerChannel.h" #include "qpid/cluster/SessionFrame.h" #include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" +#include + #include namespace qpid { + +namespace broker { +class Broker; +} + namespace cluster { /** * Manage sessions and handler chains for the cluster. * */ -class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler +class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler, + private boost::noncopyable { public: - SessionManager(); + SessionManager(broker::Broker& broker); /** Set the handler to send to the cluster */ void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } @@ -52,12 +60,13 @@ class SessionManager : public framing::HandlerUpdater, public SessionFrameHandle framing::ChannelId getChannelId(const framing::Uuid&) const; private: + class SessionOperations; typedef std::map SessionMap; sys::Mutex lock; SessionFrameHandler::Chain clusterSend; + framing::FrameHandler::Chain localBroker; SessionMap sessions; - framing::FrameHandler::Chains nonLocal; }; diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 50b1c9ff7e..a7c9c61640 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -75,8 +75,8 @@ class ChannelAdapter : protected BodyHandler { *response to this frame. Ignored if body is not a Request. *@return If body is a request, the ID assigned else 0. */ - RequestId send(shared_ptr body, - Correlator::Action action=Correlator::Action()); + virtual RequestId send(shared_ptr body, + Correlator::Action action=Correlator::Action()); virtual bool isOpen() const = 0; -- cgit v1.2.1