summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-24 19:39:27 +0000
committerAlan Conway <aconway@apache.org>2007-07-24 19:39:27 +0000
commitb7c528b027bff7585481c9ce3a01144040c6de5a (patch)
tree6e4588e6b52a5a5457767ae9f8b59cddcfd28ef6 /cpp/src/qpid/cluster
parent0dcc71862cb48a79263a05facd4c42453441cbb5 (diff)
downloadqpid-python-b7c528b027bff7585481c9ce3a01144040c6de5a.tar.gz
* Summary:
- Wiring (declare/delete/bind) is replicated via AIS. - TestOptions includes all logging options. - Logger automatically parses env vars so logging can be enabled for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1. * src/qpid/cluster/SessionManager.cpp: Handle frames from cluster - Forward to BrokerAdapter for execution. - Suppress responses in proxy. * src/tests/TestOptions.h (Options): Logging options, --help option. * src/qpid/client/ClientConnection.cpp: Removed log initialization. Logs are initialized either in TestOptions or automatically from env vars, e.g. QPID_TRACE, * src/qpid/QpidError.h (class QpidError): Initialize Exception in constructor so messages can be logged. * src/qpid/framing/ChannelAdapter.h: Made send() virtual. * src/tests/Cluster_child.cpp: UUID corrected. * src/qpid/broker/Broker.cpp: Pass chains to updater by ref. * src/qpid/Options.cpp (parse): Fix log settings from environment. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/ClassifierHandler.cpp3
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp5
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp81
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h17
4 files changed, 73 insertions, 33 deletions
diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp
index 0d0465c89e..1cce126800 100644
--- a/cpp/src/qpid/cluster/ClassifierHandler.cpp
+++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp
@@ -61,6 +61,9 @@ void ClassifierHandler::handle(AMQFrame& frame) {
Chain chosen;
shared_ptr<AMQMethodBody> method =
dynamic_pointer_cast<AMQMethodBody>(frame.getBody());
+ // FIXME aconway 2007-07-05: Need to stop bypassed frames
+ // from overtaking mcast frames.
+ //
if (method)
chosen=map[fullId(method)];
if (chosen)
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 10b1c44f40..b00152cbcd 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -50,9 +50,10 @@ struct ClusterPlugin : public Plugin {
// Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- sessions.reset(new SessionManager());
+
+ sessions.reset(new SessionManager(*broker));
cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions));
- sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10:
+ sessions->setClusterSend(cluster);
broker->add(sessions);
}
}
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 24f201535d..9f6438cf92 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -16,17 +16,59 @@
*
*/
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
-#include "SessionManager.h"
-#include "ClassifierHandler.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/broker/BrokerAdapter.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/framing/ChannelAdapter.h"
namespace qpid {
namespace cluster {
using namespace framing;
using namespace sys;
+using namespace broker;
+
+/** Handler to send frames direct to local broker (bypass correlation etc.) */
+struct BrokerHandler : public FrameHandler, private ChannelAdapter {
+ Connection connection;
+ Channel channel;
+ BrokerAdapter adapter;
+
+ // TODO aconway 2007-07-23: Lots of needless flab here (Channel,
+ // Connection, ChannelAdapter) As these classes are untangled the
+ // flab can be reduced. The real requirements are:
+ // - Dispatch methods direct to broker bypassing all the correlation muck
+ // - Efficiently suppress responses
+ // For the latter we are now using a ChannelAdapter with noop send()
+ // A more efficient solution would be a no-op proxy.
+ //
+ BrokerHandler(Broker& broker) :
+ connection(0, broker),
+ channel(connection, 1, 0),
+ adapter(channel, connection, broker, *this) {}
+
+ void handle(AMQFrame& frame) {
+ AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.body.get());
+ assert(body);
+ body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext
+ }
+
+ // Dummy methods.
+ virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){}
+ virtual void handleContent(boost::shared_ptr<AMQContentBody>){}
+ virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){}
+ virtual bool isOpen() const{ return true; }
+ virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){}
+ // No-op send.
+ virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; }
+};
/** Wrap plain AMQFrames in SessionFrames */
struct FrameWrapperHandler : public FrameHandler {
@@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler {
SessionFrameHandler::Chain next;
};
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
-void SessionManager::update(FrameHandler::Chains& chains)
-{
+void SessionManager::update(FrameHandler::Chains& chains) {
Mutex::ScopedLock l(lock);
// Create a new local session, store local chains.
Uuid uuid(true);
sessions[uuid] = chains;
- // Replace local incoming chain. Build from the back.
- //
+ // Replace local in chain. Build from the back.
// TODO aconway 2007-07-05: Currently mcast wiring, bypass
// everythign else.
assert(clusterSend);
@@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains)
FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
chains.in = classify;
- // FIXME aconway 2007-07-05: Need to stop bypassed frames
- // from overtaking mcast frames.
- //
-
- // Leave outgoing chain unmodified.
+ // Leave out chain unmodified.
// TODO aconway 2007-07-05: Failover will require replication of
// outgoing frames to session replicas.
-
}
void SessionManager::handle(SessionFrame& frame) {
- // Incoming from frame.
- FrameHandler::Chains chains;
+ // Incoming from cluster.
{
Mutex::ScopedLock l(lock);
+ assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming?
SessionMap::iterator i = sessions.find(frame.uuid);
if (i == sessions.end()) {
- QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
- chains = nonLocal;
+ // Non local method frame, invoke.
+ localBroker->handle(frame.frame);
}
else {
- QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
- chains = i->second;
+ // Local frame, continue on local chain
+ i->second.in->handle(frame.frame);
}
}
- FrameHandler::Chain chain =
- chain = frame.isIncoming ? chains.in : chains.out;
- // TODO aconway 2007-07-11: Should this be assert(chain)
- if (chain)
- chain->handle(frame.frame);
-
- // TODO aconway 2007-07-05: Here's where we should unblock frame
- // dispatch for the channel.
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
index c23efde18e..77fc71733b 100644
--- a/cpp/src/qpid/cluster/SessionManager.h
+++ b/cpp/src/qpid/cluster/SessionManager.h
@@ -19,25 +19,33 @@
*
*/
-#include "qpid/broker/BrokerChannel.h"
#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include <boost/noncopyable.hpp>
+
#include <map>
namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
namespace cluster {
/**
* Manage sessions and handler chains for the cluster.
*
*/
-class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler
+class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler,
+ private boost::noncopyable
{
public:
- SessionManager();
+ SessionManager(broker::Broker& broker);
/** Set the handler to send to the cluster */
void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
@@ -52,12 +60,13 @@ class SessionManager : public framing::HandlerUpdater, public SessionFrameHandle
framing::ChannelId getChannelId(const framing::Uuid&) const;
private:
+ class SessionOperations;
typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
sys::Mutex lock;
SessionFrameHandler::Chain clusterSend;
+ framing::FrameHandler::Chain localBroker;
SessionMap sessions;
- framing::FrameHandler::Chains nonLocal;
};