diff options
| author | Alan Conway <aconway@apache.org> | 2007-07-24 19:39:27 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-07-24 19:39:27 +0000 |
| commit | b7c528b027bff7585481c9ce3a01144040c6de5a (patch) | |
| tree | 6e4588e6b52a5a5457767ae9f8b59cddcfd28ef6 /cpp/src/qpid/cluster/SessionManager.cpp | |
| parent | 0dcc71862cb48a79263a05facd4c42453441cbb5 (diff) | |
| download | qpid-python-b7c528b027bff7585481c9ce3a01144040c6de5a.tar.gz | |
* Summary:
- Wiring (declare/delete/bind) is replicated via AIS.
- TestOptions includes all logging options.
- Logger automatically parses env vars so logging can be enabled
for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1.
* src/qpid/cluster/SessionManager.cpp: Handle frames from cluster
- Forward to BrokerAdapter for execution.
- Suppress responses in proxy.
* src/tests/TestOptions.h (Options): Logging options, --help option.
* src/qpid/client/ClientConnection.cpp: Removed log initialization.
Logs are initialized either in TestOptions or automatically from env vars,
e.g. QPID_TRACE,
* src/qpid/QpidError.h (class QpidError): Initialize Exception in
constructor so messages can be logged.
* src/qpid/framing/ChannelAdapter.h: Made send() virtual.
* src/tests/Cluster_child.cpp: UUID corrected.
* src/qpid/broker/Broker.cpp: Pass chains to updater by ref.
* src/qpid/Options.cpp (parse): Fix log settings from environment.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/SessionManager.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 81 |
1 files changed, 54 insertions, 27 deletions
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index 24f201535d..9f6438cf92 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -16,17 +16,59 @@ * */ +#include "SessionManager.h" +#include "ClassifierHandler.h" + #include "qpid/log/Statement.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" -#include "SessionManager.h" -#include "ClassifierHandler.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/broker/BrokerAdapter.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/BrokerChannel.h" +#include "qpid/framing/ChannelAdapter.h" namespace qpid { namespace cluster { using namespace framing; using namespace sys; +using namespace broker; + +/** Handler to send frames direct to local broker (bypass correlation etc.) */ +struct BrokerHandler : public FrameHandler, private ChannelAdapter { + Connection connection; + Channel channel; + BrokerAdapter adapter; + + // TODO aconway 2007-07-23: Lots of needless flab here (Channel, + // Connection, ChannelAdapter) As these classes are untangled the + // flab can be reduced. The real requirements are: + // - Dispatch methods direct to broker bypassing all the correlation muck + // - Efficiently suppress responses + // For the latter we are now using a ChannelAdapter with noop send() + // A more efficient solution would be a no-op proxy. + // + BrokerHandler(Broker& broker) : + connection(0, broker), + channel(connection, 1, 0), + adapter(channel, connection, broker, *this) {} + + void handle(AMQFrame& frame) { + AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.body.get()); + assert(body); + body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext + } + + // Dummy methods. + virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){} + virtual void handleContent(boost::shared_ptr<AMQContentBody>){} + virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){} + virtual bool isOpen() const{ return true; } + virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){} + // No-op send. + virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; } +}; /** Wrap plain AMQFrames in SessionFrames */ struct FrameWrapperHandler : public FrameHandler { @@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler { SessionFrameHandler::Chain next; }; -SessionManager::SessionManager() {} +SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {} -void SessionManager::update(FrameHandler::Chains& chains) -{ +void SessionManager::update(FrameHandler::Chains& chains) { Mutex::ScopedLock l(lock); // Create a new local session, store local chains. Uuid uuid(true); sessions[uuid] = chains; - // Replace local incoming chain. Build from the back. - // + // Replace local in chain. Build from the back. // TODO aconway 2007-07-05: Currently mcast wiring, bypass // everythign else. assert(clusterSend); @@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains) FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); chains.in = classify; - // FIXME aconway 2007-07-05: Need to stop bypassed frames - // from overtaking mcast frames. - // - - // Leave outgoing chain unmodified. + // Leave out chain unmodified. // TODO aconway 2007-07-05: Failover will require replication of // outgoing frames to session replicas. - } void SessionManager::handle(SessionFrame& frame) { - // Incoming from frame. - FrameHandler::Chains chains; + // Incoming from cluster. { Mutex::ScopedLock l(lock); + assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming? SessionMap::iterator i = sessions.find(frame.uuid); if (i == sessions.end()) { - QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); - chains = nonLocal; + // Non local method frame, invoke. + localBroker->handle(frame.frame); } else { - QPID_LOG(trace, "Local frame from cluster: " << frame.frame); - chains = i->second; + // Local frame, continue on local chain + i->second.in->handle(frame.frame); } } - FrameHandler::Chain chain = - chain = frame.isIncoming ? chains.in : chains.out; - // TODO aconway 2007-07-11: Should this be assert(chain) - if (chain) - chain->handle(frame.frame); - - // TODO aconway 2007-07-05: Here's where we should unblock frame - // dispatch for the channel. } }} // namespace qpid::cluster |
