summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-15 15:12:05 +0000
committerAlan Conway <aconway@apache.org>2009-05-15 15:12:05 +0000
commite5a0aff72c3117114d2572c3e3d6e77238b2263b (patch)
tree761f1ea0a3a4632b648da8c380a53b55533da631 /cpp/src/qpid/cluster
parent90f49326a937bc0c767b99c922e2bcf29058ef36 (diff)
downloadqpid-python-e5a0aff72c3117114d2572c3e3d6e77238b2263b.tar.gz
Undo change from r774809.
This fix is incorrect. The timer will go off in each member, and each one will send a response message which is replicated, resulting in a response from each member being enqueued rather than a single response. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@775182 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp66
-rw-r--r--cpp/src/qpid/cluster/Cluster.h7
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h1
3 files changed, 4 insertions, 70 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8a93773718..5f51bb9dad 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -98,7 +98,6 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionState.h"
-#include "qpid/framing/frame_functors.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
@@ -110,15 +109,6 @@
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterUpdateRequestBody.h"
-
-#include "qpid/framing/ConnectionStartOkBody.h"
-#include "qpid/framing/ConnectionTuneBody.h"
-#include "qpid/framing/ConnectionOpenBody.h"
-#include "qpid/framing/SessionAttachBody.h"
-#include "qpid/framing/SessionRequestTimeoutBody.h"
-#include "qpid/framing/SessionCommandPointBody.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
#include "qpid/management/IdAllocator.h"
@@ -135,7 +125,6 @@
#include <iterator>
#include <map>
#include <ostream>
-#include <sstream>
namespace qpid {
namespace cluster {
@@ -206,10 +195,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Failover exchange provides membership updates to clients.
failoverExchange.reset(new FailoverExchange(this));
broker.getExchanges().registerExchange(failoverExchange);
+
+ // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
- broker.setClusterMessageHandler(*this);
- if (settings.quorum) quorum.init();
+ if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
while (!initialized)
@@ -744,7 +734,6 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
}
void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
- if (state == LEFT) return;
// If we receive an errorCheck here, it's because we have processed past the point
// of the error so respond with ERROR_TYPE_NONE
assert(map.getFrameSeq() >= frameSeq);
@@ -753,53 +742,4 @@ void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock
ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
}
-size_t accumulateEncodedSize(size_t total, const AMQFrame& f) { return total + f.encodedSize(); }
-
-//
-// If the broker needs to send messages to itself in an
-// unpredictable context (e.g. management messages generated when
-// a timer expires) it uses "selfConnection"
-//
-// selfConnection behaves as a local client connection, with
-// respect to replication. However instead of mcasting data from a
-// client, data for the selfConnection is mcast directly from
-// Cluster::handle.
-//
-void Cluster::handle(const boost::intrusive_ptr<broker::Message>& msg) {
- // NOTE: don't take the lock here. We don't need to as mcast is thread safe,
- // and locking here can cause deadlock with management locks.
- //
-
- // Create self-connection on demand
- if (selfConnection == ConnectionId()) {
- QPID_LOG(debug, "Initialize self-connection");
- ostringstream name;
- name << "qpid.cluster-self." << self;
- ConnectionPtr selfc = new Connection(*this, shadowOut, name.str(), self, false, false);
- selfConnection = selfc->getId();
- vector<AMQFrame> frames;
- frames.push_back(AMQFrame((ConnectionStartOkBody())));
- frames.push_back(AMQFrame((ConnectionTuneBody(ProtocolVersion(),32767,65535,0,120))));
- frames.push_back(AMQFrame((ConnectionOpenBody())));
- frames.push_back(AMQFrame((SessionAttachBody(ProtocolVersion(), name.str(), false))));
- frames.push_back(AMQFrame(SessionRequestTimeoutBody(ProtocolVersion(), 0)));
- frames.push_back(AMQFrame(SessionCommandPointBody(ProtocolVersion(), 0, 0)));
- size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize);
- vector<char> store(size);
- Buffer buf(store.data(), size);
- for_each(frames.begin(), frames.end(), boost::bind(&AMQFrame::encode, _1, boost::ref(buf)));
- assert(buf.available() == 0);
- selfc->decode(store.data(), size); // Multicast
- }
-
- QPID_LOG(trace, "Message to self on " << selfConnection << ": " << *msg->getFrames().getMethod());
- const FrameSet& frames = msg->getFrames();
- size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize);
- Event e(DATA, selfConnection, size);
- Buffer buf(e.getData(), e.getSize());
- EncodeFrame encoder(buf);
- msg->getFrames().map(encoder);
- mcast.mcast(e);
-}
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 10d49484a8..bd401f3715 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -38,7 +38,6 @@
#include "qmf/org/apache/qpid/cluster/Cluster.h"
#include "qpid/Url.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/MessageHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/Monitor.h"
@@ -65,7 +64,7 @@ class EventFrame;
/**
* Connection to the cluster
*/
-class Cluster : private Cpg::Handler, public management::Manageable, public broker::MessageHandler {
+class Cluster : private Cpg::Handler, public management::Manageable {
public:
typedef boost::intrusive_ptr<Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> ConnectionVector;
@@ -114,9 +113,6 @@ class Cluster : private Cpg::Handler, public management::Manageable, public brok
Decoder& getDecoder() { return decoder; }
ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
-
- // Called in timer threads by management to replicate messages.
- void handle(const boost::intrusive_ptr<broker::Message>&);
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -203,7 +199,6 @@ class Cluster : private Cpg::Handler, public management::Manageable, public brok
const std::string name;
Url myUrl;
const MemberId self;
- ConnectionId selfConnection;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
qpid::management::ManagementAgent* mAgent;
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index 752d32be17..e275aac7aa 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -43,7 +43,6 @@ struct EventFrame
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
- bool isControl() const { return type == CONTROL; }
bool isLastInEvent() const { return readCredit; }
MemberId getMemberId() const { return connectionId.getMember(); }