summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-27 18:01:27 +0000
committerAlan Conway <aconway@apache.org>2010-10-27 18:01:27 +0000
commit326dddd0d0d48401d14ca93044b3fc0e35ad87d9 (patch)
tree019a45480d8cdf832f62d7176b7a10a5d0971535 /cpp/src/qpid/cluster
parentaae11121cfcf891b2365241141f9ab9cb47d3024 (diff)
downloadqpid-python-326dddd0d0d48401d14ca93044b3fc0e35ad87d9.tar.gz
Revert experimental cluster code, too close to 0.8 release.
Reverts revisions: r1023966 "Introduce broker::Cluster interface." r1024275 "Fix compile error: outline set/getCluster fucntions on Broker." r1027210 "New cluster: core framework and initial implementation of enqueue logic." git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1028055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/BrokerHandler.cpp96
-rw-r--r--cpp/src/qpid/cluster/BrokerHandler.h86
-rw-r--r--cpp/src/qpid/cluster/Cluster2Plugin.cpp65
-rw-r--r--cpp/src/qpid/cluster/Core.cpp68
-rw-r--r--cpp/src/qpid/cluster/Core.h95
-rw-r--r--cpp/src/qpid/cluster/EventHandler.cpp89
-rw-r--r--cpp/src/qpid/cluster/EventHandler.h85
-rw-r--r--cpp/src/qpid/cluster/LockedMap.h73
-rw-r--r--cpp/src/qpid/cluster/MessageHandler.cpp82
-rw-r--r--cpp/src/qpid/cluster/MessageHandler.h70
-rw-r--r--cpp/src/qpid/cluster/MessageId.cpp35
-rw-r--r--cpp/src/qpid/cluster/MessageId.h52
-rw-r--r--cpp/src/qpid/cluster/PollerDispatch.cpp9
-rw-r--r--cpp/src/qpid/cluster/PollerDispatch.h1
14 files changed, 3 insertions, 903 deletions
diff --git a/cpp/src/qpid/cluster/BrokerHandler.cpp b/cpp/src/qpid/cluster/BrokerHandler.cpp
deleted file mode 100644
index f0b930a221..0000000000
--- a/cpp/src/qpid/cluster/BrokerHandler.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Core.h"
-#include "BrokerHandler.h"
-#include "qpid/framing/ClusterMessageRoutingBody.h"
-#include "qpid/framing/ClusterMessageRoutedBody.h"
-#include "qpid/framing/ClusterMessageEnqueueBody.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/broker/QueuedMessage.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-using namespace broker;
-
-namespace {
-// noReplicate means the current thread is handling a message
-// received from the cluster so it should not be replciated.
-QPID_TSS bool noReplicate = false;
-
-// Sequence number of the message currently being routed.
-// 0 if we are not currently routing a message.
-QPID_TSS SequenceNumber routeSeq = 0;
-}
-
-BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
- assert(!noReplicate);
- noReplicate = true;
-}
-
-BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
- assert(noReplicate);
- noReplicate = false;
-}
-
-BrokerHandler::BrokerHandler(Core& c) : core(c) {}
-
-SequenceNumber BrokerHandler::nextSequenceNumber() {
- SequenceNumber s = ++sequence;
- if (!s) s = ++sequence; // Avoid 0 on wrap-around.
- return s;
-}
-
-void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
-
-bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
-{
- if (noReplicate) return true;
- if (!routeSeq) { // This is the first enqueue, so send the message
- routeSeq = nextSequenceNumber();
- // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
- std::string data(msg->encodedSize(),char());
- framing::Buffer buf(&data[0], data.size());
- msg->encode(buf);
- core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), routeSeq, data));
- core.getRoutingMap().put(routeSeq, msg);
- }
- core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), routeSeq, queue.getName()));
- // TODO aconway 2010-10-21: configable option for strict (wait
- // for CPG deliver to do local deliver) vs. loose (local deliver
- // immediately).
- return false;
-}
-
-void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
- if (routeSeq) { // we enqueued at least one message.
- core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), routeSeq));
- // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
- routeSeq = 0;
- }
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/BrokerHandler.h b/cpp/src/qpid/cluster/BrokerHandler.h
deleted file mode 100644
index 1a61d1fc11..0000000000
--- a/cpp/src/qpid/cluster/BrokerHandler.h
+++ /dev/null
@@ -1,86 +0,0 @@
-#ifndef QPID_CLUSTER_BROKERHANDLER_H
-#define QPID_CLUSTER_BROKERHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/Cluster.h"
-#include "qpid/sys/AtomicValue.h"
-
-namespace qpid {
-namespace cluster {
-class Core;
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-/**
- * Implements broker::Cluster interface, handles events in broker code.
- */
-class BrokerHandler : public broker::Cluster
-{
- public:
- /** Suppress replication while in scope.
- * Used to prevent re-replication of messages received from the cluster.
- */
- struct ScopedSuppressReplication {
- ScopedSuppressReplication();
- ~ScopedSuppressReplication();
- };
-
- BrokerHandler(Core&);
-
- // FIXME aconway 2010-10-20: implement all points.
-
- // Messages
-
- void routing(const boost::intrusive_ptr<broker::Message>&);
- bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
- void routed(const boost::intrusive_ptr<broker::Message>&);
- void acquire(const broker::QueuedMessage&) {}
- void accept(const broker::QueuedMessage&) {}
- void reject(const broker::QueuedMessage&) {}
- void rejected(const broker::QueuedMessage&) {}
- void release(const broker::QueuedMessage&) {}
- void drop(const broker::QueuedMessage&) {}
-
- // Consumers
-
- void consume(const broker::Queue&, size_t) {}
- void cancel(const broker::Queue&, size_t) {}
-
- // Wiring
-
- void create(const broker::Queue&) {}
- void destroy(const broker::Queue&) {}
- void create(const broker::Exchange&) {}
- void destroy(const broker::Exchange&) {}
- void bind(const broker::Queue&, const broker::Exchange&,
- const std::string&, const framing::FieldTable&) {}
-
- private:
- SequenceNumber nextSequenceNumber();
-
- Core& core;
- sys::AtomicValue<SequenceNumber> sequence;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster2Plugin.cpp b/cpp/src/qpid/cluster/Cluster2Plugin.cpp
deleted file mode 100644
index 28b7dcec2e..0000000000
--- a/cpp/src/qpid/cluster/Cluster2Plugin.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <qpid/Options.h>
-#include <qpid/broker/Broker.h>
-#include "Core.h"
-
-namespace qpid {
-namespace cluster {
-using broker::Broker;
-
-// TODO aconway 2010-10-19: experimental new cluster code.
-
-/**
- * Plugin for the cluster.
- */
-struct Cluster2Plugin : public Plugin {
- struct Opts : public Options {
- Core::Settings& settings;
- Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
- addOptions()
- ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
- // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
- }
- };
-
- Core::Settings settings;
- Opts options;
- Core* core; // Core deletes itself on shutdown.
-
- Cluster2Plugin() : options(settings), core(0) {}
-
- Options* getOptions() { return &options; }
-
- void earlyInitialize(Plugin::Target& target) {
- if (settings.name.empty()) return;
- Broker* broker = dynamic_cast<Broker*>(&target);
- if (!broker) return;
- core = new Core(settings, *broker);
- }
-
- void initialize(Plugin::Target& target) {
- Broker* broker = dynamic_cast<Broker*>(&target);
- if (broker && core) core->initialize();
- }
-};
-
-static Cluster2Plugin instance; // Static initialization.
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Core.cpp b/cpp/src/qpid/cluster/Core.cpp
deleted file mode 100644
index e4127fa443..0000000000
--- a/cpp/src/qpid/cluster/Core.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Core.h"
-#include "EventHandler.h"
-#include "BrokerHandler.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/SignalHandler.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/log/Statement.h"
-#include <sys/uio.h> // For iovec
-
-namespace qpid {
-namespace cluster {
-
-Core::Core(const Settings& s, broker::Broker& b) :
- broker(b),
- eventHandler(new EventHandler(*this))
-{
- std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
- brokerHandler = bh.get();
- // BrokerHandler belongs to Broker
- broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
- // FIXME aconway 2010-10-20: ownership of BrokerHandler, shutdown issues.
- eventHandler->getCpg().join(s.name);
-}
-
-void Core::initialize() {}
-
-void Core::fatal() {
- // FIXME aconway 2010-10-20: error handling
- assert(0);
- broker::SignalHandler::shutdown();
-}
-
-void Core::mcast(const framing::AMQBody& body) {
- QPID_LOG(trace, "multicast: " << body);
- // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
- // here we multicast Frames rather than Events.
- framing::AMQFrame f(body);
- std::string data(f.encodedSize(), char());
- framing::Buffer buf(&data[0], data.size());
- f.encode(buf);
- iovec iov = { buf.getPointer(), buf.getSize() };
- while (!eventHandler->getCpg().mcast(&iov, 1))
- ::usleep(1000); // FIXME aconway 2010-10-20: flow control
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Core.h b/cpp/src/qpid/cluster/Core.h
deleted file mode 100644
index 9976c1c906..0000000000
--- a/cpp/src/qpid/cluster/Core.h
+++ /dev/null
@@ -1,95 +0,0 @@
-#ifndef QPID_CLUSTER_CORE_H
-#define QPID_CLUSTER_CORE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include <memory>
-
-#include "Cpg.h"
-#include "MessageId.h"
-#include "LockedMap.h"
-#include <qpid/broker/QueuedMessage.h>
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-namespace qpid {
-
-namespace framing{
-class AMQBody;
-}
-
-namespace broker {
-class Broker;
-}
-
-namespace cluster {
-class EventHandler;
-class BrokerHandler;
-
-/**
- * Cluster core state machine.
- * Holds together the various objects that implement cluster behavior,
- * and holds state that is shared by multiple components.
- *
- * Thread safe: called from broker connection threads and CPG dispatch threads.
- */
-class Core
-{
- public:
- /** Configuration settings */
- struct Settings {
- std::string name;
- };
-
- typedef LockedMap<SequenceNumber, boost::intrusive_ptr<broker::Message> >
- SequenceMessageMap;
-
- /** Constructed during Plugin::earlyInitialize() */
- Core(const Settings&, broker::Broker&);
-
- /** Called during Plugin::initialize() */
- void initialize();
-
- /** Shut down broker due to fatal error. Caller should log a critical message */
- void fatal();
-
- /** Multicast an event */
- void mcast(const framing::AMQBody&);
-
- broker::Broker& getBroker() { return broker; }
- EventHandler& getEventHandler() { return *eventHandler; }
- BrokerHandler& getBrokerHandler() { return *brokerHandler; }
-
- /** Map of messages that are currently being routed.
- * Used to pass messages being routed from BrokerHandler to MessageHandler
- */
- SequenceMessageMap& getRoutingMap() { return routingMap; }
- private:
- broker::Broker& broker;
- std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
- BrokerHandler* brokerHandler; // Handles broker events.
- SequenceMessageMap routingMap;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_CORE_H*/
diff --git a/cpp/src/qpid/cluster/EventHandler.cpp b/cpp/src/qpid/cluster/EventHandler.cpp
deleted file mode 100644
index 95ae285b06..0000000000
--- a/cpp/src/qpid/cluster/EventHandler.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "MessageHandler.h"
-#include "EventHandler.h"
-#include "Core.h"
-#include "types.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace cluster {
-
-EventHandler::EventHandler(Core& c) :
- core(c),
- cpg(*this), // FIXME aconway 2010-10-20: belongs on Core.
- dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
- self(cpg.self()),
- messageHandler(new MessageHandler(*this))
-{
- dispatcher.start(); // FIXME aconway 2010-10-20: later in initialization?
-}
-
-EventHandler::~EventHandler() {}
-
-// Deliver CPG message.
-void EventHandler::deliver(
- cpg_handle_t /*handle*/,
- const cpg_name* /*group*/,
- uint32_t nodeid,
- uint32_t pid,
- void* msg,
- int msg_len)
-{
- sender = MemberId(nodeid, pid);
- framing::Buffer buf(static_cast<char*>(msg), msg_len);
- framing::AMQFrame frame;
- while (buf.available()) {
- frame.decode(buf);
- assert(frame.getBody());
- QPID_LOG(trace, "cluster deliver: " << *frame.getBody());
- try {
- invoke(*frame.getBody());
- }
- catch (const std::exception& e) {
- // Note: exceptions are assumed to be survivable,
- // fatal errors should log a message and call Core::fatal.
- QPID_LOG(error, e.what());
- }
- }
-}
-
-void EventHandler::invoke(const framing::AMQBody& body) {
- if (framing::invoke(*messageHandler, body).wasHandled()) return;
-}
-
-// CPG config-change callback.
-void EventHandler::configChange (
- cpg_handle_t /*handle*/,
- const cpg_name */*group*/,
- const cpg_address */*members*/, int /*nMembers*/,
- const cpg_address */*left*/, int /*nLeft*/,
- const cpg_address */*joined*/, int /*nJoined*/)
-{
- // FIXME aconway 2010-10-20: TODO
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/EventHandler.h b/cpp/src/qpid/cluster/EventHandler.h
deleted file mode 100644
index 5645c3980b..0000000000
--- a/cpp/src/qpid/cluster/EventHandler.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef QPID_CLUSTER_EVENTHANDLER_H
-#define QPID_CLUSTER_EVENTHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-#include "types.h"
-#include "Cpg.h"
-#include "PollerDispatch.h"
-
-namespace qpid {
-
-namespace framing {
-class AMQBody;
-}
-
-namespace cluster {
-class Core;
-class MessageHandler;
-
-/**
- * Dispatch events received from CPG.
- * Thread unsafe: only called in CPG deliver thread context.
- */
-class EventHandler : public Cpg::Handler
-{
- public:
- EventHandler(Core&);
- ~EventHandler();
-
- void deliver( // CPG deliver callback.
- cpg_handle_t /*handle*/,
- const struct cpg_name *group,
- uint32_t /*nodeid*/,
- uint32_t /*pid*/,
- void* /*msg*/,
- int /*msg_len*/);
-
- void configChange( // CPG config change callback.
- cpg_handle_t /*handle*/,
- const struct cpg_name */*group*/,
- const struct cpg_address */*members*/, int /*nMembers*/,
- const struct cpg_address */*left*/, int /*nLeft*/,
- const struct cpg_address */*joined*/, int /*nJoined*/
- );
-
-
- MemberId getSender() { return sender; }
- MemberId getSelf() { return self; }
- Core& getCore() { return core; }
- Cpg& getCpg() { return cpg; }
-
- private:
- void invoke(const framing::AMQBody& body);
-
- Core& core;
- Cpg cpg;
- PollerDispatch dispatcher;
- MemberId sender; // sender of current event.
- MemberId self;
- std::auto_ptr<MessageHandler> messageHandler;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/LockedMap.h b/cpp/src/qpid/cluster/LockedMap.h
deleted file mode 100644
index 0736e7ac35..0000000000
--- a/cpp/src/qpid/cluster/LockedMap.h
+++ /dev/null
@@ -1,73 +0,0 @@
-#ifndef QPID_CLUSTER_LOCKEDMAP_H
-#define QPID_CLUSTER_LOCKEDMAP_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Mutex.h"
-#include <map>
-
-namespace qpid {
-namespace cluster {
-
-/**
- * A reader-writer locked thread safe map.
- */
-template <class Key, class Value>
-class LockedMap
-{
- public:
- /** Get value associated with key, returns Value() if none. */
- Value get(const Key& key) const {
- sys::RWlock::ScopedRlock r(lock);
- typename Map::const_iterator i = map.find(key);
- if (i == map.end()) return Value();
- else return i->second;
- }
-
- /** Associate value with key, overwriting any previous value for key. */
- void put(const Key& key, const Value& value) {
- sys::RWlock::ScopedWlock w(lock);
- map[key] = value;
- }
-
- /** Associate value with key if there is not already a value associated with key.
- * Returns true if the value was added.
- */
- bool add(const Key& key, const Value& value) {
- sys::RWlock::ScopedWlock w(lock);
- return map.insert(key, value).second;
- }
-
- /** Erase the value associated with key if any. Return true if a value was erased. */
- bool erase(const Key& key) {
- sys::RWlock::ScopedWlock w(lock);
- return map.erase(key);
- }
-
- private:
- typedef std::map<Key, Value> Map;
- Map map;
- mutable sys::RWlock lock;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/
diff --git a/cpp/src/qpid/cluster/MessageHandler.cpp b/cpp/src/qpid/cluster/MessageHandler.cpp
deleted file mode 100644
index fbbdad38a3..0000000000
--- a/cpp/src/qpid/cluster/MessageHandler.cpp
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Core.h"
-#include "MessageHandler.h"
-#include "BrokerHandler.h"
-#include "EventHandler.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/sys/Thread.h"
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace cluster {
-using namespace broker;
-
-MessageHandler::MessageHandler(EventHandler& e) :
- broker(e.getCore().getBroker()),
- eventHandler(e),
- brokerHandler(e.getCore().getBrokerHandler())
-{}
-
-MessageHandler::~MessageHandler() {}
-
-MemberId MessageHandler::sender() { return eventHandler.getSender(); }
-MemberId MessageHandler::self() { return eventHandler.getSelf(); }
-
-void MessageHandler::routing(uint64_t sequence, const std::string& message) {
- MessageId id(sender(), sequence);
- boost::intrusive_ptr<Message> msg;
- if (sender() == self())
- msg = eventHandler.getCore().getRoutingMap().get(sequence);
- if (!msg) {
- framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
- msg = new Message;
- msg->decodeHeader(buf);
- msg->decodeContent(buf);
- }
- routingMap[id] = msg;
-}
-
-void MessageHandler::enqueue(uint64_t sequence, const std::string& q) {
- MessageId id(sender(), sequence);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
- if (!queue) throw Exception(QPID_MSG("Cluster message for unknown queue " << q));
- boost::intrusive_ptr<Message> msg = routingMap[id];
- if (!msg) throw Exception(QPID_MSG("Unknown cluster message for queue " << q));
- BrokerHandler::ScopedSuppressReplication ssr;
- // TODO aconway 2010-10-21: configable option for strict (wait
- // for CPG deliver to do local deliver) vs. loose (local deliver
- // immediately).
- queue->deliver(msg);
-}
-
-void MessageHandler::routed(uint64_t sequence) {
- MessageId id(sender(), sequence);
- routingMap.erase(id);
- eventHandler.getCore().getRoutingMap().erase(sequence);
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MessageHandler.h b/cpp/src/qpid/cluster/MessageHandler.h
deleted file mode 100644
index 5c32bf474e..0000000000
--- a/cpp/src/qpid/cluster/MessageHandler.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#ifndef QPID_CLUSTER_MESSAGEHANDLER_H
-#define QPID_CLUSTER_MESSAGEHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// TODO aconway 2010-10-19: experimental cluster code.
-
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "MessageId.h"
-#include <boost/intrusive_ptr.hpp>
-#include <map>
-
-namespace qpid {
-
-namespace broker {
-class Message;
-class Broker;
-}
-
-namespace cluster {
-class EventHandler;
-class BrokerHandler;
-
-/**
- * Handler for message disposition events.
- */
-class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
-{
- public:
- MessageHandler(EventHandler&);
- ~MessageHandler();
-
- void routing(uint64_t sequence, const std::string& message);
- void enqueue(uint64_t sequence, const std::string& queue);
- void routed(uint64_t sequence);
-
- private:
- typedef std::map<MessageId, boost::intrusive_ptr<broker::Message> > RoutingMap;
-
- MemberId sender();
- MemberId self();
-
- broker::Broker& broker;
- EventHandler& eventHandler;
- BrokerHandler& brokerHandler;
- RoutingMap routingMap;
-
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/MessageId.cpp b/cpp/src/qpid/cluster/MessageId.cpp
deleted file mode 100644
index fbd248ed69..0000000000
--- a/cpp/src/qpid/cluster/MessageId.cpp
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "MessageId.h"
-#include <ostream>
-
-namespace qpid {
-namespace cluster {
-
-bool operator<(const MessageId& a, const MessageId& b) {
- return a.member < b.member || ((a.member == b.member) && a.sequence < b.sequence);
-}
-
-std::ostream& operator<<(std::ostream& o, const MessageId& m) {
- return o << m.member << ":" << m.sequence;
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MessageId.h b/cpp/src/qpid/cluster/MessageId.h
deleted file mode 100644
index 16bf7ddd6d..0000000000
--- a/cpp/src/qpid/cluster/MessageId.h
+++ /dev/null
@@ -1,52 +0,0 @@
-#ifndef QPID_CLUSTER_MESSAGEID_H
-#define QPID_CLUSTER_MESSAGEID_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "types.h"
-#include <iosfwd>
-
-namespace qpid {
-namespace cluster {
-
-// TODO aconway 2010-10-20: experimental new cluster code.
-
-/** Sequence number used in message identifiers */
-typedef uint64_t SequenceNumber;
-
-/**
- * Message identifier
- */
-struct MessageId {
- MemberId member; /// Member that created the message
- SequenceNumber sequence; /// Sequence number assiged by member.
- MessageId(MemberId m=MemberId(), SequenceNumber s=0) : member(m), sequence(s) {}
-};
-
-bool operator<(const MessageId&, const MessageId&);
-
-std::ostream& operator<<(std::ostream&, const MessageId&);
-
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_MESSAGEID_H*/
diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp
index 43c171efe8..b8d94b95a5 100644
--- a/cpp/src/qpid/cluster/PollerDispatch.cpp
+++ b/cpp/src/qpid/cluster/PollerDispatch.cpp
@@ -37,11 +37,9 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p,
started(false)
{}
-PollerDispatch::~PollerDispatch() { stop(); }
-
-void PollerDispatch::stop() {
- if (started) dispatchHandle.stopWatch();
- started = false;
+PollerDispatch::~PollerDispatch() {
+ if (started)
+ dispatchHandle.stopWatch();
}
void PollerDispatch::start() {
@@ -56,7 +54,6 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) {
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, "Error in cluster dispatch: " << e.what());
- stop();
onError();
}
}
diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h
index f16d5ece95..63801e0de9 100644
--- a/cpp/src/qpid/cluster/PollerDispatch.h
+++ b/cpp/src/qpid/cluster/PollerDispatch.h
@@ -41,7 +41,6 @@ class PollerDispatch {
~PollerDispatch();
void start();
- void stop();
private:
// Poller callbacks