summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp148
-rw-r--r--cpp/src/qpid/cluster/Cluster.h23
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp40
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp18
-rw-r--r--cpp/src/qpid/cluster/Cpg.h22
-rw-r--r--cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h46
6 files changed, 182 insertions, 115 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3007e9b1ab..2727d5af0a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,7 +17,9 @@
*/
#include "Cluster.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,68 +34,49 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::SessionState;
+using broker::Connection;
namespace {
+// FIXME aconway 2008-07-01: sending every frame to cluster,
+// serializing all processing in cluster deliver thread.
+// This will not perform at all, but provides a correct starting point.
+//
+// TODO:
+// - Fake "Connection" for cluster: owns shadow sessions.
+// - Maintain shadow sessions.
+// - Apply foreign frames to shadow sessions.
+//
+
+
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- SessionState& session;
+ Connection& connection;
Cluster& cluster;
- bool busy;
- Monitor lock;
- ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
-
- void handle(AMQFrame& f) {
- Mutex::ScopedLock l(lock);
- assert(!busy);
- // FIXME aconway 2008-01-29: refcount Sessions.
- // session.addRef(); // Keep the session till the message is self delivered.
- cluster.send(f, next); // Indirectly send to next via cluster.
-
- // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
- // But cluster needs to agree on order of side-effects on the shared model.
- // OK for wiring to block, for messages use queue tokens?
- // Both in & out transfers must be orderd per queue.
- // May need out-of-order completion.
- busy=true;
- while (busy) lock.wait();
- }
-};
+ ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
-// Next in inbound chain, self delivered from cluster.
-struct ClusterDeliverHandler : public FrameHandler {
- Cluster& cluster;
- ClusterSendHandler& sender;
-
- ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
-
void handle(AMQFrame& f) {
- next->handle(f);
- // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands.
- // Mutex::ScopedLock l(lock);
- // senderBusy=false;
- // senderLock.notify();
+ // FIXME aconway 2008-01-29: Refcount Connections to ensure
+ // Connection not destroyed till message is self delivered.
+ cluster.send(f, &connection, next); // Indirectly send to next via cluster.
}
};
-struct SessionObserver : public broker::SessionManager::Observer {
+struct ConnectionObserver : public broker::ConnectionManager::Observer {
Cluster& cluster;
- SessionObserver(Cluster& c) : cluster(c) {}
+ ConnectionObserver(Cluster& c) : cluster(c) {}
- void opened(SessionState& s) {
+ void created(Connection& c) {
// FIXME aconway 2008-06-16: clean up chaining and observers.
- ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
- ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
- s.getInChain().insert(deliverer);
- s.getOutChain().insert(sender);
+ ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
+ c.getInChain().insert(sender);
}
};
}
ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
+ return out << cluster.name.str() << "-" << cluster.self;
}
ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
@@ -106,13 +89,16 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
+// FIXME aconway 2008-07-02: create a Connection for the cluster.
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
+ broker(b),
cpg(*this),
name(name_),
url(url_),
- observer(new SessionObserver(*this))
+ observer(new ConnectionObserver(*this)),
+ self(cpg.self())
{
- QPID_LOG(trace, *this << " Joining cluster: " << name_);
+ QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
dispatcher=Thread(*this);
@@ -136,19 +122,32 @@ Cluster::~Cluster() {
}
}
-void Cluster::send(AMQFrame& frame, FrameHandler* next) {
- QPID_LOG(trace, *this << " SEND: " << frame);
- char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
- Buffer buf(data);
+template <class T> void decodePtr(Buffer& buf, T*& ptr) {
+ uint64_t value = buf.getLongLong();
+ ptr = reinterpret_cast<T*>(value);
+}
+
+template <class T> void encodePtr(Buffer& buf, T* ptr) {
+ uint64_t value = reinterpret_cast<uint64_t>(ptr);
+ buf.putLongLong(value);
+}
+
+void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+ // TODO aconway 2008-07-03: More efficient buffer management.
+ // Cache coded form of decoded frames for re-encoding?
+ Buffer buf(buffer);
+ assert(frame.size() + 128 < sizeof(buffer));
frame.encode(buf);
- buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
- iovec iov = { data, frame.size()+sizeof(next) };
+ encodePtr(buf, connection);
+ encodePtr(buf, next);
+ iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- send(frame, 0);
+ send(frame, 0, 0);
}
size_t Cluster::size() const {
@@ -164,6 +163,21 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
+boost::shared_ptr<broker::Connection>
+Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
+ // FIXME aconway 2008-07-02: locking - called by deliver in
+ // cluster thread so no locks but may need to revisit as model
+ // changes.
+ ShadowConnectionId id(member, connectionPtr);
+ boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
+ if (!ptr) {
+ std::ostringstream os;
+ os << name << ":" << member << ":" << std::hex << connectionPtr;
+ ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+ }
+ return ptr;
+}
+
void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
@@ -172,20 +186,28 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
+ Id from(nodeid, pid);
try {
- Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
+ void* connectionId;
+ decodePtr(buf, connectionId);
+
+ QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
+
+ if (connectionId == 0) // A cluster control frame.
handleClusterFrame(from, frame);
- else if (from == self) {
- FrameHandler* next;
- buf.getRawData((uint8_t*)&next, sizeof(next));
+ else if (from == self) { // My own frame, carries a next pointer.
+ FrameHandler* next;
+ decodePtr(buf, next);
next->handle(frame);
}
- // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ else { // Foreign frame, forward to shadow connection.
+ // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
+ boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
+ shadow->received(frame);
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -203,7 +225,7 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
return (predicate(*this));
}
-// Handle cluster control frame from the null session.
+// Handle cluster control frame .
void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
@@ -213,10 +235,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
- if (!self.id && notifyIn->getUrl() == url.str())
- self=from;
lock.notifyAll();
- QPID_LOG(trace, *this << ": members joined: " << members);
+ QPID_LOG(debug, "Cluster join: " << members);
}
}
@@ -234,7 +254,7 @@ void Cluster::configChange(
if (nLeft) {
for (int i = 0; i < nLeft; ++i)
members.erase(Id(left[i]));
- QPID_LOG(trace, *this << ": members left: " << members);
+ QPID_LOG(debug, "Cluster leave: " << members);
lock.notifyAll();
}
newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 6cc8dd7f78..031baf914a 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -19,7 +19,8 @@
*
*/
-#include "Cpg.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/ShadowConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
@@ -36,7 +37,8 @@
#include <map>
#include <vector>
-namespace qpid { namespace cluster {
+namespace qpid {
+namespace cluster {
/**
* Connection to the cluster.
@@ -63,7 +65,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+ boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -82,11 +84,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void send(framing::AMQFrame&, framing::FrameHandler*);
+ void send(framing::AMQFrame&, void* connection, framing::FrameHandler*);
private:
typedef Cpg::Id Id;
typedef std::map<Id, Member> MemberMap;
+ typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
+ typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap;
void notify(); ///< Notify cluster of my details.
@@ -107,17 +111,24 @@ class Cluster : private sys::Runnable, private Cpg::Handler
);
void run();
+
void handleClusterFrame(Id from, framing::AMQFrame&);
+ boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*);
+
mutable sys::Monitor lock;
+ broker::Broker& broker;
Cpg cpg;
Cpg::Name name;
Url url;
- Id self;
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- boost::intrusive_ptr<broker::SessionManager::Observer> observer;
+ boost::intrusive_ptr<broker::ConnectionManager::Observer> observer;
+ Id self;
+ ShadowConnectionMap shadowConnectionMap;
+ ShadowConnectionOutputHandler shadowOut;
+ char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index a638f509c6..10695496bc 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -35,15 +35,6 @@ namespace cluster {
using namespace std;
using broker::Broker;
-struct OptionValues {
- string name;
- string url;
-
- Url getUrl(uint16_t port) const {
- if (url.empty()) return Url::getIpAddressesUrl(port);
- return Url(url);
- }
-};
// Note we update the values in a separate struct.
// This is to work around boost::program_options differences,
@@ -51,43 +42,44 @@ struct OptionValues {
// ones take a copy (or require a shared_ptr)
//
struct ClusterOptions : public Options {
+ std::string name;
+ std::string url;
- ClusterOptions(OptionValues* v) : Options("Cluster Options") {
+ ClusterOptions() : Options("Cluster Options") {
addOptions()
- ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join")
- ("cluster-url", optValue(v->url,"URL"),
+ ("cluster-name", optValue(name,""), "Cluster identifier")
+ ("cluster-url", optValue(url,"URL"),
"URL of this broker, advertized to the cluster.\n"
- "Defaults to a URL listing all the local IP addresses\n");
+ "Defaults to a URL listing all the local IP addresses\n")
+ ;
}
};
struct ClusterPlugin : public PluginT<Broker> {
- OptionValues values;
+ ClusterOptions options;
boost::optional<Cluster> cluster;
- ClusterPlugin(const OptionValues& v) : values(v) {}
+ ClusterPlugin(const ClusterOptions& opts) : options(opts) {}
- void initializeT(Broker& broker) {
- cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker));
- broker.getSessionManager().add(cluster->getObserver());
+ void initializeT(Broker& broker) { // FIXME aconway 2008-07-01: drop T suffix.
+ Url url = options.url.empty() ? Url::getIpAddressesUrl(broker.getPort()) : Url(options.url);
+ cluster = boost::in_place(options.name, url, boost::ref(broker));
+ broker.getConnectionManager().add(cluster->getObserver()); // FIXME aconway 2008-07-01: to Cluster ctor
}
};
struct PluginFactory : public Plugin::FactoryT<Broker> {
- OptionValues values;
ClusterOptions options;
- PluginFactory() : options(&values) {}
-
Options* getOptions() { return &options; }
boost::shared_ptr<Plugin> createT(Broker&) {
- // Only provide to a Broker, and only if the --cluster config is set.
- if (values.name.empty())
+ if (options.name.empty()) { // No cluster name, don't initialize cluster.
return boost::shared_ptr<Plugin>();
+ }
else
- return make_shared_ptr(new ClusterPlugin(values));
+ return make_shared_ptr(new ClusterPlugin(options));
}
};
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 7b8fce4112..7831f66da1 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -144,24 +144,20 @@ std::string Cpg::cantMcastMsg(const Name& group) {
return "Cannot mcast to CPG group "+group.str();
}
+Cpg::Id Cpg::self() const {
+ unsigned int nodeid;
+ check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
+ return Id(nodeid, getpid());
+}
+
ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
ostream_iterator<Cpg::Id> i(o, " ");
std::copy(a.first, a.first+a.second, i);
return o;
}
-static int popbyte(uint32_t& n) {
- uint8_t b=n&0xff;
- n>>=8;
- return b;
-}
-
ostream& operator <<(ostream& out, const Cpg::Id& id) {
- uint32_t node=id.nodeId();
- out << popbyte(node);
- for (int i = 0; i < 3; i++)
- out << "." << popbyte(node);
- return out << ":" << id.pid();
+ return out << id.getNodeId() << "-" << id.getPid();
}
ostream& operator <<(ostream& out, const cpg_name& name) {
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 1ed362f94e..a918fb0cbf 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -22,6 +22,8 @@
#include "qpid/Exception.h"
#include "qpid/cluster/Dispatchable.h"
+#include <boost/tuple/tuple.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <cassert>
#include <string.h>
@@ -55,16 +57,14 @@ class Cpg : public Dispatchable {
std::string str() const { return std::string(value, length); }
};
-
- struct Id {
- uint64_t id;
- Id(uint64_t n=0) : id(n) {}
- Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
- Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
-
- operator uint64_t() const { return id; }
- uint32_t nodeId() const { return id >> 32; }
- pid_t pid() const { return id & 0xFFFF; }
+
+
+ // boost::tuple gives us == and < for free.
+ struct Id : public boost::tuple<uint32_t, uint32_t> {
+ Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
+ Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {}
+ uint32_t getNodeId() const { return boost::get<0>(*this); }
+ uint32_t getPid() const { return boost::get<1>(*this); }
};
static std::string str(const cpg_name& n) {
@@ -131,6 +131,8 @@ class Cpg : public Dispatchable {
cpg_handle_t getHandle() const { return handle; }
+ Id self() const;
+
private:
class Handles;
struct ClearHandleOnExit;
diff --git a/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h b/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
new file mode 100644
index 0000000000..6d429535e6
--- /dev/null
+++ b/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/sys/ConnectionOutputHandler.h>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Output handler for frames sent to shadow connections.
+ * Simply discards frames.
+ */
+class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler
+{
+ public:
+ virtual void send(framing::AMQFrame&) {}
+ virtual void close() {}
+ virtual void activateOutput() {}
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/