diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 54 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionFrame.h | 71 |
4 files changed, 149 insertions, 40 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e691ad357d..f2d1b75f3f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,6 +19,7 @@ #include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> @@ -45,24 +46,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -namespace { - -/** We mark the high bit of a frame's channel number to know if it's - * an incoming or outgoing frame when frames arrive via multicast. - */ -bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; } -bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); } -void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; } -void markIncoming(AMQFrame&) { /*noop*/ } -void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; } - -} - struct Cluster::IncomingHandler : public FrameHandler { IncomingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markIncoming(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::IN); + cluster.mcast(sf); } Cluster& cluster; }; @@ -70,18 +58,18 @@ struct Cluster::IncomingHandler : public FrameHandler { struct Cluster::OutgoingHandler : public FrameHandler { OutgoingHandler(Cluster& c) : cluster(c) {} void handle(AMQFrame& frame) { - markOutgoing(frame); - cluster.mcast(frame); + SessionFrame sf(Uuid(true), frame, SessionFrame::OUT); + cluster.mcast(sf); } Cluster& cluster; }; - // TODO aconway 2007-06-28: Right now everything is backed up via // multicast. When we have point-to-point backups the // Incoming/Outgoing handlers must determine where each frame should // be sent: to multicast or only to specific backup(s) via AMQP. + Cluster::Cluster(const std::string& name_, const std::string& url_) : cpg(new Cpg(*this)), name(name_), @@ -114,7 +102,7 @@ Cluster::~Cluster() { } } -void Cluster::mcast(AMQFrame& frame) { +void Cluster::mcast(SessionFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -124,11 +112,9 @@ void Cluster::mcast(AMQFrame& frame) { } void Cluster::notify() { - // TODO aconway 2007-06-25: Use proxy here. - ProtocolVersion version; - AMQFrame frame(version, 0, - make_shared_ptr(new ClusterNotifyBody(version, url))); - mcast(frame); + SessionFrame sf; + sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); + mcast(sf); } size_t Cluster::size() const { @@ -136,12 +122,13 @@ size_t Cluster::size() const { return members.size(); } -void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) { +void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) { Mutex::ScopedLock l(lock); - fromChains = chains; + receivedChain = chain; } Cluster::MemberList Cluster::getMembers() const { + // TODO aconway 2007-07-04: use read/write lock? Mutex::ScopedLock l(lock); MemberList result(members.size()); std::transform(members.begin(), members.end(), result.begin(), @@ -159,15 +146,13 @@ void Cluster::deliver( { Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; + SessionFrame frame; frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (!handleClusterFrame(from, frame)) { - FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out; - unMark(frame); - if (chain) - chain->handle(frame); - } + if (frame.uuid.isNull()) + handleClusterFrame(from, frame.frame); + else + receivedChain->handle(frame); } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, @@ -179,7 +164,8 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, ; return (predicate(*this)); } - + +// Handle cluster control frame from the null session. bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 199a93a7c5..6ab4cb58df 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -20,6 +20,7 @@ */ #include "qpid/cluster/Cpg.h" +#include "qpid/cluster/SessionFrame.h" #include "qpid/framing/FrameHandler.h" #include "qpid/shared_ptr.h" #include "qpid/sys/Monitor.h" @@ -69,13 +70,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool empty() const { return size() == 0; } - /** Get handler chains to send frames to the cluster */ - framing::FrameHandler::Chains getToChains() { + /** Get handler chains to send incoming/outgoing frames to the cluster */ + framing::FrameHandler::Chains getSendChains() { return toChains; } - /** Set handler chains for frames received from the cluster */ - void setFromChains(const framing::FrameHandler::Chains& chains); + /** Set handler for frames received from the cluster */ + void setReceivedChain(const SessionFrameHandler::Chain& chain); /** Wait for predicate(*this) to be true, up to timeout. *@return True if predicate became true, false if timed out. @@ -91,7 +92,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler typedef std::map< framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; - void mcast(framing::AMQFrame&); ///< send frame by multicast. + void mcast(SessionFrame&); ///< send frame by multicast. void notify(); ///< Notify cluster of my details. void deliver( @@ -123,7 +124,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler sys::Thread dispatcher; boost::function<void()> callback; framing::FrameHandler::Chains toChains; - framing::FrameHandler::Chains fromChains; + SessionFrameHandler::Chain receivedChain; struct IncomingHandler; struct OutgoingHandler; diff --git a/cpp/src/qpid/cluster/SessionFrame.cpp b/cpp/src/qpid/cluster/SessionFrame.cpp new file mode 100644 index 0000000000..1a20a5eddc --- /dev/null +++ b/cpp/src/qpid/cluster/SessionFrame.cpp @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionFrame.h" + +#include "qpid/QpidError.h" + +namespace qpid { +namespace cluster { + +void SessionFrame::encode(framing::Buffer& buf) { + uuid.encode(buf); + frame.encode(buf); + buf.putOctet(isIncoming); +} + +void SessionFrame::decode(framing::Buffer& buf) { + uuid.decode(buf); + if (!frame.decode(buf)) + THROW_QPID_ERROR(FRAMING_ERROR, "Incomplete frame"); + isIncoming = buf.getOctet(); +} + +size_t SessionFrame::size() const { + return uuid.size() + frame.size() + 1 /*isIncoming*/; +} + +std::ostream& operator<<(std::ostream& out, const SessionFrame& frame) { + return out << "[session=" << frame.uuid + << (frame.isIncoming ? ",in: ":",out: ") + << frame.frame << "]"; +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionFrame.h b/cpp/src/qpid/cluster/SessionFrame.h new file mode 100644 index 0000000000..12885da7e1 --- /dev/null +++ b/cpp/src/qpid/cluster/SessionFrame.h @@ -0,0 +1,71 @@ +#ifndef QPID_CLUSTER_SESSIONFRAME_H +#define QPID_CLUSTER_SESSIONFRAME_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Handler.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Uuid.h" + +#include <ostream> + +namespace qpid { + +namespace framing { +class AMQFrame; +class Buffer; +} + +namespace cluster { + +/** + * An AMQFrame with a UUID and direction. + */ +struct SessionFrame +{ + SessionFrame() : isIncoming(false) {} + + SessionFrame(const framing::Uuid& id, const framing::AMQFrame& f, bool incoming) + : uuid(id), frame(f), isIncoming(incoming) {} + + void encode(framing::Buffer&); + + void decode(framing::Buffer&); + + size_t size() const; + + static const bool IN = true; + static const bool OUT = false; + + framing::Uuid uuid; + framing::AMQFrame frame; + bool isIncoming; +}; + +typedef framing::Handler<SessionFrame&> SessionFrameHandler; + +std::ostream& operator<<(std::ostream&, const SessionFrame&); + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_SESSIONFRAME_H*/ |
