summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
committerAlan Conway <aconway@apache.org>2008-11-05 15:22:47 +0000
commitad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b (patch)
tree9ee2e8cdcad566d355233da8b4a45b92c9f6ed3f /cpp/src/qpid/cluster
parentd3f652de187cac449e1fae4e00fce59c204f020a (diff)
downloadqpid-python-ad6e0ac1bfa48ef5bdc26d979558c83f52e8cd5b.tar.gz
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array. framing/Array: - added some std:::vector like functions & typedefs. - use TypeCode enums, human readable ostream << operator. rubygen - fixed error in generation of exceptions for bad codes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp3
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp65
-rw-r--r--cpp/src/qpid/cluster/Connection.h10
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp77
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h6
5 files changed, 126 insertions, 35 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 9526a33ac6..fad0563872 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -90,7 +90,4 @@ struct ClusterPlugin : public Plugin {
static ClusterPlugin instance; // Static initialization.
-// For test purposes.
-Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; }
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ada26ab2fb..513816735d 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,7 +24,11 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -36,7 +40,7 @@
#include <boost/current_function.hpp>
-// FIXME aconway 2008-11-03:
+// TODO aconway 2008-11-03:
//
// Disproportionate amount of code here is dedicated to receiving a
// brain-dump when joining a cluster and building initial
@@ -113,7 +117,6 @@ bool Connection::checkUnsupported(const AMQBody& body) {
std::string message;
if (body.getMethod()) {
switch (body.getMethod()->amqpClassId()) {
- case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break;
case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
}
}
@@ -122,13 +125,13 @@ bool Connection::checkUnsupported(const AMQBody& body) {
if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
}
if (!message.empty())
- connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+ connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0);
return !message.empty();
}
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+ QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
assert(!catchUp);
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
@@ -247,11 +250,15 @@ bool Connection::isDumped() const {
return self.first == cluster.getId() && self.second == 0;
}
+
+shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
+ shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname);
+ if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname));
+ return queue;
+}
+
broker::QueuedMessage Connection::getDumpMessage() {
- // Get a message from the DUMP queue.
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
- broker::QueuedMessage m = dumpQueue->get();
+ broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
return m;
}
@@ -267,14 +274,11 @@ void Connection::deliveryRecord(const string& qname,
bool ended,
bool windowing)
{
- broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
- if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
broker::QueuedMessage m;
+ broker::Queue::shared_ptr queue = findQueue(qname);
if (!ended) { // Has a message
- if (acquired) { // Message at front of dump queue
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- m = dumpQueue->get();
- }
+ if (acquired) // Message is on the dump queue
+ m = getDumpMessage();
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
@@ -286,8 +290,7 @@ void Connection::deliveryRecord(const string& qname,
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
-
- semanticState().record(dr);
+ semanticState().record(dr); // Part of the session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -304,6 +307,36 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
+void Connection::txStart() {
+ txBuffer = make_shared_ptr(new broker::TxBuffer());
+}
+void Connection::txAccept(const framing::SequenceSet& acked) {
+ txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked, semanticState().getUnacked())));
+}
+
+void Connection::txDequeue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txEnqueue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+ boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload));
+ for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
+ txPub->deliverTo(findQueue((*i)->get<std::string>()));
+ txPub->delivered = delivered;
+ txBuffer->enlist(txPub);
+}
+
+void Connection::txEnd() {
+ semanticState().setTxBuffer(txBuffer);
+}
+
+void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
+ semanticState().setAccumulatedAck(s);
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 331ac33ab0..2eafa90f32 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -125,12 +125,21 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void txStart();
+ void txAccept(const framing::SequenceSet&);
+ void txDequeue(const std::string&);
+ void txEnqueue(const std::string&);
+ void txPublish(const qpid::framing::Array&, bool);
+ void txEnd();
+ void accumulatedAck(const qpid::framing::SequenceSet&);
+
private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
+ boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
broker::SemanticState& semanticState();
broker::QueuedMessage getDumpMessage();
@@ -148,6 +157,7 @@ class Connection :
framing::SequenceNumber mcastSeq;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
+ boost::shared_ptr<broker::TxBuffer> txBuffer;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index a2860f6f32..bb3cfdfa56 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -32,6 +32,12 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/TxOpVisitor.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -43,7 +49,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
-
+#include <algorithm>
namespace qpid {
namespace cluster {
@@ -198,7 +204,7 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn
shadowConnection = catchUpConnection();
broker::Connection& bc = dumpConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to reconnect?
+ // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
@@ -227,7 +233,10 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
- ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+ broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
+ std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1));
+
+ dumpTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust for command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -283,22 +292,12 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
}
void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
- dumpDeliveryRecordMessage(dr);
- dumpDeliveryRecord(dr);
-}
-
-void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
- // Dump the message associated with a dr if need be.
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// dumpees queue, put it on the dump queue for dumpee to pick up.
//
MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
}
-}
-
-void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
- // Assumes the associated message has already been dumped (if needed)
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
@@ -312,4 +311,56 @@ void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
dr.isWindowing());
}
+class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
+ public:
+ TxOpDumper(DumpClient& dc, client::AsyncSession s)
+ : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {}
+
+ void operator()(const broker::DtxAck& ) {
+ throw InternalErrorException("DTX transactions not currently supported by cluster.");
+ }
+
+ void operator()(const broker::RecoveredDequeue& rdeq) {
+ dumpMessage(rdeq.getMessage());
+ proxy.txEnqueue(rdeq.getQueue()->getName());
+ }
+
+ void operator()(const broker::RecoveredEnqueue& renq) {
+ dumpMessage(renq.getMessage());
+ proxy.txEnqueue(renq.getQueue()->getName());
+ }
+
+ void operator()(const broker::TxAccept& txAccept) {
+ proxy.txAccept(txAccept.getAcked());
+ }
+
+ void operator()(const broker::TxPublish& txPub) {
+ dumpMessage(txPub.getMessage());
+ typedef std::list<Queue::shared_ptr> QueueList;
+ const QueueList& qlist = txPub.getQueues();
+ Array qarray(TYPE_CODE_STR8);
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ proxy.txPublish(qarray, txPub.delivered);
+ }
+
+ private:
+ DumpClient& parent;
+ client::AsyncSession session;
+ ClusterConnectionProxy proxy;
+};
+
+void DumpClient::dumpTxState(broker::SemanticState& s) {
+ QPID_LOG(debug, dumperId << " dumping TX transaction state.");
+ ClusterConnectionProxy proxy(shadowSession);
+ proxy.accumulatedAck(s.getAccumulatedAck());
+ broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+ if (txBuffer) {
+ proxy.txStart();
+ TxOpDumper dumper(*this, shadowSession);
+ txBuffer->accept(dumper);
+ proxy.txEnd();
+ }
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index 716e7dcc3a..23676e7646 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -71,6 +71,8 @@ class DumpClient : public sys::Runnable {
void dump();
void run(); // Will delete this when finished.
+ void dumpUnacked(const broker::DeliveryRecord&);
+
private:
void dumpQueue(const boost::shared_ptr<broker::Queue>&);
void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
@@ -79,10 +81,8 @@ class DumpClient : public sys::Runnable {
void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
void dumpSession(broker::SessionHandler& s);
+ void dumpTxState(broker::SemanticState& s);
void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
- void dumpUnacked(const broker::DeliveryRecord&);
- void dumpDeliveryRecord(const broker::DeliveryRecord&);
- void dumpDeliveryRecordMessage(const broker::DeliveryRecord&);
MemberId dumperId;
MemberId dumpeeId;