summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp140
1 files changed, 102 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 030d6e34c1..0691aae711 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,6 +24,8 @@
#include "Cluster.h"
#include "UpdateReceiver.h"
#include "qpid/assert.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/TxBuffer.h"
@@ -114,7 +116,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- }
+ }
init();
QPID_LOG(debug, cluster << " local connection " << *this);
}
@@ -167,7 +169,7 @@ void Connection::announce(
AMQFrame frame;
while (frame.decode(buf))
connection->received(frame);
- connection->setUserId(username);
+ connection->setUserId(username);
}
// Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
@@ -214,16 +216,9 @@ void Connection::received(framing::AMQFrame& f) {
}
}
-bool Connection::checkUnsupported(const AMQBody& body) {
- std::string message;
- if (body.getMethod()) {
- switch (body.getMethod()->amqpClassId()) {
- case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
- }
- }
- if (!message.empty())
- connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
- return !message.empty();
+bool Connection::checkUnsupported(const AMQBody&) {
+ // Throw an exception for unsupported commands. Currently all are supported.
+ return false;
}
struct GiveReadCreditOnExit {
@@ -464,11 +459,21 @@ void Connection::shadowReady(
output.setSendMax(sendMax);
}
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) {
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID
+ uint32_t index = v.first.second; // Index
+ v.second->setDtxBuffer((*record)[index]);
+}
+
+// Marks the end of the update.
void Connection::membership(const FieldTable& joiners, const FieldTable& members,
const framing::SequenceNumber& frameSeq)
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
updateIn.consumerNumbering.clear();
+ for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
+ boost::bind(&Connection::setDtxBuffer, this, _1));
closeUpdated();
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
}
@@ -536,8 +541,16 @@ void Connection::deliveryRecord(const string& qname,
} else { // Message at original position in original queue
m = queue->find(position);
}
- if (!m.payload)
- throw Exception(QPID_MSG("deliveryRecord no update message"));
+ // FIXME aconway 2011-08-19: removed:
+ // if (!m.payload)
+ // throw Exception(QPID_MSG("deliveryRecord no update message"));
+ //
+ // It seems this could happen legitimately in the case one
+ // session browses message M, then another session acquires
+ // it. In that case the browsers delivery record is !acquired
+ // but the message is not on its original Queue. In that case
+ // we'll get a deliveryRecord with no payload for the browser.
+ //
}
broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -545,7 +558,11 @@ void Connection::deliveryRecord(const string& qname,
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
- semanticState().record(dr); // Part of the session's unacked list.
+
+ if (dtxBuffer) // Record for next dtx-ack
+ dtxAckRecords.push_back(dr);
+ else
+ semanticState().record(dr); // Record on session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -561,29 +578,29 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
namespace {
- // find a StatefulQueueObserver that matches a given identifier
- class ObserverFinder {
- const std::string id;
- boost::shared_ptr<broker::QueueObserver> target;
- ObserverFinder(const ObserverFinder&) {}
- public:
- ObserverFinder(const std::string& _id) : id(_id) {}
- broker::StatefulQueueObserver *getObserver()
- {
- if (target)
- return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
- return 0;
- }
- void operator() (boost::shared_ptr<broker::QueueObserver> o)
- {
- if (!target) {
- broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
- if (p && p->getId() == id) {
- target = o;
- }
+// find a StatefulQueueObserver that matches a given identifier
+class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
}
}
- };
+ }
+};
}
@@ -615,6 +632,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
void Connection::txStart() {
txBuffer.reset(new broker::TxBuffer());
}
+
void Connection::txAccept(const framing::SequenceSet& acked) {
txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -630,8 +648,10 @@ void Connection::txEnqueue(const std::string& queue) {
new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
}
-void Connection::txPublish(const framing::Array& queues, bool delivered) {
- boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+void Connection::txPublish(const framing::Array& queues, bool delivered)
+{
+ boost::shared_ptr<broker::TxPublish> txPub(
+ new broker::TxPublish(getUpdateMessage().payload));
for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
txPub->deliverTo(findQueue((*i)->get<std::string>()));
txPub->delivered = delivered;
@@ -646,6 +666,50 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
semanticState().setAccumulatedAck(s);
}
+void Connection::dtxStart(const std::string& xid,
+ bool ended,
+ bool suspended,
+ bool failed,
+ bool expired)
+{
+ dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
+ txBuffer = dtxBuffer;
+}
+
+void Connection::dtxEnd() {
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ std::string xid = dtxBuffer->getXid();
+ if (mgr.exists(xid))
+ mgr.join(xid, dtxBuffer);
+ else
+ mgr.start(xid, dtxBuffer);
+ dtxBuffer.reset();
+ txBuffer.reset();
+}
+
+// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
+void Connection::dtxAck() {
+ dtxBuffer->enlist(
+ boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
+ dtxAckRecords.clear();
+}
+
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index) {
+ // Save the association between DtxBuffer and session so we can
+ // set the DtxBuffer on the session at the end of the update
+ // when the DtxManager has been replicated.
+ updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState();
+}
+
+// Sent at end of work record.
+void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
+{
+ broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+ if (timeout) mgr.setTimeout(xid, timeout);
+ if (prepared) mgr.prepare(xid);
+}
+
+
void Connection::exchange(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);