summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-24 19:39:27 +0000
committerAlan Conway <aconway@apache.org>2007-07-24 19:39:27 +0000
commitb7c528b027bff7585481c9ce3a01144040c6de5a (patch)
tree6e4588e6b52a5a5457767ae9f8b59cddcfd28ef6 /cpp/src/qpid
parent0dcc71862cb48a79263a05facd4c42453441cbb5 (diff)
downloadqpid-python-b7c528b027bff7585481c9ce3a01144040c6de5a.tar.gz
* Summary:
- Wiring (declare/delete/bind) is replicated via AIS. - TestOptions includes all logging options. - Logger automatically parses env vars so logging can be enabled for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1. * src/qpid/cluster/SessionManager.cpp: Handle frames from cluster - Forward to BrokerAdapter for execution. - Suppress responses in proxy. * src/tests/TestOptions.h (Options): Logging options, --help option. * src/qpid/client/ClientConnection.cpp: Removed log initialization. Logs are initialized either in TestOptions or automatically from env vars, e.g. QPID_TRACE, * src/qpid/QpidError.h (class QpidError): Initialize Exception in constructor so messages can be logged. * src/qpid/framing/ChannelAdapter.h: Made send() virtual. * src/tests/Cluster_child.cpp: UUID corrected. * src/qpid/broker/Broker.cpp: Pass chains to updater by ref. * src/qpid/Options.cpp (parse): Fix log settings from environment. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/Options.cpp33
-rw-r--r--cpp/src/qpid/QpidError.cpp5
-rw-r--r--cpp/src/qpid/QpidError.h9
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp3
-rw-r--r--cpp/src/qpid/cluster/ClassifierHandler.cpp3
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp5
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp81
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h17
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h4
10 files changed, 104 insertions, 58 deletions
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp
index 7f4536ff7b..2b6cff44f6 100644
--- a/cpp/src/qpid/Options.cpp
+++ b/cpp/src/qpid/Options.cpp
@@ -18,6 +18,9 @@
#include "Options.h"
#include "qpid/Exception.h"
+
+#include <boost/bind.hpp>
+
#include <fstream>
#include <algorithm>
#include <iostream>
@@ -28,18 +31,26 @@ using namespace std;
namespace {
-char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); }
+struct EnvOptMapper {
+ static bool matchChar(char env, char opt) {
+ return (env==toupper(opt)) || (strchr("-.", opt) && env=='_');
+ }
-struct Mapper {
- Mapper(const Options& o) : opts(o) {}
- string operator()(const string& env) {
+ static bool matchStr(const string& env, boost::shared_ptr<po::option_description> desc) {
+ return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar);
+ }
+
+ EnvOptMapper(const Options& o) : opts(o) {}
+
+ string operator()(const string& envVar) {
static const std::string prefix("QPID_");
- if (env.substr(0, prefix.size()) == prefix) {
- string opt = env.substr(prefix.size());
- transform(opt.begin(), opt.end(), opt.begin(), env2optchar);
- // Ignore env vars that don't match to known options.
- if (opts.find_nothrow(opt, false))
- return opt;
+ if (envVar.substr(0, prefix.size()) == prefix) {
+ string env = envVar.substr(prefix.size());
+ typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs;
+ OptDescs::const_iterator i =
+ find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1));
+ if (i != opts.options().end())
+ return (*i)->long_name();
}
return string();
}
@@ -62,7 +73,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile)
if (argc > 0 && argv != 0)
po::store(po::parse_command_line(argc, argv, *this), vm);
parsing="environment variables";
- po::store(po::parse_environment(*this, Mapper(*this)), vm);
+ po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm);
po::notify(vm); // configFile may be updated from arg/env options.
if (!configFile.empty()) {
parsing="configuration file "+configFile;
diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp
index fcd5af47a5..740ec24e54 100644
--- a/cpp/src/qpid/QpidError.cpp
+++ b/cpp/src/qpid/QpidError.cpp
@@ -34,9 +34,8 @@ Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_pt
void QpidError::throwSelf() const { throw *this; }
-void QpidError::init() {
- whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)")
- % code % msg % loc.file % loc.line);
+std::string QpidError::message(int code, const std::string& msg, const char* file, int line) {
+ return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str();
}
diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h
index dea0902a0e..2ff6571365 100644
--- a/cpp/src/qpid/QpidError.h
+++ b/cpp/src/qpid/QpidError.h
@@ -48,16 +48,15 @@ class QpidError : public Exception
template <class T>
QpidError(int code_, const T& msg_, const SrcLine& loc_) throw()
- : code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_))
- { init(); }
+ : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)),
+ code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {}
~QpidError() throw();
Exception::auto_ptr clone() const throw();
void throwSelf() const;
- private:
-
- void init();
+ /** Format message for exception. */
+ static std::string message(int code, const std::string& msg, const char* file, int line);
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 86342b3c43..26ec55ac44 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -164,7 +164,7 @@ void Broker::add(const shared_ptr<HandlerUpdater>& updater) {
void Broker::update(FrameHandler::Chains& chains) {
for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
- boost::bind(&HandlerUpdater::update, _1, chains));
+ boost::bind(&HandlerUpdater::update, _1, boost::ref(chains)));
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index 102de555fd..4b8f32a26f 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -51,9 +51,6 @@ Connection::Connection(
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
- qpid::log::Options o;
- o.trace = debug;
- qpid::log::Logger::instance().configure(o, "qpid-c++-client");
}
Connection::~Connection(){}
diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp
index 0d0465c89e..1cce126800 100644
--- a/cpp/src/qpid/cluster/ClassifierHandler.cpp
+++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp
@@ -61,6 +61,9 @@ void ClassifierHandler::handle(AMQFrame& frame) {
Chain chosen;
shared_ptr<AMQMethodBody> method =
dynamic_pointer_cast<AMQMethodBody>(frame.getBody());
+ // FIXME aconway 2007-07-05: Need to stop bypassed frames
+ // from overtaking mcast frames.
+ //
if (method)
chosen=map[fullId(method)];
if (chosen)
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 10b1c44f40..b00152cbcd 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -50,9 +50,10 @@ struct ClusterPlugin : public Plugin {
// Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- sessions.reset(new SessionManager());
+
+ sessions.reset(new SessionManager(*broker));
cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions));
- sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10:
+ sessions->setClusterSend(cluster);
broker->add(sessions);
}
}
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 24f201535d..9f6438cf92 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -16,17 +16,59 @@
*
*/
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
-#include "SessionManager.h"
-#include "ClassifierHandler.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/broker/BrokerAdapter.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/framing/ChannelAdapter.h"
namespace qpid {
namespace cluster {
using namespace framing;
using namespace sys;
+using namespace broker;
+
+/** Handler to send frames direct to local broker (bypass correlation etc.) */
+struct BrokerHandler : public FrameHandler, private ChannelAdapter {
+ Connection connection;
+ Channel channel;
+ BrokerAdapter adapter;
+
+ // TODO aconway 2007-07-23: Lots of needless flab here (Channel,
+ // Connection, ChannelAdapter) As these classes are untangled the
+ // flab can be reduced. The real requirements are:
+ // - Dispatch methods direct to broker bypassing all the correlation muck
+ // - Efficiently suppress responses
+ // For the latter we are now using a ChannelAdapter with noop send()
+ // A more efficient solution would be a no-op proxy.
+ //
+ BrokerHandler(Broker& broker) :
+ connection(0, broker),
+ channel(connection, 1, 0),
+ adapter(channel, connection, broker, *this) {}
+
+ void handle(AMQFrame& frame) {
+ AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.body.get());
+ assert(body);
+ body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext
+ }
+
+ // Dummy methods.
+ virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){}
+ virtual void handleContent(boost::shared_ptr<AMQContentBody>){}
+ virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){}
+ virtual bool isOpen() const{ return true; }
+ virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){}
+ // No-op send.
+ virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; }
+};
/** Wrap plain AMQFrames in SessionFrames */
struct FrameWrapperHandler : public FrameHandler {
@@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler {
SessionFrameHandler::Chain next;
};
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
-void SessionManager::update(FrameHandler::Chains& chains)
-{
+void SessionManager::update(FrameHandler::Chains& chains) {
Mutex::ScopedLock l(lock);
// Create a new local session, store local chains.
Uuid uuid(true);
sessions[uuid] = chains;
- // Replace local incoming chain. Build from the back.
- //
+ // Replace local in chain. Build from the back.
// TODO aconway 2007-07-05: Currently mcast wiring, bypass
// everythign else.
assert(clusterSend);
@@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains)
FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
chains.in = classify;
- // FIXME aconway 2007-07-05: Need to stop bypassed frames
- // from overtaking mcast frames.
- //
-
- // Leave outgoing chain unmodified.
+ // Leave out chain unmodified.
// TODO aconway 2007-07-05: Failover will require replication of
// outgoing frames to session replicas.
-
}
void SessionManager::handle(SessionFrame& frame) {
- // Incoming from frame.
- FrameHandler::Chains chains;
+ // Incoming from cluster.
{
Mutex::ScopedLock l(lock);
+ assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming?
SessionMap::iterator i = sessions.find(frame.uuid);
if (i == sessions.end()) {
- QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
- chains = nonLocal;
+ // Non local method frame, invoke.
+ localBroker->handle(frame.frame);
}
else {
- QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
- chains = i->second;
+ // Local frame, continue on local chain
+ i->second.in->handle(frame.frame);
}
}
- FrameHandler::Chain chain =
- chain = frame.isIncoming ? chains.in : chains.out;
- // TODO aconway 2007-07-11: Should this be assert(chain)
- if (chain)
- chain->handle(frame.frame);
-
- // TODO aconway 2007-07-05: Here's where we should unblock frame
- // dispatch for the channel.
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
index c23efde18e..77fc71733b 100644
--- a/cpp/src/qpid/cluster/SessionManager.h
+++ b/cpp/src/qpid/cluster/SessionManager.h
@@ -19,25 +19,33 @@
*
*/
-#include "qpid/broker/BrokerChannel.h"
#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include <boost/noncopyable.hpp>
+
#include <map>
namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
namespace cluster {
/**
* Manage sessions and handler chains for the cluster.
*
*/
-class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler
+class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler,
+ private boost::noncopyable
{
public:
- SessionManager();
+ SessionManager(broker::Broker& broker);
/** Set the handler to send to the cluster */
void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
@@ -52,12 +60,13 @@ class SessionManager : public framing::HandlerUpdater, public SessionFrameHandle
framing::ChannelId getChannelId(const framing::Uuid&) const;
private:
+ class SessionOperations;
typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
sys::Mutex lock;
SessionFrameHandler::Chain clusterSend;
+ framing::FrameHandler::Chain localBroker;
SessionMap sessions;
- framing::FrameHandler::Chains nonLocal;
};
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 50b1c9ff7e..a7c9c61640 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -75,8 +75,8 @@ class ChannelAdapter : protected BodyHandler {
*response to this frame. Ignored if body is not a Request.
*@return If body is a request, the ID assigned else 0.
*/
- RequestId send(shared_ptr<AMQBody> body,
- Correlator::Action action=Correlator::Action());
+ virtual RequestId send(shared_ptr<AMQBody> body,
+ Correlator::Action action=Correlator::Action());
virtual bool isOpen() const = 0;