diff options
| author | Alan Conway <aconway@apache.org> | 2007-07-27 22:08:51 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-07-27 22:08:51 +0000 |
| commit | 956a72f00b64928a601ea2891789a53271fc7571 (patch) | |
| tree | c4469485822787d4742b06d35e23df007f399ed9 /cpp/src/qpid/cluster | |
| parent | ac669123004b6e78468cc4fcea3ffb4b9d7b7bd3 (diff) | |
| download | qpid-python-956a72f00b64928a601ea2891789a53271fc7571.tar.gz | |
* src/tests/ais_check, cluster.mk: Run AIS tests only if:
- CLUSTER makefile conditional set by configure.
- Effective gid == ais
- aisexec is running
Otherwise print a warning.
* src/tests/EventChannelConnectionTest.cpp
* src/qpid/cluster/doxygen_overview.h
Removed unused files.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560404 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.h | 71 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 15 |
6 files changed, 38 insertions, 186 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index b59bfe878d..52d8691f33 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,7 +19,6 @@ #include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" -#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> @@ -52,9 +51,9 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { Cluster::Cluster( const std::string& name_, const std::string& url_, - const SessionFrameHandler::Chain& next + const FrameHandler::Chain& next ) : - SessionFrameHandler(next), + FrameHandler(next), cpg(new Cpg(*this)), name(name_), url(url_), @@ -85,7 +84,7 @@ Cluster::~Cluster() { } } -void Cluster::handle(SessionFrame& frame) { +void Cluster::handle(AMQFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -95,9 +94,9 @@ void Cluster::handle(SessionFrame& frame) { } void Cluster::notify() { - SessionFrame sf; - sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); - handle(sf); + AMQFrame frame(ProtocolVersion(), 0, + new ClusterNotifyBody(ProtocolVersion(), url)); + handle(frame); } size_t Cluster::size() const { @@ -125,11 +124,11 @@ void Cluster::deliver( assert(name == *group); Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - SessionFrame frame; + AMQFrame frame; frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (frame.uuid.isNull()) - handleClusterFrame(from, frame.frame); + if (frame.getChannel() == 0) + handleClusterFrame(from, frame); else next->handle(frame); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index f6afe14c62..b9cd3b73d1 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -20,7 +20,6 @@ */ #include "qpid/cluster/Cpg.h" -#include "qpid/cluster/SessionFrame.h" #include "qpid/framing/FrameHandler.h" #include "qpid/shared_ptr.h" #include "qpid/sys/Monitor.h" @@ -40,13 +39,13 @@ namespace qpid { namespace cluster { * Connection to the cluster. Maintains cluster membership * data. * - * As SessionFrameHandler, handles frames by sending them to the - * cluster, sends frames received from the cluster to the next - * SessionFrameHandler. + * As FrameHandler, handles frames by sending them to the + * cluster. Frames received from the cluster are sent to the next + * FrameHandler in the chain. * * */ -class Cluster : public SessionFrameHandler, +class Cluster : public framing::FrameHandler, private sys::Runnable, private Cpg::Handler { public: @@ -66,7 +65,7 @@ class Cluster : public SessionFrameHandler, * @param handler for frames received from the cluster. */ Cluster(const std::string& name, const std::string& url, - const SessionFrameHandler::Chain& next); + const framing::FrameHandler::Chain& next); virtual ~Cluster(); @@ -87,7 +86,7 @@ class Cluster : public SessionFrameHandler, sys::Duration timeout=sys::TIME_INFINITE) const; /** Send frame to the cluster */ - void handle(SessionFrame&); + void handle(framing::AMQFrame&); private: typedef Cpg::Id Id; diff --git a/cpp/src/qpid/cluster/SessionFrame.cpp b/cpp/src/qpid/cluster/SessionFrame.cpp deleted file mode 100644 index 1a20a5eddc..0000000000 --- a/cpp/src/qpid/cluster/SessionFrame.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SessionFrame.h" - -#include "qpid/QpidError.h" - -namespace qpid { -namespace cluster { - -void SessionFrame::encode(framing::Buffer& buf) { - uuid.encode(buf); - frame.encode(buf); - buf.putOctet(isIncoming); -} - -void SessionFrame::decode(framing::Buffer& buf) { - uuid.decode(buf); - if (!frame.decode(buf)) - THROW_QPID_ERROR(FRAMING_ERROR, "Incomplete frame"); - isIncoming = buf.getOctet(); -} - -size_t SessionFrame::size() const { - return uuid.size() + frame.size() + 1 /*isIncoming*/; -} - -std::ostream& operator<<(std::ostream& out, const SessionFrame& frame) { - return out << "[session=" << frame.uuid - << (frame.isIncoming ? ",in: ":",out: ") - << frame.frame << "]"; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionFrame.h b/cpp/src/qpid/cluster/SessionFrame.h deleted file mode 100644 index 12885da7e1..0000000000 --- a/cpp/src/qpid/cluster/SessionFrame.h +++ /dev/null @@ -1,71 +0,0 @@ -#ifndef QPID_CLUSTER_SESSIONFRAME_H -#define QPID_CLUSTER_SESSIONFRAME_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/Handler.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/Uuid.h" - -#include <ostream> - -namespace qpid { - -namespace framing { -class AMQFrame; -class Buffer; -} - -namespace cluster { - -/** - * An AMQFrame with a UUID and direction. - */ -struct SessionFrame -{ - SessionFrame() : isIncoming(false) {} - - SessionFrame(const framing::Uuid& id, const framing::AMQFrame& f, bool incoming) - : uuid(id), frame(f), isIncoming(incoming) {} - - void encode(framing::Buffer&); - - void decode(framing::Buffer&); - - size_t size() const; - - static const bool IN = true; - static const bool OUT = false; - - framing::Uuid uuid; - framing::AMQFrame frame; - bool isIncoming; -}; - -typedef framing::Handler<SessionFrame&> SessionFrameHandler; - -std::ostream& operator<<(std::ostream&, const SessionFrame&); - -}} // namespace qpid::cluster - - - -#endif /*!QPID_CLUSTER_SESSIONFRAME_H*/ diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index 88ddfe843f..c9e79b4bbc 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -74,61 +74,38 @@ using namespace broker; virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {} }; -/** Wrap plain AMQFrames in SessionFrames */ -struct FrameWrapperHandler : public FrameHandler { - - FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_) - : uuid(id), direction(dir), next(next_) { - assert(!uuid.isNull()); - } - - void handle(AMQFrame& frame) { - SessionFrame sf(uuid, frame, direction); - assert(next); - next->handle(sf); - } - - Uuid uuid; - bool direction; - SessionFrameHandler::Chain next; -}; - SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {} -void SessionManager::update(FrameHandler::Chains& chains) { +void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) { Mutex::ScopedLock l(lock); // Create a new local session, store local chains. - Uuid uuid(true); - sessions[uuid] = chains; + sessions[channel] = chains; - // Replace local in chain. Build from the back. - // TODO aconway 2007-07-05: Currently mcast wiring, bypass - // everythign else. + // Replace local "in" chain to mcast wiring and process other frames + // as normal. assert(clusterSend); - FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend)); - FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); - chains.in = classify; - - // Leave out chain unmodified. - // TODO aconway 2007-07-05: Failover will require replication of - // outgoing frames to session replicas. + chains.in = make_shared_ptr( + new ClassifierHandler(clusterSend, chains.in)); } -void SessionManager::handle(SessionFrame& frame) { +void SessionManager::handle(AMQFrame& frame) { // Incoming from cluster. { Mutex::ScopedLock l(lock); - assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming? - SessionMap::iterator i = sessions.find(frame.uuid); + SessionMap::iterator i = sessions.find(frame.getChannel()); if (i == sessions.end()) { - // Non local method frame, invoke. - localBroker->handle(frame.frame); + // Non-local wiring method frame, invoke locally. + localBroker->handle(frame); } else { - // Local frame, continue on local chain - i->second.in->handle(frame.frame); + // Local frame continuing on local chain + i->second.in->handle(frame); } } } +void SessionManager::setClusterSend(const FrameHandler::Chain& send) { + clusterSend=send; +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h index 77fc71733b..fc43d6e653 100644 --- a/cpp/src/qpid/cluster/SessionManager.h +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -19,7 +19,6 @@ * */ -#include "qpid/cluster/SessionFrame.h" #include "qpid/framing/HandlerUpdater.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/Uuid.h" @@ -41,30 +40,30 @@ namespace cluster { * Manage sessions and handler chains for the cluster. * */ -class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler, +class SessionManager : public framing::HandlerUpdater, public framing::FrameHandler, private boost::noncopyable { public: SessionManager(broker::Broker& broker); /** Set the handler to send to the cluster */ - void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } + void setClusterSend(const framing::FrameHandler::Chain& send); /** As ChannelUpdater update the handler chains. */ - void update(framing::FrameHandler::Chains& chains); + void update(framing::ChannelId, framing::FrameHandler::Chains&); - /** As SessionFrameHandler handle frames received from the cluster */ - void handle(SessionFrame&); + /** As FrameHandler frames received from the cluster */ + void handle(framing::AMQFrame&); /** Get ChannelID for UUID. Return 0 if no mapping */ framing::ChannelId getChannelId(const framing::Uuid&) const; private: class SessionOperations; - typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap; + typedef std::map<framing::ChannelId,framing::FrameHandler::Chains> SessionMap; sys::Mutex lock; - SessionFrameHandler::Chain clusterSend; + framing::FrameHandler::Chain clusterSend; framing::FrameHandler::Chain localBroker; SessionMap sessions; }; |
