summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-25 18:00:34 +0000
committerAlan Conway <aconway@apache.org>2010-10-25 18:00:34 +0000
commitf27a733f9a4cca3ad2a42acb35ab4620a47e320d (patch)
treea4d7d7a34a6cf42e1241e998f7da012ee37b109f /cpp/src/qpid/cluster
parent2c422462dc717e667c13aa74bbc552c8507e3f83 (diff)
downloadqpid-python-f27a733f9a4cca3ad2a42acb35ab4620a47e320d.tar.gz
New cluster: core framework and initial implementation of enqueue logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1027210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/BrokerHandler.cpp96
-rw-r--r--cpp/src/qpid/cluster/BrokerHandler.h86
-rw-r--r--cpp/src/qpid/cluster/Cluster2Plugin.cpp65
-rw-r--r--cpp/src/qpid/cluster/Core.cpp68
-rw-r--r--cpp/src/qpid/cluster/Core.h95
-rw-r--r--cpp/src/qpid/cluster/EventHandler.cpp89
-rw-r--r--cpp/src/qpid/cluster/EventHandler.h85
-rw-r--r--cpp/src/qpid/cluster/LockedMap.h73
-rw-r--r--cpp/src/qpid/cluster/MessageHandler.cpp82
-rw-r--r--cpp/src/qpid/cluster/MessageHandler.h70
-rw-r--r--cpp/src/qpid/cluster/MessageId.cpp35
-rw-r--r--cpp/src/qpid/cluster/MessageId.h52
-rw-r--r--cpp/src/qpid/cluster/PollerDispatch.cpp9
-rw-r--r--cpp/src/qpid/cluster/PollerDispatch.h1
-rw-r--r--cpp/src/qpid/cluster/new-cluster-design.txt2
-rw-r--r--cpp/src/qpid/cluster/new-cluster-plan.txt37
16 files changed, 920 insertions, 25 deletions
diff --git a/cpp/src/qpid/cluster/BrokerHandler.cpp b/cpp/src/qpid/cluster/BrokerHandler.cpp
new file mode 100644
index 0000000000..f0b930a221
--- /dev/null
+++ b/cpp/src/qpid/cluster/BrokerHandler.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "BrokerHandler.h"
+#include "qpid/framing/ClusterMessageRoutingBody.h"
+#include "qpid/framing/ClusterMessageRoutedBody.h"
+#include "qpid/framing/ClusterMessageEnqueueBody.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace broker;
+
+namespace {
+// noReplicate means the current thread is handling a message
+// received from the cluster so it should not be replciated.
+QPID_TSS bool noReplicate = false;
+
+// Sequence number of the message currently being routed.
+// 0 if we are not currently routing a message.
+QPID_TSS SequenceNumber routeSeq = 0;
+}
+
+BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
+ assert(!noReplicate);
+ noReplicate = true;
+}
+
+BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
+ assert(noReplicate);
+ noReplicate = false;
+}
+
+BrokerHandler::BrokerHandler(Core& c) : core(c) {}
+
+SequenceNumber BrokerHandler::nextSequenceNumber() {
+ SequenceNumber s = ++sequence;
+ if (!s) s = ++sequence; // Avoid 0 on wrap-around.
+ return s;
+}
+
+void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
+
+bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
+{
+ if (noReplicate) return true;
+ if (!routeSeq) { // This is the first enqueue, so send the message
+ routeSeq = nextSequenceNumber();
+ // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
+ std::string data(msg->encodedSize(),char());
+ framing::Buffer buf(&data[0], data.size());
+ msg->encode(buf);
+ core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), routeSeq, data));
+ core.getRoutingMap().put(routeSeq, msg);
+ }
+ core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), routeSeq, queue.getName()));
+ // TODO aconway 2010-10-21: configable option for strict (wait
+ // for CPG deliver to do local deliver) vs. loose (local deliver
+ // immediately).
+ return false;
+}
+
+void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
+ if (routeSeq) { // we enqueued at least one message.
+ core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), routeSeq));
+ // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
+ routeSeq = 0;
+ }
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/BrokerHandler.h b/cpp/src/qpid/cluster/BrokerHandler.h
new file mode 100644
index 0000000000..1a61d1fc11
--- /dev/null
+++ b/cpp/src/qpid/cluster/BrokerHandler.h
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_BROKERHANDLER_H
+#define QPID_CLUSTER_BROKERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Cluster.h"
+#include "qpid/sys/AtomicValue.h"
+
+namespace qpid {
+namespace cluster {
+class Core;
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+/**
+ * Implements broker::Cluster interface, handles events in broker code.
+ */
+class BrokerHandler : public broker::Cluster
+{
+ public:
+ /** Suppress replication while in scope.
+ * Used to prevent re-replication of messages received from the cluster.
+ */
+ struct ScopedSuppressReplication {
+ ScopedSuppressReplication();
+ ~ScopedSuppressReplication();
+ };
+
+ BrokerHandler(Core&);
+
+ // FIXME aconway 2010-10-20: implement all points.
+
+ // Messages
+
+ void routing(const boost::intrusive_ptr<broker::Message>&);
+ bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
+ void routed(const boost::intrusive_ptr<broker::Message>&);
+ void acquire(const broker::QueuedMessage&) {}
+ void accept(const broker::QueuedMessage&) {}
+ void reject(const broker::QueuedMessage&) {}
+ void rejected(const broker::QueuedMessage&) {}
+ void release(const broker::QueuedMessage&) {}
+ void drop(const broker::QueuedMessage&) {}
+
+ // Consumers
+
+ void consume(const broker::Queue&, size_t) {}
+ void cancel(const broker::Queue&, size_t) {}
+
+ // Wiring
+
+ void create(const broker::Queue&) {}
+ void destroy(const broker::Queue&) {}
+ void create(const broker::Exchange&) {}
+ void destroy(const broker::Exchange&) {}
+ void bind(const broker::Queue&, const broker::Exchange&,
+ const std::string&, const framing::FieldTable&) {}
+
+ private:
+ SequenceNumber nextSequenceNumber();
+
+ Core& core;
+ sys::AtomicValue<SequenceNumber> sequence;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster2Plugin.cpp b/cpp/src/qpid/cluster/Cluster2Plugin.cpp
new file mode 100644
index 0000000000..28b7dcec2e
--- /dev/null
+++ b/cpp/src/qpid/cluster/Cluster2Plugin.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/Options.h>
+#include <qpid/broker/Broker.h>
+#include "Core.h"
+
+namespace qpid {
+namespace cluster {
+using broker::Broker;
+
+// TODO aconway 2010-10-19: experimental new cluster code.
+
+/**
+ * Plugin for the cluster.
+ */
+struct Cluster2Plugin : public Plugin {
+ struct Opts : public Options {
+ Core::Settings& settings;
+ Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
+ addOptions()
+ ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
+ // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+ }
+ };
+
+ Core::Settings settings;
+ Opts options;
+ Core* core; // Core deletes itself on shutdown.
+
+ Cluster2Plugin() : options(settings), core(0) {}
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Plugin::Target& target) {
+ if (settings.name.empty()) return;
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
+ core = new Core(settings, *broker);
+ }
+
+ void initialize(Plugin::Target& target) {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (broker && core) core->initialize();
+ }
+};
+
+static Cluster2Plugin instance; // Static initialization.
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Core.cpp b/cpp/src/qpid/cluster/Core.cpp
new file mode 100644
index 0000000000..e4127fa443
--- /dev/null
+++ b/cpp/src/qpid/cluster/Core.cpp
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+#include <sys/uio.h> // For iovec
+
+namespace qpid {
+namespace cluster {
+
+Core::Core(const Settings& s, broker::Broker& b) :
+ broker(b),
+ eventHandler(new EventHandler(*this))
+{
+ std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
+ brokerHandler = bh.get();
+ // BrokerHandler belongs to Broker
+ broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
+ // FIXME aconway 2010-10-20: ownership of BrokerHandler, shutdown issues.
+ eventHandler->getCpg().join(s.name);
+}
+
+void Core::initialize() {}
+
+void Core::fatal() {
+ // FIXME aconway 2010-10-20: error handling
+ assert(0);
+ broker::SignalHandler::shutdown();
+}
+
+void Core::mcast(const framing::AMQBody& body) {
+ QPID_LOG(trace, "multicast: " << body);
+ // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
+ // here we multicast Frames rather than Events.
+ framing::AMQFrame f(body);
+ std::string data(f.encodedSize(), char());
+ framing::Buffer buf(&data[0], data.size());
+ f.encode(buf);
+ iovec iov = { buf.getPointer(), buf.getSize() };
+ while (!eventHandler->getCpg().mcast(&iov, 1))
+ ::usleep(1000); // FIXME aconway 2010-10-20: flow control
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Core.h b/cpp/src/qpid/cluster/Core.h
new file mode 100644
index 0000000000..9976c1c906
--- /dev/null
+++ b/cpp/src/qpid/cluster/Core.h
@@ -0,0 +1,95 @@
+#ifndef QPID_CLUSTER_CORE_H
+#define QPID_CLUSTER_CORE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <memory>
+
+#include "Cpg.h"
+#include "MessageId.h"
+#include "LockedMap.h"
+#include <qpid/broker/QueuedMessage.h>
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+namespace qpid {
+
+namespace framing{
+class AMQBody;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Cluster core state machine.
+ * Holds together the various objects that implement cluster behavior,
+ * and holds state that is shared by multiple components.
+ *
+ * Thread safe: called from broker connection threads and CPG dispatch threads.
+ */
+class Core
+{
+ public:
+ /** Configuration settings */
+ struct Settings {
+ std::string name;
+ };
+
+ typedef LockedMap<SequenceNumber, boost::intrusive_ptr<broker::Message> >
+ SequenceMessageMap;
+
+ /** Constructed during Plugin::earlyInitialize() */
+ Core(const Settings&, broker::Broker&);
+
+ /** Called during Plugin::initialize() */
+ void initialize();
+
+ /** Shut down broker due to fatal error. Caller should log a critical message */
+ void fatal();
+
+ /** Multicast an event */
+ void mcast(const framing::AMQBody&);
+
+ broker::Broker& getBroker() { return broker; }
+ EventHandler& getEventHandler() { return *eventHandler; }
+ BrokerHandler& getBrokerHandler() { return *brokerHandler; }
+
+ /** Map of messages that are currently being routed.
+ * Used to pass messages being routed from BrokerHandler to MessageHandler
+ */
+ SequenceMessageMap& getRoutingMap() { return routingMap; }
+ private:
+ broker::Broker& broker;
+ std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
+ BrokerHandler* brokerHandler; // Handles broker events.
+ SequenceMessageMap routingMap;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CORE_H*/
diff --git a/cpp/src/qpid/cluster/EventHandler.cpp b/cpp/src/qpid/cluster/EventHandler.cpp
new file mode 100644
index 0000000000..95ae285b06
--- /dev/null
+++ b/cpp/src/qpid/cluster/EventHandler.cpp
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageHandler.h"
+#include "EventHandler.h"
+#include "Core.h"
+#include "types.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+EventHandler::EventHandler(Core& c) :
+ core(c),
+ cpg(*this), // FIXME aconway 2010-10-20: belongs on Core.
+ dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
+ self(cpg.self()),
+ messageHandler(new MessageHandler(*this))
+{
+ dispatcher.start(); // FIXME aconway 2010-10-20: later in initialization?
+}
+
+EventHandler::~EventHandler() {}
+
+// Deliver CPG message.
+void EventHandler::deliver(
+ cpg_handle_t /*handle*/,
+ const cpg_name* /*group*/,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
+{
+ sender = MemberId(nodeid, pid);
+ framing::Buffer buf(static_cast<char*>(msg), msg_len);
+ framing::AMQFrame frame;
+ while (buf.available()) {
+ frame.decode(buf);
+ assert(frame.getBody());
+ QPID_LOG(trace, "cluster deliver: " << *frame.getBody());
+ try {
+ invoke(*frame.getBody());
+ }
+ catch (const std::exception& e) {
+ // Note: exceptions are assumed to be survivable,
+ // fatal errors should log a message and call Core::fatal.
+ QPID_LOG(error, e.what());
+ }
+ }
+}
+
+void EventHandler::invoke(const framing::AMQBody& body) {
+ if (framing::invoke(*messageHandler, body).wasHandled()) return;
+}
+
+// CPG config-change callback.
+void EventHandler::configChange (
+ cpg_handle_t /*handle*/,
+ const cpg_name */*group*/,
+ const cpg_address */*members*/, int /*nMembers*/,
+ const cpg_address */*left*/, int /*nLeft*/,
+ const cpg_address */*joined*/, int /*nJoined*/)
+{
+ // FIXME aconway 2010-10-20: TODO
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/EventHandler.h b/cpp/src/qpid/cluster/EventHandler.h
new file mode 100644
index 0000000000..5645c3980b
--- /dev/null
+++ b/cpp/src/qpid/cluster/EventHandler.h
@@ -0,0 +1,85 @@
+#ifndef QPID_CLUSTER_EVENTHANDLER_H
+#define QPID_CLUSTER_EVENTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "types.h"
+#include "Cpg.h"
+#include "PollerDispatch.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
+class Core;
+class MessageHandler;
+
+/**
+ * Dispatch events received from CPG.
+ * Thread unsafe: only called in CPG deliver thread context.
+ */
+class EventHandler : public Cpg::Handler
+{
+ public:
+ EventHandler(Core&);
+ ~EventHandler();
+
+ void deliver( // CPG deliver callback.
+ cpg_handle_t /*handle*/,
+ const struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/);
+
+ void configChange( // CPG config change callback.
+ cpg_handle_t /*handle*/,
+ const struct cpg_name */*group*/,
+ const struct cpg_address */*members*/, int /*nMembers*/,
+ const struct cpg_address */*left*/, int /*nLeft*/,
+ const struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+
+ MemberId getSender() { return sender; }
+ MemberId getSelf() { return self; }
+ Core& getCore() { return core; }
+ Cpg& getCpg() { return cpg; }
+
+ private:
+ void invoke(const framing::AMQBody& body);
+
+ Core& core;
+ Cpg cpg;
+ PollerDispatch dispatcher;
+ MemberId sender; // sender of current event.
+ MemberId self;
+ std::auto_ptr<MessageHandler> messageHandler;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/LockedMap.h b/cpp/src/qpid/cluster/LockedMap.h
new file mode 100644
index 0000000000..0736e7ac35
--- /dev/null
+++ b/cpp/src/qpid/cluster/LockedMap.h
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_LOCKEDMAP_H
+#define QPID_CLUSTER_LOCKEDMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A reader-writer locked thread safe map.
+ */
+template <class Key, class Value>
+class LockedMap
+{
+ public:
+ /** Get value associated with key, returns Value() if none. */
+ Value get(const Key& key) const {
+ sys::RWlock::ScopedRlock r(lock);
+ typename Map::const_iterator i = map.find(key);
+ if (i == map.end()) return Value();
+ else return i->second;
+ }
+
+ /** Associate value with key, overwriting any previous value for key. */
+ void put(const Key& key, const Value& value) {
+ sys::RWlock::ScopedWlock w(lock);
+ map[key] = value;
+ }
+
+ /** Associate value with key if there is not already a value associated with key.
+ * Returns true if the value was added.
+ */
+ bool add(const Key& key, const Value& value) {
+ sys::RWlock::ScopedWlock w(lock);
+ return map.insert(key, value).second;
+ }
+
+ /** Erase the value associated with key if any. Return true if a value was erased. */
+ bool erase(const Key& key) {
+ sys::RWlock::ScopedWlock w(lock);
+ return map.erase(key);
+ }
+
+ private:
+ typedef std::map<Key, Value> Map;
+ Map map;
+ mutable sys::RWlock lock;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/
diff --git a/cpp/src/qpid/cluster/MessageHandler.cpp b/cpp/src/qpid/cluster/MessageHandler.cpp
new file mode 100644
index 0000000000..fbbdad38a3
--- /dev/null
+++ b/cpp/src/qpid/cluster/MessageHandler.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "MessageHandler.h"
+#include "BrokerHandler.h"
+#include "EventHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Thread.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+
+MessageHandler::MessageHandler(EventHandler& e) :
+ broker(e.getCore().getBroker()),
+ eventHandler(e),
+ brokerHandler(e.getCore().getBrokerHandler())
+{}
+
+MessageHandler::~MessageHandler() {}
+
+MemberId MessageHandler::sender() { return eventHandler.getSender(); }
+MemberId MessageHandler::self() { return eventHandler.getSelf(); }
+
+void MessageHandler::routing(uint64_t sequence, const std::string& message) {
+ MessageId id(sender(), sequence);
+ boost::intrusive_ptr<Message> msg;
+ if (sender() == self())
+ msg = eventHandler.getCore().getRoutingMap().get(sequence);
+ if (!msg) {
+ framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
+ msg = new Message;
+ msg->decodeHeader(buf);
+ msg->decodeContent(buf);
+ }
+ routingMap[id] = msg;
+}
+
+void MessageHandler::enqueue(uint64_t sequence, const std::string& q) {
+ MessageId id(sender(), sequence);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
+ if (!queue) throw Exception(QPID_MSG("Cluster message for unknown queue " << q));
+ boost::intrusive_ptr<Message> msg = routingMap[id];
+ if (!msg) throw Exception(QPID_MSG("Unknown cluster message for queue " << q));
+ BrokerHandler::ScopedSuppressReplication ssr;
+ // TODO aconway 2010-10-21: configable option for strict (wait
+ // for CPG deliver to do local deliver) vs. loose (local deliver
+ // immediately).
+ queue->deliver(msg);
+}
+
+void MessageHandler::routed(uint64_t sequence) {
+ MessageId id(sender(), sequence);
+ routingMap.erase(id);
+ eventHandler.getCore().getRoutingMap().erase(sequence);
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MessageHandler.h b/cpp/src/qpid/cluster/MessageHandler.h
new file mode 100644
index 0000000000..5c32bf474e
--- /dev/null
+++ b/cpp/src/qpid/cluster/MessageHandler.h
@@ -0,0 +1,70 @@
+#ifndef QPID_CLUSTER_MESSAGEHANDLER_H
+#define QPID_CLUSTER_MESSAGEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "MessageId.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Handler for message disposition events.
+ */
+class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
+{
+ public:
+ MessageHandler(EventHandler&);
+ ~MessageHandler();
+
+ void routing(uint64_t sequence, const std::string& message);
+ void enqueue(uint64_t sequence, const std::string& queue);
+ void routed(uint64_t sequence);
+
+ private:
+ typedef std::map<MessageId, boost::intrusive_ptr<broker::Message> > RoutingMap;
+
+ MemberId sender();
+ MemberId self();
+
+ broker::Broker& broker;
+ EventHandler& eventHandler;
+ BrokerHandler& brokerHandler;
+ RoutingMap routingMap;
+
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/MessageId.cpp b/cpp/src/qpid/cluster/MessageId.cpp
new file mode 100644
index 0000000000..fbd248ed69
--- /dev/null
+++ b/cpp/src/qpid/cluster/MessageId.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageId.h"
+#include <ostream>
+
+namespace qpid {
+namespace cluster {
+
+bool operator<(const MessageId& a, const MessageId& b) {
+ return a.member < b.member || ((a.member == b.member) && a.sequence < b.sequence);
+}
+
+std::ostream& operator<<(std::ostream& o, const MessageId& m) {
+ return o << m.member << ":" << m.sequence;
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MessageId.h b/cpp/src/qpid/cluster/MessageId.h
new file mode 100644
index 0000000000..16bf7ddd6d
--- /dev/null
+++ b/cpp/src/qpid/cluster/MessageId.h
@@ -0,0 +1,52 @@
+#ifndef QPID_CLUSTER_MESSAGEID_H
+#define QPID_CLUSTER_MESSAGEID_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include <iosfwd>
+
+namespace qpid {
+namespace cluster {
+
+// TODO aconway 2010-10-20: experimental new cluster code.
+
+/** Sequence number used in message identifiers */
+typedef uint64_t SequenceNumber;
+
+/**
+ * Message identifier
+ */
+struct MessageId {
+ MemberId member; /// Member that created the message
+ SequenceNumber sequence; /// Sequence number assiged by member.
+ MessageId(MemberId m=MemberId(), SequenceNumber s=0) : member(m), sequence(s) {}
+};
+
+bool operator<(const MessageId&, const MessageId&);
+
+std::ostream& operator<<(std::ostream&, const MessageId&);
+
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MESSAGEID_H*/
diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp
index b8d94b95a5..43c171efe8 100644
--- a/cpp/src/qpid/cluster/PollerDispatch.cpp
+++ b/cpp/src/qpid/cluster/PollerDispatch.cpp
@@ -37,9 +37,11 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p,
started(false)
{}
-PollerDispatch::~PollerDispatch() {
- if (started)
- dispatchHandle.stopWatch();
+PollerDispatch::~PollerDispatch() { stop(); }
+
+void PollerDispatch::stop() {
+ if (started) dispatchHandle.stopWatch();
+ started = false;
}
void PollerDispatch::start() {
@@ -54,6 +56,7 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) {
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, "Error in cluster dispatch: " << e.what());
+ stop();
onError();
}
}
diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h
index 63801e0de9..f16d5ece95 100644
--- a/cpp/src/qpid/cluster/PollerDispatch.h
+++ b/cpp/src/qpid/cluster/PollerDispatch.h
@@ -41,6 +41,7 @@ class PollerDispatch {
~PollerDispatch();
void start();
+ void stop();
private:
// Poller callbacks
diff --git a/cpp/src/qpid/cluster/new-cluster-design.txt b/cpp/src/qpid/cluster/new-cluster-design.txt
index 8ee740372d..abbbcd616c 100644
--- a/cpp/src/qpid/cluster/new-cluster-design.txt
+++ b/cpp/src/qpid/cluster/new-cluster-design.txt
@@ -328,8 +328,6 @@ and been in use (one of the key missing features).
** Misc outstanding issues & notes
-Message IDs: need an efficient cluster-wide message ID.
-
Replicating wiring
- Need async completion of wiring commands?
- qpid.sequence_counter: need extra work to support in new design, do we care?
diff --git a/cpp/src/qpid/cluster/new-cluster-plan.txt b/cpp/src/qpid/cluster/new-cluster-plan.txt
index 57c1241607..4eeb030b1a 100644
--- a/cpp/src/qpid/cluster/new-cluster-plan.txt
+++ b/cpp/src/qpid/cluster/new-cluster-plan.txt
@@ -38,21 +38,9 @@ a note for later optimization/improvement.
- acquire then kill broker: verify can be dequeued other members.
- acquire then reject: verify goes on alt-exchange once only.
-*** TODO broker::Cluster interface and call points.
+*** DONE broker::Cluster interface and call points.
-Initial draft is commited.
-
-Issues to review:
-
-queue API: internal classes like RingQueuePolicy use Queue::acuqire/dequeue
-when messages are pushed. How to reconcile with queue ownership?
-
-rejecting messages: if there's an alternate exchange where do we do the
-re-routing? On origin broker or on all brokers?
-
-Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
-Intercepting client actions on the queue vs. internal actions
-(e.g. ring policy)
+Initial interface commited.
*** Main classes
@@ -63,7 +51,7 @@ BrokerHandler:
LocalMessageMap:
- Holds local messages while they are being enqueued.
-- thread safe: called by both BrokerHandler and DeliverHandler
+- thread safe: called by both BrokerHandler and MessageHandler
MessageHandler:
- handles delivered mcast messages related to messages.
@@ -77,7 +65,7 @@ QueueOwnerHandler:
- maintains view of cluster state regarding queue ownership.
cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
-- thread safe: manage state used by both DeliverHandler and BrokerHandler
+- thread safe: manage state used by both MessageHandler and BrokerHandler
The following code sketch illustrates only the "happy path" error handling
is omitted.
@@ -89,13 +77,15 @@ Types:
- NodeId 64 bit CPG node-id, identifies member of the cluster.
- struct MessageId { NodeId node; SequenceNumber seq; }
+NOTE: Message ID's identify a QueuedMessage, i.e. a position on a queue.
+
Members:
- atomic<SequenceNumber> sequence // sequence number for message IDs.
- thread_local bool noReplicate // suppress replication.
- thread_local bool isRouting // suppress operations while routing
- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
-NOTE: localMessage is also modified by DeliverHandler.
+NOTE: localMessage is also modified by MessageHandler.
broker::Cluster intercept functions:
@@ -150,7 +140,7 @@ dequeue(QueuedMessage)
# FIXME revisit - move it out of the queue lock.
cleanup(msg)
-*** DeliverHandler and mcast messages
+*** MessageHandler and mcast messages
Types:
- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
- struct QueueKey { MessageId id; QueueName q; }
@@ -326,8 +316,9 @@ cancel(q,consumer,consumerCount) - Queue::cancel()
- keep design modular, keep threading rules clear.
** TODO [#B] Large message replication.
-Need to be able to multicast large messages in fragments
-
+Multicast should encode messages in fixed size buffers (64k)?
+Can't assume we can send message in one chunk.
+For 0-10 can use channel numbers & send whole frames packed into larger buffer.
** TODO [#B] Batch CPG multicast messages
The new cluster design involves a lot of small multicast messages,
they need to be batched into larger CPG messages for efficiency.
@@ -437,3 +428,9 @@ Look for ways to capitalize on the similarity & simplify the code.
In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
+** TODO [#C] Concurrency for enqueue events.
+All enqueue events are being processed in the CPG deliver thread context which
+serializes all the work. We only need ordering on a per queue basis, can we
+enqueue in parallel on different queues and will that improve performance?
+** TODO [#C] Handling immediate messages in a cluster
+Include remote consumers in descision to deliver an immediate message?