summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp54
-rw-r--r--cpp/src/qpid/cluster/Cluster.h13
-rw-r--r--cpp/src/qpid/cluster/SessionFrame.cpp51
-rw-r--r--cpp/src/qpid/cluster/SessionFrame.h71
4 files changed, 149 insertions, 40 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e691ad357d..f2d1b75f3f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -19,6 +19,7 @@
#include "Cluster.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
@@ -45,24 +46,11 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-namespace {
-
-/** We mark the high bit of a frame's channel number to know if it's
- * an incoming or outgoing frame when frames arrive via multicast.
- */
-bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; }
-bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); }
-void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; }
-void markIncoming(AMQFrame&) { /*noop*/ }
-void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; }
-
-}
-
struct Cluster::IncomingHandler : public FrameHandler {
IncomingHandler(Cluster& c) : cluster(c) {}
void handle(AMQFrame& frame) {
- markIncoming(frame);
- cluster.mcast(frame);
+ SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
+ cluster.mcast(sf);
}
Cluster& cluster;
};
@@ -70,18 +58,18 @@ struct Cluster::IncomingHandler : public FrameHandler {
struct Cluster::OutgoingHandler : public FrameHandler {
OutgoingHandler(Cluster& c) : cluster(c) {}
void handle(AMQFrame& frame) {
- markOutgoing(frame);
- cluster.mcast(frame);
+ SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
+ cluster.mcast(sf);
}
Cluster& cluster;
};
-
// TODO aconway 2007-06-28: Right now everything is backed up via
// multicast. When we have point-to-point backups the
// Incoming/Outgoing handlers must determine where each frame should
// be sent: to multicast or only to specific backup(s) via AMQP.
+
Cluster::Cluster(const std::string& name_, const std::string& url_) :
cpg(new Cpg(*this)),
name(name_),
@@ -114,7 +102,7 @@ Cluster::~Cluster() {
}
}
-void Cluster::mcast(AMQFrame& frame) {
+void Cluster::mcast(SessionFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -124,11 +112,9 @@ void Cluster::mcast(AMQFrame& frame) {
}
void Cluster::notify() {
- // TODO aconway 2007-06-25: Use proxy here.
- ProtocolVersion version;
- AMQFrame frame(version, 0,
- make_shared_ptr(new ClusterNotifyBody(version, url)));
- mcast(frame);
+ SessionFrame sf;
+ sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
+ mcast(sf);
}
size_t Cluster::size() const {
@@ -136,12 +122,13 @@ size_t Cluster::size() const {
return members.size();
}
-void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) {
+void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
Mutex::ScopedLock l(lock);
- fromChains = chains;
+ receivedChain = chain;
}
Cluster::MemberList Cluster::getMembers() const {
+ // TODO aconway 2007-07-04: use read/write lock?
Mutex::ScopedLock l(lock);
MemberList result(members.size());
std::transform(members.begin(), members.end(), result.begin(),
@@ -159,15 +146,13 @@ void Cluster::deliver(
{
Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
+ SessionFrame frame;
frame.decode(buf);
QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (!handleClusterFrame(from, frame)) {
- FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out;
- unMark(frame);
- if (chain)
- chain->handle(frame);
- }
+ if (frame.uuid.isNull())
+ handleClusterFrame(from, frame.frame);
+ else
+ receivedChain->handle(frame);
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -179,7 +164,8 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
;
return (predicate(*this));
}
-
+
+// Handle cluster control frame from the null session.
bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 199a93a7c5..6ab4cb58df 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -20,6 +20,7 @@
*/
#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/shared_ptr.h"
#include "qpid/sys/Monitor.h"
@@ -69,13 +70,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool empty() const { return size() == 0; }
- /** Get handler chains to send frames to the cluster */
- framing::FrameHandler::Chains getToChains() {
+ /** Get handler chains to send incoming/outgoing frames to the cluster */
+ framing::FrameHandler::Chains getSendChains() {
return toChains;
}
- /** Set handler chains for frames received from the cluster */
- void setFromChains(const framing::FrameHandler::Chains& chains);
+ /** Set handler for frames received from the cluster */
+ void setReceivedChain(const SessionFrameHandler::Chain& chain);
/** Wait for predicate(*this) to be true, up to timeout.
*@return True if predicate became true, false if timed out.
@@ -91,7 +92,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
typedef std::map<
framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
- void mcast(framing::AMQFrame&); ///< send frame by multicast.
+ void mcast(SessionFrame&); ///< send frame by multicast.
void notify(); ///< Notify cluster of my details.
void deliver(
@@ -123,7 +124,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
sys::Thread dispatcher;
boost::function<void()> callback;
framing::FrameHandler::Chains toChains;
- framing::FrameHandler::Chains fromChains;
+ SessionFrameHandler::Chain receivedChain;
struct IncomingHandler;
struct OutgoingHandler;
diff --git a/cpp/src/qpid/cluster/SessionFrame.cpp b/cpp/src/qpid/cluster/SessionFrame.cpp
new file mode 100644
index 0000000000..1a20a5eddc
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionFrame.cpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionFrame.h"
+
+#include "qpid/QpidError.h"
+
+namespace qpid {
+namespace cluster {
+
+void SessionFrame::encode(framing::Buffer& buf) {
+ uuid.encode(buf);
+ frame.encode(buf);
+ buf.putOctet(isIncoming);
+}
+
+void SessionFrame::decode(framing::Buffer& buf) {
+ uuid.decode(buf);
+ if (!frame.decode(buf))
+ THROW_QPID_ERROR(FRAMING_ERROR, "Incomplete frame");
+ isIncoming = buf.getOctet();
+}
+
+size_t SessionFrame::size() const {
+ return uuid.size() + frame.size() + 1 /*isIncoming*/;
+}
+
+std::ostream& operator<<(std::ostream& out, const SessionFrame& frame) {
+ return out << "[session=" << frame.uuid
+ << (frame.isIncoming ? ",in: ":",out: ")
+ << frame.frame << "]";
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionFrame.h b/cpp/src/qpid/cluster/SessionFrame.h
new file mode 100644
index 0000000000..12885da7e1
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionFrame.h
@@ -0,0 +1,71 @@
+#ifndef QPID_CLUSTER_SESSIONFRAME_H
+#define QPID_CLUSTER_SESSIONFRAME_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Handler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Uuid.h"
+
+#include <ostream>
+
+namespace qpid {
+
+namespace framing {
+class AMQFrame;
+class Buffer;
+}
+
+namespace cluster {
+
+/**
+ * An AMQFrame with a UUID and direction.
+ */
+struct SessionFrame
+{
+ SessionFrame() : isIncoming(false) {}
+
+ SessionFrame(const framing::Uuid& id, const framing::AMQFrame& f, bool incoming)
+ : uuid(id), frame(f), isIncoming(incoming) {}
+
+ void encode(framing::Buffer&);
+
+ void decode(framing::Buffer&);
+
+ size_t size() const;
+
+ static const bool IN = true;
+ static const bool OUT = false;
+
+ framing::Uuid uuid;
+ framing::AMQFrame frame;
+ bool isIncoming;
+};
+
+typedef framing::Handler<SessionFrame&> SessionFrameHandler;
+
+std::ostream& operator<<(std::ostream&, const SessionFrame&);
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_SESSIONFRAME_H*/