diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 148 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h | 46 |
6 files changed, 182 insertions, 115 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3007e9b1ab..2727d5af0a 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,7 +17,9 @@ */ #include "Cluster.h" +#include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" +#include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -32,68 +34,49 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::SessionState; +using broker::Connection; namespace { +// FIXME aconway 2008-07-01: sending every frame to cluster, +// serializing all processing in cluster deliver thread. +// This will not perform at all, but provides a correct starting point. +// +// TODO: +// - Fake "Connection" for cluster: owns shadow sessions. +// - Maintain shadow sessions. +// - Apply foreign frames to shadow sessions. +// + + // Beginning of inbound chain: send to cluster. struct ClusterSendHandler : public FrameHandler { - SessionState& session; + Connection& connection; Cluster& cluster; - bool busy; - Monitor lock; - ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} - - void handle(AMQFrame& f) { - Mutex::ScopedLock l(lock); - assert(!busy); - // FIXME aconway 2008-01-29: refcount Sessions. - // session.addRef(); // Keep the session till the message is self delivered. - cluster.send(f, next); // Indirectly send to next via cluster. - - // FIXME aconway 2008-01-29: need to get this blocking out of the loop. - // But cluster needs to agree on order of side-effects on the shared model. - // OK for wiring to block, for messages use queue tokens? - // Both in & out transfers must be orderd per queue. - // May need out-of-order completion. - busy=true; - while (busy) lock.wait(); - } -}; + ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {} -// Next in inbound chain, self delivered from cluster. -struct ClusterDeliverHandler : public FrameHandler { - Cluster& cluster; - ClusterSendHandler& sender; - - ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {} - void handle(AMQFrame& f) { - next->handle(f); - // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands. - // Mutex::ScopedLock l(lock); - // senderBusy=false; - // senderLock.notify(); + // FIXME aconway 2008-01-29: Refcount Connections to ensure + // Connection not destroyed till message is self delivered. + cluster.send(f, &connection, next); // Indirectly send to next via cluster. } }; -struct SessionObserver : public broker::SessionManager::Observer { +struct ConnectionObserver : public broker::ConnectionManager::Observer { Cluster& cluster; - SessionObserver(Cluster& c) : cluster(c) {} + ConnectionObserver(Cluster& c) : cluster(c) {} - void opened(SessionState& s) { + void created(Connection& c) { // FIXME aconway 2008-06-16: clean up chaining and observers. - ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); - ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); - s.getInChain().insert(deliverer); - s.getOutChain().insert(sender); + ClusterSendHandler* sender=new ClusterSendHandler(c, cluster); + c.getInChain().insert(sender); } }; } ostream& operator <<(ostream& out, const Cluster& cluster) { - return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; + return out << cluster.name.str() << "-" << cluster.self; } ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) { @@ -106,13 +89,16 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) : +// FIXME aconway 2008-07-02: create a Connection for the cluster. +Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : + broker(b), cpg(*this), name(name_), url(url_), - observer(new SessionObserver(*this)) + observer(new ConnectionObserver(*this)), + self(cpg.self()) { - QPID_LOG(trace, *this << " Joining cluster: " << name_); + QPID_LOG(trace, "Joining cluster: " << name_); cpg.join(name); notify(); dispatcher=Thread(*this); @@ -136,19 +122,32 @@ Cluster::~Cluster() { } } -void Cluster::send(AMQFrame& frame, FrameHandler* next) { - QPID_LOG(trace, *this << " SEND: " << frame); - char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling. - Buffer buf(data); +template <class T> void decodePtr(Buffer& buf, T*& ptr) { + uint64_t value = buf.getLongLong(); + ptr = reinterpret_cast<T*>(value); +} + +template <class T> void encodePtr(Buffer& buf, T* ptr) { + uint64_t value = reinterpret_cast<uint64_t>(ptr); + buf.putLongLong(value); +} + +void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) { + QPID_LOG(trace, "MCAST [" << connection << "] " << frame); + // TODO aconway 2008-07-03: More efficient buffer management. + // Cache coded form of decoded frames for re-encoding? + Buffer buf(buffer); + assert(frame.size() + 128 < sizeof(buffer)); frame.encode(buf); - buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer. - iovec iov = { data, frame.size()+sizeof(next) }; + encodePtr(buf, connection); + encodePtr(buf, next); + iovec iov = { buffer, buf.getPosition() }; cpg.mcast(name, &iov, 1); } void Cluster::notify() { AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())); - send(frame, 0); + send(frame, 0, 0); } size_t Cluster::size() const { @@ -164,6 +163,21 @@ Cluster::MemberList Cluster::getMembers() const { return result; } +boost::shared_ptr<broker::Connection> +Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) { + // FIXME aconway 2008-07-02: locking - called by deliver in + // cluster thread so no locks but may need to revisit as model + // changes. + ShadowConnectionId id(member, connectionPtr); + boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id]; + if (!ptr) { + std::ostringstream os; + os << name << ":" << member << ":" << std::hex << connectionPtr; + ptr.reset(new broker::Connection(&shadowOut, broker, os.str())); + } + return ptr; +} + void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, @@ -172,20 +186,28 @@ void Cluster::deliver( void* msg, int msg_len) { + Id from(nodeid, pid); try { - Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); AMQFrame frame; frame.decode(buf); - QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (frame.getChannel() == 0) + void* connectionId; + decodePtr(buf, connectionId); + + QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame); + + if (connectionId == 0) // A cluster control frame. handleClusterFrame(from, frame); - else if (from == self) { - FrameHandler* next; - buf.getRawData((uint8_t*)&next, sizeof(next)); + else if (from == self) { // My own frame, carries a next pointer. + FrameHandler* next; + decodePtr(buf, next); next->handle(frame); } - // FIXME aconway 2008-01-30: apply frames from foreign sessions. + else { // Foreign frame, forward to shadow connection. + // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr. + boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId); + shadow->received(frame); + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -203,7 +225,7 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, return (predicate(*this)); } -// Handle cluster control frame from the null session. +// Handle cluster control frame . void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= @@ -213,10 +235,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { { Mutex::ScopedLock l(lock); members[from].url=notifyIn->getUrl(); - if (!self.id && notifyIn->getUrl() == url.str()) - self=from; lock.notifyAll(); - QPID_LOG(trace, *this << ": members joined: " << members); + QPID_LOG(debug, "Cluster join: " << members); } } @@ -234,7 +254,7 @@ void Cluster::configChange( if (nLeft) { for (int i = 0; i < nLeft; ++i) members.erase(Id(left[i])); - QPID_LOG(trace, *this << ": members left: " << members); + QPID_LOG(debug, "Cluster leave: " << members); lock.notifyAll(); } newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 6cc8dd7f78..031baf914a 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -19,7 +19,8 @@ * */ -#include "Cpg.h" +#include "qpid/cluster/Cpg.h" +#include "qpid/cluster/ShadowConnectionOutputHandler.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" @@ -36,7 +37,8 @@ #include <map> #include <vector> -namespace qpid { namespace cluster { +namespace qpid { +namespace cluster { /** * Connection to the cluster. @@ -63,7 +65,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler virtual ~Cluster(); // FIXME aconway 2008-01-29: - boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } + boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -82,11 +84,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler sys::Duration timeout=sys::TIME_INFINITE) const; /** Send frame to the cluster */ - void send(framing::AMQFrame&, framing::FrameHandler*); + void send(framing::AMQFrame&, void* connection, framing::FrameHandler*); private: typedef Cpg::Id Id; typedef std::map<Id, Member> MemberMap; + typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId; + typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap; void notify(); ///< Notify cluster of my details. @@ -107,17 +111,24 @@ class Cluster : private sys::Runnable, private Cpg::Handler ); void run(); + void handleClusterFrame(Id from, framing::AMQFrame&); + boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*); + mutable sys::Monitor lock; + broker::Broker& broker; Cpg cpg; Cpg::Name name; Url url; - Id self; MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - boost::intrusive_ptr<broker::SessionManager::Observer> observer; + boost::intrusive_ptr<broker::ConnectionManager::Observer> observer; + Id self; + ShadowConnectionMap shadowConnectionMap; + ShadowConnectionOutputHandler shadowOut; + char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index a638f509c6..10695496bc 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -35,15 +35,6 @@ namespace cluster { using namespace std; using broker::Broker; -struct OptionValues { - string name; - string url; - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } -}; // Note we update the values in a separate struct. // This is to work around boost::program_options differences, @@ -51,43 +42,44 @@ struct OptionValues { // ones take a copy (or require a shared_ptr) // struct ClusterOptions : public Options { + std::string name; + std::string url; - ClusterOptions(OptionValues* v) : Options("Cluster Options") { + ClusterOptions() : Options("Cluster Options") { addOptions() - ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(v->url,"URL"), + ("cluster-name", optValue(name,""), "Cluster identifier") + ("cluster-url", optValue(url,"URL"), "URL of this broker, advertized to the cluster.\n" - "Defaults to a URL listing all the local IP addresses\n"); + "Defaults to a URL listing all the local IP addresses\n") + ; } }; struct ClusterPlugin : public PluginT<Broker> { - OptionValues values; + ClusterOptions options; boost::optional<Cluster> cluster; - ClusterPlugin(const OptionValues& v) : values(v) {} + ClusterPlugin(const ClusterOptions& opts) : options(opts) {} - void initializeT(Broker& broker) { - cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker)); - broker.getSessionManager().add(cluster->getObserver()); + void initializeT(Broker& broker) { // FIXME aconway 2008-07-01: drop T suffix. + Url url = options.url.empty() ? Url::getIpAddressesUrl(broker.getPort()) : Url(options.url); + cluster = boost::in_place(options.name, url, boost::ref(broker)); + broker.getConnectionManager().add(cluster->getObserver()); // FIXME aconway 2008-07-01: to Cluster ctor } }; struct PluginFactory : public Plugin::FactoryT<Broker> { - OptionValues values; ClusterOptions options; - PluginFactory() : options(&values) {} - Options* getOptions() { return &options; } boost::shared_ptr<Plugin> createT(Broker&) { - // Only provide to a Broker, and only if the --cluster config is set. - if (values.name.empty()) + if (options.name.empty()) { // No cluster name, don't initialize cluster. return boost::shared_ptr<Plugin>(); + } else - return make_shared_ptr(new ClusterPlugin(values)); + return make_shared_ptr(new ClusterPlugin(options)); } }; diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 7b8fce4112..7831f66da1 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -144,24 +144,20 @@ std::string Cpg::cantMcastMsg(const Name& group) { return "Cannot mcast to CPG group "+group.str(); } +Cpg::Id Cpg::self() const { + unsigned int nodeid; + check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); + return Id(nodeid, getpid()); +} + ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { ostream_iterator<Cpg::Id> i(o, " "); std::copy(a.first, a.first+a.second, i); return o; } -static int popbyte(uint32_t& n) { - uint8_t b=n&0xff; - n>>=8; - return b; -} - ostream& operator <<(ostream& out, const Cpg::Id& id) { - uint32_t node=id.nodeId(); - out << popbyte(node); - for (int i = 0; i < 3; i++) - out << "." << popbyte(node); - return out << ":" << id.pid(); + return out << id.getNodeId() << "-" << id.getPid(); } ostream& operator <<(ostream& out, const cpg_name& name) { diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index 1ed362f94e..a918fb0cbf 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -22,6 +22,8 @@ #include "qpid/Exception.h" #include "qpid/cluster/Dispatchable.h" +#include <boost/tuple/tuple.hpp> +#include <boost/tuple/tuple_comparison.hpp> #include <cassert> #include <string.h> @@ -55,16 +57,14 @@ class Cpg : public Dispatchable { std::string str() const { return std::string(value, length); } }; - - struct Id { - uint64_t id; - Id(uint64_t n=0) : id(n) {} - Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; } - Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {} - - operator uint64_t() const { return id; } - uint32_t nodeId() const { return id >> 32; } - pid_t pid() const { return id & 0xFFFF; } + + + // boost::tuple gives us == and < for free. + struct Id : public boost::tuple<uint32_t, uint32_t> { + Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {} + Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {} + uint32_t getNodeId() const { return boost::get<0>(*this); } + uint32_t getPid() const { return boost::get<1>(*this); } }; static std::string str(const cpg_name& n) { @@ -131,6 +131,8 @@ class Cpg : public Dispatchable { cpg_handle_t getHandle() const { return handle; } + Id self() const; + private: class Handles; struct ClearHandleOnExit; diff --git a/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h b/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h new file mode 100644 index 0000000000..6d429535e6 --- /dev/null +++ b/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h @@ -0,0 +1,46 @@ +#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H +#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/sys/ConnectionOutputHandler.h> + +namespace qpid { + +namespace framing { class AMQFrame; } + +namespace cluster { + +/** + * Output handler for frames sent to shadow connections. + * Simply discards frames. + */ +class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler +{ + public: + virtual void send(framing::AMQFrame&) {} + virtual void close() {} + virtual void activateOutput() {} +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/ |
