diff options
| author | Alan Conway <aconway@apache.org> | 2007-07-24 19:39:27 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-07-24 19:39:27 +0000 |
| commit | b7c528b027bff7585481c9ce3a01144040c6de5a (patch) | |
| tree | 6e4588e6b52a5a5457767ae9f8b59cddcfd28ef6 /cpp/src/qpid | |
| parent | 0dcc71862cb48a79263a05facd4c42453441cbb5 (diff) | |
| download | qpid-python-b7c528b027bff7585481c9ce3a01144040c6de5a.tar.gz | |
* Summary:
- Wiring (declare/delete/bind) is replicated via AIS.
- TestOptions includes all logging options.
- Logger automatically parses env vars so logging can be enabled
for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1.
* src/qpid/cluster/SessionManager.cpp: Handle frames from cluster
- Forward to BrokerAdapter for execution.
- Suppress responses in proxy.
* src/tests/TestOptions.h (Options): Logging options, --help option.
* src/qpid/client/ClientConnection.cpp: Removed log initialization.
Logs are initialized either in TestOptions or automatically from env vars,
e.g. QPID_TRACE,
* src/qpid/QpidError.h (class QpidError): Initialize Exception in
constructor so messages can be logged.
* src/qpid/framing/ChannelAdapter.h: Made send() virtual.
* src/tests/Cluster_child.cpp: UUID corrected.
* src/qpid/broker/Broker.cpp: Pass chains to updater by ref.
* src/qpid/Options.cpp (parse): Fix log settings from environment.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/Options.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/QpidError.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/QpidError.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ClientConnection.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClassifierHandler.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 81 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 4 |
10 files changed, 104 insertions, 58 deletions
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp index 7f4536ff7b..2b6cff44f6 100644 --- a/cpp/src/qpid/Options.cpp +++ b/cpp/src/qpid/Options.cpp @@ -18,6 +18,9 @@ #include "Options.h" #include "qpid/Exception.h" + +#include <boost/bind.hpp> + #include <fstream> #include <algorithm> #include <iostream> @@ -28,18 +31,26 @@ using namespace std; namespace { -char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); } +struct EnvOptMapper { + static bool matchChar(char env, char opt) { + return (env==toupper(opt)) || (strchr("-.", opt) && env=='_'); + } -struct Mapper { - Mapper(const Options& o) : opts(o) {} - string operator()(const string& env) { + static bool matchStr(const string& env, boost::shared_ptr<po::option_description> desc) { + return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar); + } + + EnvOptMapper(const Options& o) : opts(o) {} + + string operator()(const string& envVar) { static const std::string prefix("QPID_"); - if (env.substr(0, prefix.size()) == prefix) { - string opt = env.substr(prefix.size()); - transform(opt.begin(), opt.end(), opt.begin(), env2optchar); - // Ignore env vars that don't match to known options. - if (opts.find_nothrow(opt, false)) - return opt; + if (envVar.substr(0, prefix.size()) == prefix) { + string env = envVar.substr(prefix.size()); + typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs; + OptDescs::const_iterator i = + find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1)); + if (i != opts.options().end()) + return (*i)->long_name(); } return string(); } @@ -62,7 +73,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile) if (argc > 0 && argv != 0) po::store(po::parse_command_line(argc, argv, *this), vm); parsing="environment variables"; - po::store(po::parse_environment(*this, Mapper(*this)), vm); + po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm); po::notify(vm); // configFile may be updated from arg/env options. if (!configFile.empty()) { parsing="configuration file "+configFile; diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp index fcd5af47a5..740ec24e54 100644 --- a/cpp/src/qpid/QpidError.cpp +++ b/cpp/src/qpid/QpidError.cpp @@ -34,9 +34,8 @@ Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_pt void QpidError::throwSelf() const { throw *this; } -void QpidError::init() { - whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)") - % code % msg % loc.file % loc.line); +std::string QpidError::message(int code, const std::string& msg, const char* file, int line) { + return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str(); } diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h index dea0902a0e..2ff6571365 100644 --- a/cpp/src/qpid/QpidError.h +++ b/cpp/src/qpid/QpidError.h @@ -48,16 +48,15 @@ class QpidError : public Exception template <class T> QpidError(int code_, const T& msg_, const SrcLine& loc_) throw() - : code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) - { init(); } + : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)), + code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {} ~QpidError() throw(); Exception::auto_ptr clone() const throw(); void throwSelf() const; - private: - - void init(); + /** Format message for exception. */ + static std::string message(int code, const std::string& msg, const char* file, int line); }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 86342b3c43..26ec55ac44 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -164,7 +164,7 @@ void Broker::add(const shared_ptr<HandlerUpdater>& updater) { void Broker::update(FrameHandler::Chains& chains) { for_each(handlerUpdaters.begin(), handlerUpdaters.end(), - boost::bind(&HandlerUpdater::update, _1, chains)); + boost::bind(&HandlerUpdater::update, _1, boost::ref(chains))); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index 102de555fd..4b8f32a26f 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -51,9 +51,6 @@ Connection::Connection( isOpen(false), debug(_debug) { setConnector(defaultConnector); - qpid::log::Options o; - o.trace = debug; - qpid::log::Logger::instance().configure(o, "qpid-c++-client"); } Connection::~Connection(){} diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp index 0d0465c89e..1cce126800 100644 --- a/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp @@ -61,6 +61,9 @@ void ClassifierHandler::handle(AMQFrame& frame) { Chain chosen; shared_ptr<AMQMethodBody> method = dynamic_pointer_cast<AMQMethodBody>(frame.getBody()); + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // if (method) chosen=map[fullId(method)]; if (chosen) diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 10b1c44f40..b00152cbcd 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -50,9 +50,10 @@ struct ClusterPlugin : public Plugin { // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. - sessions.reset(new SessionManager()); + + sessions.reset(new SessionManager(*broker)); cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions)); - sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10: + sessions->setClusterSend(cluster); broker->add(sessions); } } diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index 24f201535d..9f6438cf92 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -16,17 +16,59 @@ * */ +#include "SessionManager.h" +#include "ClassifierHandler.h" + #include "qpid/log/Statement.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" -#include "SessionManager.h" -#include "ClassifierHandler.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/broker/BrokerAdapter.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/BrokerChannel.h" +#include "qpid/framing/ChannelAdapter.h" namespace qpid { namespace cluster { using namespace framing; using namespace sys; +using namespace broker; + +/** Handler to send frames direct to local broker (bypass correlation etc.) */ +struct BrokerHandler : public FrameHandler, private ChannelAdapter { + Connection connection; + Channel channel; + BrokerAdapter adapter; + + // TODO aconway 2007-07-23: Lots of needless flab here (Channel, + // Connection, ChannelAdapter) As these classes are untangled the + // flab can be reduced. The real requirements are: + // - Dispatch methods direct to broker bypassing all the correlation muck + // - Efficiently suppress responses + // For the latter we are now using a ChannelAdapter with noop send() + // A more efficient solution would be a no-op proxy. + // + BrokerHandler(Broker& broker) : + connection(0, broker), + channel(connection, 1, 0), + adapter(channel, connection, broker, *this) {} + + void handle(AMQFrame& frame) { + AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.body.get()); + assert(body); + body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext + } + + // Dummy methods. + virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){} + virtual void handleContent(boost::shared_ptr<AMQContentBody>){} + virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){} + virtual bool isOpen() const{ return true; } + virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){} + // No-op send. + virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; } +}; /** Wrap plain AMQFrames in SessionFrames */ struct FrameWrapperHandler : public FrameHandler { @@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler { SessionFrameHandler::Chain next; }; -SessionManager::SessionManager() {} +SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {} -void SessionManager::update(FrameHandler::Chains& chains) -{ +void SessionManager::update(FrameHandler::Chains& chains) { Mutex::ScopedLock l(lock); // Create a new local session, store local chains. Uuid uuid(true); sessions[uuid] = chains; - // Replace local incoming chain. Build from the back. - // + // Replace local in chain. Build from the back. // TODO aconway 2007-07-05: Currently mcast wiring, bypass // everythign else. assert(clusterSend); @@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains) FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); chains.in = classify; - // FIXME aconway 2007-07-05: Need to stop bypassed frames - // from overtaking mcast frames. - // - - // Leave outgoing chain unmodified. + // Leave out chain unmodified. // TODO aconway 2007-07-05: Failover will require replication of // outgoing frames to session replicas. - } void SessionManager::handle(SessionFrame& frame) { - // Incoming from frame. - FrameHandler::Chains chains; + // Incoming from cluster. { Mutex::ScopedLock l(lock); + assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming? SessionMap::iterator i = sessions.find(frame.uuid); if (i == sessions.end()) { - QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); - chains = nonLocal; + // Non local method frame, invoke. + localBroker->handle(frame.frame); } else { - QPID_LOG(trace, "Local frame from cluster: " << frame.frame); - chains = i->second; + // Local frame, continue on local chain + i->second.in->handle(frame.frame); } } - FrameHandler::Chain chain = - chain = frame.isIncoming ? chains.in : chains.out; - // TODO aconway 2007-07-11: Should this be assert(chain) - if (chain) - chain->handle(frame.frame); - - // TODO aconway 2007-07-05: Here's where we should unblock frame - // dispatch for the channel. } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h index c23efde18e..77fc71733b 100644 --- a/cpp/src/qpid/cluster/SessionManager.h +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -19,25 +19,33 @@ * */ -#include "qpid/broker/BrokerChannel.h" #include "qpid/cluster/SessionFrame.h" #include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" +#include <boost/noncopyable.hpp> + #include <map> namespace qpid { + +namespace broker { +class Broker; +} + namespace cluster { /** * Manage sessions and handler chains for the cluster. * */ -class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler +class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler, + private boost::noncopyable { public: - SessionManager(); + SessionManager(broker::Broker& broker); /** Set the handler to send to the cluster */ void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } @@ -52,12 +60,13 @@ class SessionManager : public framing::HandlerUpdater, public SessionFrameHandle framing::ChannelId getChannelId(const framing::Uuid&) const; private: + class SessionOperations; typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap; sys::Mutex lock; SessionFrameHandler::Chain clusterSend; + framing::FrameHandler::Chain localBroker; SessionMap sessions; - framing::FrameHandler::Chains nonLocal; }; diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 50b1c9ff7e..a7c9c61640 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -75,8 +75,8 @@ class ChannelAdapter : protected BodyHandler { *response to this frame. Ignored if body is not a Request. *@return If body is a request, the ID assigned else 0. */ - RequestId send(shared_ptr<AMQBody> body, - Correlator::Action action=Correlator::Action()); + virtual RequestId send(shared_ptr<AMQBody> body, + Correlator::Action action=Correlator::Action()); virtual bool isOpen() const = 0; |
