diff options
| author | Alan Conway <aconway@apache.org> | 2010-10-27 18:01:27 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-10-27 18:01:27 +0000 |
| commit | 326dddd0d0d48401d14ca93044b3fc0e35ad87d9 (patch) | |
| tree | 019a45480d8cdf832f62d7176b7a10a5d0971535 /cpp/src/qpid/cluster | |
| parent | aae11121cfcf891b2365241141f9ab9cb47d3024 (diff) | |
| download | qpid-python-326dddd0d0d48401d14ca93044b3fc0e35ad87d9.tar.gz | |
Revert experimental cluster code, too close to 0.8 release.
Reverts revisions:
r1023966 "Introduce broker::Cluster interface."
r1024275 "Fix compile error: outline set/getCluster fucntions on Broker."
r1027210 "New cluster: core framework and initial implementation of enqueue logic."
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1028055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/BrokerHandler.cpp | 96 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/BrokerHandler.h | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster2Plugin.cpp | 65 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Core.cpp | 68 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Core.h | 95 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/EventHandler.cpp | 89 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/EventHandler.h | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/LockedMap.h | 73 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/MessageHandler.cpp | 82 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/MessageHandler.h | 70 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/MessageId.cpp | 35 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/MessageId.h | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PollerDispatch.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PollerDispatch.h | 1 |
14 files changed, 3 insertions, 903 deletions
diff --git a/cpp/src/qpid/cluster/BrokerHandler.cpp b/cpp/src/qpid/cluster/BrokerHandler.cpp deleted file mode 100644 index f0b930a221..0000000000 --- a/cpp/src/qpid/cluster/BrokerHandler.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Core.h" -#include "BrokerHandler.h" -#include "qpid/framing/ClusterMessageRoutingBody.h" -#include "qpid/framing/ClusterMessageRoutedBody.h" -#include "qpid/framing/ClusterMessageEnqueueBody.h" -#include "qpid/sys/Thread.h" -#include "qpid/broker/QueuedMessage.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/Buffer.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -using namespace framing; -using namespace broker; - -namespace { -// noReplicate means the current thread is handling a message -// received from the cluster so it should not be replciated. -QPID_TSS bool noReplicate = false; - -// Sequence number of the message currently being routed. -// 0 if we are not currently routing a message. -QPID_TSS SequenceNumber routeSeq = 0; -} - -BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() { - assert(!noReplicate); - noReplicate = true; -} - -BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() { - assert(noReplicate); - noReplicate = false; -} - -BrokerHandler::BrokerHandler(Core& c) : core(c) {} - -SequenceNumber BrokerHandler::nextSequenceNumber() { - SequenceNumber s = ++sequence; - if (!s) s = ++sequence; // Avoid 0 on wrap-around. - return s; -} - -void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { } - -bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) -{ - if (noReplicate) return true; - if (!routeSeq) { // This is the first enqueue, so send the message - routeSeq = nextSequenceNumber(); - // FIXME aconway 2010-10-20: replicate message in fixed size buffers. - std::string data(msg->encodedSize(),char()); - framing::Buffer buf(&data[0], data.size()); - msg->encode(buf); - core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), routeSeq, data)); - core.getRoutingMap().put(routeSeq, msg); - } - core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), routeSeq, queue.getName())); - // TODO aconway 2010-10-21: configable option for strict (wait - // for CPG deliver to do local deliver) vs. loose (local deliver - // immediately). - return false; -} - -void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) { - if (routeSeq) { // we enqueued at least one message. - core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), routeSeq)); - // Note: routingMap is cleaned up on CPG delivery in MessageHandler. - routeSeq = 0; - } -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/BrokerHandler.h b/cpp/src/qpid/cluster/BrokerHandler.h deleted file mode 100644 index 1a61d1fc11..0000000000 --- a/cpp/src/qpid/cluster/BrokerHandler.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef QPID_CLUSTER_BROKERHANDLER_H -#define QPID_CLUSTER_BROKERHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Cluster.h" -#include "qpid/sys/AtomicValue.h" - -namespace qpid { -namespace cluster { -class Core; - -// TODO aconway 2010-10-19: experimental cluster code. - -/** - * Implements broker::Cluster interface, handles events in broker code. - */ -class BrokerHandler : public broker::Cluster -{ - public: - /** Suppress replication while in scope. - * Used to prevent re-replication of messages received from the cluster. - */ - struct ScopedSuppressReplication { - ScopedSuppressReplication(); - ~ScopedSuppressReplication(); - }; - - BrokerHandler(Core&); - - // FIXME aconway 2010-10-20: implement all points. - - // Messages - - void routing(const boost::intrusive_ptr<broker::Message>&); - bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&); - void routed(const boost::intrusive_ptr<broker::Message>&); - void acquire(const broker::QueuedMessage&) {} - void accept(const broker::QueuedMessage&) {} - void reject(const broker::QueuedMessage&) {} - void rejected(const broker::QueuedMessage&) {} - void release(const broker::QueuedMessage&) {} - void drop(const broker::QueuedMessage&) {} - - // Consumers - - void consume(const broker::Queue&, size_t) {} - void cancel(const broker::Queue&, size_t) {} - - // Wiring - - void create(const broker::Queue&) {} - void destroy(const broker::Queue&) {} - void create(const broker::Exchange&) {} - void destroy(const broker::Exchange&) {} - void bind(const broker::Queue&, const broker::Exchange&, - const std::string&, const framing::FieldTable&) {} - - private: - SequenceNumber nextSequenceNumber(); - - Core& core; - sys::AtomicValue<SequenceNumber> sequence; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster2Plugin.cpp b/cpp/src/qpid/cluster/Cluster2Plugin.cpp deleted file mode 100644 index 28b7dcec2e..0000000000 --- a/cpp/src/qpid/cluster/Cluster2Plugin.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <qpid/Options.h> -#include <qpid/broker/Broker.h> -#include "Core.h" - -namespace qpid { -namespace cluster { -using broker::Broker; - -// TODO aconway 2010-10-19: experimental new cluster code. - -/** - * Plugin for the cluster. - */ -struct Cluster2Plugin : public Plugin { - struct Opts : public Options { - Core::Settings& settings; - Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) { - addOptions() - ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join"); - // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h - } - }; - - Core::Settings settings; - Opts options; - Core* core; // Core deletes itself on shutdown. - - Cluster2Plugin() : options(settings), core(0) {} - - Options* getOptions() { return &options; } - - void earlyInitialize(Plugin::Target& target) { - if (settings.name.empty()) return; - Broker* broker = dynamic_cast<Broker*>(&target); - if (!broker) return; - core = new Core(settings, *broker); - } - - void initialize(Plugin::Target& target) { - Broker* broker = dynamic_cast<Broker*>(&target); - if (broker && core) core->initialize(); - } -}; - -static Cluster2Plugin instance; // Static initialization. - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Core.cpp b/cpp/src/qpid/cluster/Core.cpp deleted file mode 100644 index e4127fa443..0000000000 --- a/cpp/src/qpid/cluster/Core.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Core.h" -#include "EventHandler.h" -#include "BrokerHandler.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/SignalHandler.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/Buffer.h" -#include "qpid/log/Statement.h" -#include <sys/uio.h> // For iovec - -namespace qpid { -namespace cluster { - -Core::Core(const Settings& s, broker::Broker& b) : - broker(b), - eventHandler(new EventHandler(*this)) -{ - std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this)); - brokerHandler = bh.get(); - // BrokerHandler belongs to Broker - broker.setCluster(std::auto_ptr<broker::Cluster>(bh)); - // FIXME aconway 2010-10-20: ownership of BrokerHandler, shutdown issues. - eventHandler->getCpg().join(s.name); -} - -void Core::initialize() {} - -void Core::fatal() { - // FIXME aconway 2010-10-20: error handling - assert(0); - broker::SignalHandler::shutdown(); -} - -void Core::mcast(const framing::AMQBody& body) { - QPID_LOG(trace, "multicast: " << body); - // FIXME aconway 2010-10-20: use Multicaster, or bring in its features. - // here we multicast Frames rather than Events. - framing::AMQFrame f(body); - std::string data(f.encodedSize(), char()); - framing::Buffer buf(&data[0], data.size()); - f.encode(buf); - iovec iov = { buf.getPointer(), buf.getSize() }; - while (!eventHandler->getCpg().mcast(&iov, 1)) - ::usleep(1000); // FIXME aconway 2010-10-20: flow control -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Core.h b/cpp/src/qpid/cluster/Core.h deleted file mode 100644 index 9976c1c906..0000000000 --- a/cpp/src/qpid/cluster/Core.h +++ /dev/null @@ -1,95 +0,0 @@ -#ifndef QPID_CLUSTER_CORE_H -#define QPID_CLUSTER_CORE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include <memory> - -#include "Cpg.h" -#include "MessageId.h" -#include "LockedMap.h" -#include <qpid/broker/QueuedMessage.h> - -// TODO aconway 2010-10-19: experimental cluster code. - -namespace qpid { - -namespace framing{ -class AMQBody; -} - -namespace broker { -class Broker; -} - -namespace cluster { -class EventHandler; -class BrokerHandler; - -/** - * Cluster core state machine. - * Holds together the various objects that implement cluster behavior, - * and holds state that is shared by multiple components. - * - * Thread safe: called from broker connection threads and CPG dispatch threads. - */ -class Core -{ - public: - /** Configuration settings */ - struct Settings { - std::string name; - }; - - typedef LockedMap<SequenceNumber, boost::intrusive_ptr<broker::Message> > - SequenceMessageMap; - - /** Constructed during Plugin::earlyInitialize() */ - Core(const Settings&, broker::Broker&); - - /** Called during Plugin::initialize() */ - void initialize(); - - /** Shut down broker due to fatal error. Caller should log a critical message */ - void fatal(); - - /** Multicast an event */ - void mcast(const framing::AMQBody&); - - broker::Broker& getBroker() { return broker; } - EventHandler& getEventHandler() { return *eventHandler; } - BrokerHandler& getBrokerHandler() { return *brokerHandler; } - - /** Map of messages that are currently being routed. - * Used to pass messages being routed from BrokerHandler to MessageHandler - */ - SequenceMessageMap& getRoutingMap() { return routingMap; } - private: - broker::Broker& broker; - std::auto_ptr<EventHandler> eventHandler; // Handles CPG events. - BrokerHandler* brokerHandler; // Handles broker events. - SequenceMessageMap routingMap; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CORE_H*/ diff --git a/cpp/src/qpid/cluster/EventHandler.cpp b/cpp/src/qpid/cluster/EventHandler.cpp deleted file mode 100644 index 95ae285b06..0000000000 --- a/cpp/src/qpid/cluster/EventHandler.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "MessageHandler.h" -#include "EventHandler.h" -#include "Core.h" -#include "types.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/broker/Broker.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -EventHandler::EventHandler(Core& c) : - core(c), - cpg(*this), // FIXME aconway 2010-10-20: belongs on Core. - dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)), - self(cpg.self()), - messageHandler(new MessageHandler(*this)) -{ - dispatcher.start(); // FIXME aconway 2010-10-20: later in initialization? -} - -EventHandler::~EventHandler() {} - -// Deliver CPG message. -void EventHandler::deliver( - cpg_handle_t /*handle*/, - const cpg_name* /*group*/, - uint32_t nodeid, - uint32_t pid, - void* msg, - int msg_len) -{ - sender = MemberId(nodeid, pid); - framing::Buffer buf(static_cast<char*>(msg), msg_len); - framing::AMQFrame frame; - while (buf.available()) { - frame.decode(buf); - assert(frame.getBody()); - QPID_LOG(trace, "cluster deliver: " << *frame.getBody()); - try { - invoke(*frame.getBody()); - } - catch (const std::exception& e) { - // Note: exceptions are assumed to be survivable, - // fatal errors should log a message and call Core::fatal. - QPID_LOG(error, e.what()); - } - } -} - -void EventHandler::invoke(const framing::AMQBody& body) { - if (framing::invoke(*messageHandler, body).wasHandled()) return; -} - -// CPG config-change callback. -void EventHandler::configChange ( - cpg_handle_t /*handle*/, - const cpg_name */*group*/, - const cpg_address */*members*/, int /*nMembers*/, - const cpg_address */*left*/, int /*nLeft*/, - const cpg_address */*joined*/, int /*nJoined*/) -{ - // FIXME aconway 2010-10-20: TODO -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/EventHandler.h b/cpp/src/qpid/cluster/EventHandler.h deleted file mode 100644 index 5645c3980b..0000000000 --- a/cpp/src/qpid/cluster/EventHandler.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef QPID_CLUSTER_EVENTHANDLER_H -#define QPID_CLUSTER_EVENTHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// TODO aconway 2010-10-19: experimental cluster code. - -#include "types.h" -#include "Cpg.h" -#include "PollerDispatch.h" - -namespace qpid { - -namespace framing { -class AMQBody; -} - -namespace cluster { -class Core; -class MessageHandler; - -/** - * Dispatch events received from CPG. - * Thread unsafe: only called in CPG deliver thread context. - */ -class EventHandler : public Cpg::Handler -{ - public: - EventHandler(Core&); - ~EventHandler(); - - void deliver( // CPG deliver callback. - cpg_handle_t /*handle*/, - const struct cpg_name *group, - uint32_t /*nodeid*/, - uint32_t /*pid*/, - void* /*msg*/, - int /*msg_len*/); - - void configChange( // CPG config change callback. - cpg_handle_t /*handle*/, - const struct cpg_name */*group*/, - const struct cpg_address */*members*/, int /*nMembers*/, - const struct cpg_address */*left*/, int /*nLeft*/, - const struct cpg_address */*joined*/, int /*nJoined*/ - ); - - - MemberId getSender() { return sender; } - MemberId getSelf() { return self; } - Core& getCore() { return core; } - Cpg& getCpg() { return cpg; } - - private: - void invoke(const framing::AMQBody& body); - - Core& core; - Cpg cpg; - PollerDispatch dispatcher; - MemberId sender; // sender of current event. - MemberId self; - std::auto_ptr<MessageHandler> messageHandler; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/LockedMap.h b/cpp/src/qpid/cluster/LockedMap.h deleted file mode 100644 index 0736e7ac35..0000000000 --- a/cpp/src/qpid/cluster/LockedMap.h +++ /dev/null @@ -1,73 +0,0 @@ -#ifndef QPID_CLUSTER_LOCKEDMAP_H -#define QPID_CLUSTER_LOCKEDMAP_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Mutex.h" -#include <map> - -namespace qpid { -namespace cluster { - -/** - * A reader-writer locked thread safe map. - */ -template <class Key, class Value> -class LockedMap -{ - public: - /** Get value associated with key, returns Value() if none. */ - Value get(const Key& key) const { - sys::RWlock::ScopedRlock r(lock); - typename Map::const_iterator i = map.find(key); - if (i == map.end()) return Value(); - else return i->second; - } - - /** Associate value with key, overwriting any previous value for key. */ - void put(const Key& key, const Value& value) { - sys::RWlock::ScopedWlock w(lock); - map[key] = value; - } - - /** Associate value with key if there is not already a value associated with key. - * Returns true if the value was added. - */ - bool add(const Key& key, const Value& value) { - sys::RWlock::ScopedWlock w(lock); - return map.insert(key, value).second; - } - - /** Erase the value associated with key if any. Return true if a value was erased. */ - bool erase(const Key& key) { - sys::RWlock::ScopedWlock w(lock); - return map.erase(key); - } - - private: - typedef std::map<Key, Value> Map; - Map map; - mutable sys::RWlock lock; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/ diff --git a/cpp/src/qpid/cluster/MessageHandler.cpp b/cpp/src/qpid/cluster/MessageHandler.cpp deleted file mode 100644 index fbbdad38a3..0000000000 --- a/cpp/src/qpid/cluster/MessageHandler.cpp +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Core.h" -#include "MessageHandler.h" -#include "BrokerHandler.h" -#include "EventHandler.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/Buffer.h" -#include "qpid/sys/Thread.h" -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace cluster { -using namespace broker; - -MessageHandler::MessageHandler(EventHandler& e) : - broker(e.getCore().getBroker()), - eventHandler(e), - brokerHandler(e.getCore().getBrokerHandler()) -{} - -MessageHandler::~MessageHandler() {} - -MemberId MessageHandler::sender() { return eventHandler.getSender(); } -MemberId MessageHandler::self() { return eventHandler.getSelf(); } - -void MessageHandler::routing(uint64_t sequence, const std::string& message) { - MessageId id(sender(), sequence); - boost::intrusive_ptr<Message> msg; - if (sender() == self()) - msg = eventHandler.getCore().getRoutingMap().get(sequence); - if (!msg) { - framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); - msg = new Message; - msg->decodeHeader(buf); - msg->decodeContent(buf); - } - routingMap[id] = msg; -} - -void MessageHandler::enqueue(uint64_t sequence, const std::string& q) { - MessageId id(sender(), sequence); - boost::shared_ptr<Queue> queue = broker.getQueues().find(q); - if (!queue) throw Exception(QPID_MSG("Cluster message for unknown queue " << q)); - boost::intrusive_ptr<Message> msg = routingMap[id]; - if (!msg) throw Exception(QPID_MSG("Unknown cluster message for queue " << q)); - BrokerHandler::ScopedSuppressReplication ssr; - // TODO aconway 2010-10-21: configable option for strict (wait - // for CPG deliver to do local deliver) vs. loose (local deliver - // immediately). - queue->deliver(msg); -} - -void MessageHandler::routed(uint64_t sequence) { - MessageId id(sender(), sequence); - routingMap.erase(id); - eventHandler.getCore().getRoutingMap().erase(sequence); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MessageHandler.h b/cpp/src/qpid/cluster/MessageHandler.h deleted file mode 100644 index 5c32bf474e..0000000000 --- a/cpp/src/qpid/cluster/MessageHandler.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef QPID_CLUSTER_MESSAGEHANDLER_H -#define QPID_CLUSTER_MESSAGEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// TODO aconway 2010-10-19: experimental cluster code. - -#include "qpid/framing/AMQP_AllOperations.h" -#include "MessageId.h" -#include <boost/intrusive_ptr.hpp> -#include <map> - -namespace qpid { - -namespace broker { -class Message; -class Broker; -} - -namespace cluster { -class EventHandler; -class BrokerHandler; - -/** - * Handler for message disposition events. - */ -class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler -{ - public: - MessageHandler(EventHandler&); - ~MessageHandler(); - - void routing(uint64_t sequence, const std::string& message); - void enqueue(uint64_t sequence, const std::string& queue); - void routed(uint64_t sequence); - - private: - typedef std::map<MessageId, boost::intrusive_ptr<broker::Message> > RoutingMap; - - MemberId sender(); - MemberId self(); - - broker::Broker& broker; - EventHandler& eventHandler; - BrokerHandler& brokerHandler; - RoutingMap routingMap; - -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/MessageId.cpp b/cpp/src/qpid/cluster/MessageId.cpp deleted file mode 100644 index fbd248ed69..0000000000 --- a/cpp/src/qpid/cluster/MessageId.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MessageId.h" -#include <ostream> - -namespace qpid { -namespace cluster { - -bool operator<(const MessageId& a, const MessageId& b) { - return a.member < b.member || ((a.member == b.member) && a.sequence < b.sequence); -} - -std::ostream& operator<<(std::ostream& o, const MessageId& m) { - return o << m.member << ":" << m.sequence; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MessageId.h b/cpp/src/qpid/cluster/MessageId.h deleted file mode 100644 index 16bf7ddd6d..0000000000 --- a/cpp/src/qpid/cluster/MessageId.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef QPID_CLUSTER_MESSAGEID_H -#define QPID_CLUSTER_MESSAGEID_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "types.h" -#include <iosfwd> - -namespace qpid { -namespace cluster { - -// TODO aconway 2010-10-20: experimental new cluster code. - -/** Sequence number used in message identifiers */ -typedef uint64_t SequenceNumber; - -/** - * Message identifier - */ -struct MessageId { - MemberId member; /// Member that created the message - SequenceNumber sequence; /// Sequence number assiged by member. - MessageId(MemberId m=MemberId(), SequenceNumber s=0) : member(m), sequence(s) {} -}; - -bool operator<(const MessageId&, const MessageId&); - -std::ostream& operator<<(std::ostream&, const MessageId&); - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MESSAGEID_H*/ diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp index 43c171efe8..b8d94b95a5 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.cpp +++ b/cpp/src/qpid/cluster/PollerDispatch.cpp @@ -37,11 +37,9 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p, started(false) {} -PollerDispatch::~PollerDispatch() { stop(); } - -void PollerDispatch::stop() { - if (started) dispatchHandle.stopWatch(); - started = false; +PollerDispatch::~PollerDispatch() { + if (started) + dispatchHandle.stopWatch(); } void PollerDispatch::start() { @@ -56,7 +54,6 @@ void PollerDispatch::dispatch(sys::DispatchHandle& h) { h.rewatch(); } catch (const std::exception& e) { QPID_LOG(critical, "Error in cluster dispatch: " << e.what()); - stop(); onError(); } } diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h index f16d5ece95..63801e0de9 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.h +++ b/cpp/src/qpid/cluster/PollerDispatch.h @@ -41,7 +41,6 @@ class PollerDispatch { ~PollerDispatch(); void start(); - void stop(); private: // Poller callbacks |
