summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/SessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/SessionManager.cpp')
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp81
1 files changed, 54 insertions, 27 deletions
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 24f201535d..9f6438cf92 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -16,17 +16,59 @@
*
*/
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
-#include "SessionManager.h"
-#include "ClassifierHandler.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/broker/BrokerAdapter.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/framing/ChannelAdapter.h"
namespace qpid {
namespace cluster {
using namespace framing;
using namespace sys;
+using namespace broker;
+
+/** Handler to send frames direct to local broker (bypass correlation etc.) */
+struct BrokerHandler : public FrameHandler, private ChannelAdapter {
+ Connection connection;
+ Channel channel;
+ BrokerAdapter adapter;
+
+ // TODO aconway 2007-07-23: Lots of needless flab here (Channel,
+ // Connection, ChannelAdapter) As these classes are untangled the
+ // flab can be reduced. The real requirements are:
+ // - Dispatch methods direct to broker bypassing all the correlation muck
+ // - Efficiently suppress responses
+ // For the latter we are now using a ChannelAdapter with noop send()
+ // A more efficient solution would be a no-op proxy.
+ //
+ BrokerHandler(Broker& broker) :
+ connection(0, broker),
+ channel(connection, 1, 0),
+ adapter(channel, connection, broker, *this) {}
+
+ void handle(AMQFrame& frame) {
+ AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.body.get());
+ assert(body);
+ body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext
+ }
+
+ // Dummy methods.
+ virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){}
+ virtual void handleContent(boost::shared_ptr<AMQContentBody>){}
+ virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){}
+ virtual bool isOpen() const{ return true; }
+ virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){}
+ // No-op send.
+ virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; }
+};
/** Wrap plain AMQFrames in SessionFrames */
struct FrameWrapperHandler : public FrameHandler {
@@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler {
SessionFrameHandler::Chain next;
};
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
-void SessionManager::update(FrameHandler::Chains& chains)
-{
+void SessionManager::update(FrameHandler::Chains& chains) {
Mutex::ScopedLock l(lock);
// Create a new local session, store local chains.
Uuid uuid(true);
sessions[uuid] = chains;
- // Replace local incoming chain. Build from the back.
- //
+ // Replace local in chain. Build from the back.
// TODO aconway 2007-07-05: Currently mcast wiring, bypass
// everythign else.
assert(clusterSend);
@@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains)
FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
chains.in = classify;
- // FIXME aconway 2007-07-05: Need to stop bypassed frames
- // from overtaking mcast frames.
- //
-
- // Leave outgoing chain unmodified.
+ // Leave out chain unmodified.
// TODO aconway 2007-07-05: Failover will require replication of
// outgoing frames to session replicas.
-
}
void SessionManager::handle(SessionFrame& frame) {
- // Incoming from frame.
- FrameHandler::Chains chains;
+ // Incoming from cluster.
{
Mutex::ScopedLock l(lock);
+ assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming?
SessionMap::iterator i = sessions.find(frame.uuid);
if (i == sessions.end()) {
- QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
- chains = nonLocal;
+ // Non local method frame, invoke.
+ localBroker->handle(frame.frame);
}
else {
- QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
- chains = i->second;
+ // Local frame, continue on local chain
+ i->second.in->handle(frame.frame);
}
}
- FrameHandler::Chain chain =
- chain = frame.isIncoming ? chains.in : chains.out;
- // TODO aconway 2007-07-11: Should this be assert(chain)
- if (chain)
- chain->handle(frame.frame);
-
- // TODO aconway 2007-07-05: Here's where we should unblock frame
- // dispatch for the channel.
}
}} // namespace qpid::cluster