summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt6
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp3
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionSettings.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp10
-rw-r--r--qpid/cpp/src/qpid/ha/Event.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/Event.h94
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp54
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h22
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp307
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h133
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp15
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h7
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp45
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h50
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp273
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h136
-rw-r--r--qpid/cpp/src/qpid/ha/types.cpp1
-rw-r--r--qpid/cpp/src/qpid/sys/SocketTransport.cpp2
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py206
24 files changed, 35 insertions, 1367 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index cddab6b5dd..41dbb76b16 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -552,8 +552,6 @@ if (BUILD_HA)
qpid/ha/Primary.cpp
qpid/ha/Primary.h
qpid/ha/PrimaryQueueLimits.h
- qpid/ha/PrimaryTxObserver.cpp
- qpid/ha/PrimaryTxObserver.h
qpid/ha/QueueGuard.cpp
qpid/ha/QueueGuard.h
qpid/ha/QueueReplicator.cpp
@@ -571,10 +569,6 @@ if (BUILD_HA)
qpid/ha/StandAlone.h
qpid/ha/StatusCheck.cpp
qpid/ha/StatusCheck.h
- qpid/ha/TxReplicatingSubscription.cpp
- qpid/ha/TxReplicatingSubscription.h
- qpid/ha/TxReplicator.cpp
- qpid/ha/TxReplicator.h
qpid/ha/types.cpp
qpid/ha/types.h
)
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 9c215d197f..3873e41cc9 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -74,7 +74,7 @@ QueueFlowLimit::QueueFlowLimit(const std::string& _queueName,
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), broker(0)
{
- QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
+ QPID_LOG(debug, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
<< ", flowResumeCount=" << flowResumeCount
<< ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize );
}
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
index 518c2fa9d0..4682c1f917 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
@@ -411,7 +411,7 @@ bool Connection::doOutput() {
}
void Connection::sendHeartbeat() {
- adapter.heartbeat();
+ requestIOProcessing(boost::bind(&ConnectionHandler::heartbeat, &adapter));
}
void Connection::closeChannel(uint16_t id) {
diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
index 6ff624ef75..0ae4d8356d 100644
--- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
@@ -263,8 +263,7 @@ void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller,
const qpid::sys::Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
- QPID_LOG(info,
- "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ QPID_LOG(debug, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
async->init(aio, brokerTimer, maxNegotiateTime);
diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
index 7d41b48abd..3bfde0656b 100644
--- a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
@@ -51,7 +51,7 @@ void ConnectionSettings::configureSocket(qpid::sys::Socket& socket) const
{
if (tcpNoDelay) {
socket.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY");
+ QPID_LOG(debug, "Set TCP_NODELAY");
}
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index a62080932d..d664f13893 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -21,7 +21,6 @@
#include "BrokerReplicator.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
-#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Queue.h"
@@ -772,10 +771,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.getLevel(*queue) == ALL) {
- if (TxReplicator::isTxQueue(queue->getName()))
- return TxReplicator::create(haBroker, queue, link);
- else
- return QueueReplicator::create(haBroker, queue, link);
+ return QueueReplicator::create(haBroker, queue, link);
}
return boost::shared_ptr<QueueReplicator>();
}
@@ -886,10 +882,6 @@ void BrokerReplicator::disconnectedQueueReplicator(
const boost::shared_ptr<QueueReplicator>& qr)
{
qr->disconnect();
- if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
- // Transactions are aborted on failover so clean up tx-queues
- deleteQueue(qr->getQueue()->getName());
- }
}
// Called by ConnectionObserver::disconnected, disconnected from the network side.
diff --git a/qpid/cpp/src/qpid/ha/Event.cpp b/qpid/cpp/src/qpid/ha/Event.cpp
index ff336d0b2b..8c1b52ea7a 100644
--- a/qpid/cpp/src/qpid/ha/Event.cpp
+++ b/qpid/cpp/src/qpid/ha/Event.cpp
@@ -44,14 +44,6 @@ bool isEventKey(const std::string& key) {
const string DequeueEvent::KEY(QPID_HA+"de");
const string IdEvent::KEY(QPID_HA+"id");
-const string TxEnqueueEvent::KEY(QPID_HA+"txenq");
-const string TxDequeueEvent::KEY(QPID_HA+"txdeq");
-const string TxPrepareEvent::KEY(QPID_HA+"txpre");
-const string TxCommitEvent::KEY(QPID_HA+"txcom");
-const string TxRollbackEvent::KEY(QPID_HA+"txrb");
-const string TxPrepareOkEvent::KEY(QPID_HA+"txok");
-const string TxPrepareFailEvent::KEY(QPID_HA+"txno");
-const string TxBackupsEvent::KEY(QPID_HA+"txmem");
broker::Message makeMessage(
const string& data, const string& destination, const string& routingKey)
diff --git a/qpid/cpp/src/qpid/ha/Event.h b/qpid/cpp/src/qpid/ha/Event.h
index 7b96e36f64..308673657c 100644
--- a/qpid/cpp/src/qpid/ha/Event.h
+++ b/qpid/cpp/src/qpid/ha/Event.h
@@ -94,100 +94,6 @@ struct IdEvent : public EventBase<IdEvent> {
void print(std::ostream& o) const { o << id; }
};
-//// Transaction events
-
-struct TxEnqueueEvent : public EventBase<TxEnqueueEvent> {
- static const std::string KEY;
- framing::LongString queue;
- ReplicationId id;
-
- TxEnqueueEvent(std::string q=std::string(), ReplicationId i=ReplicationId())
- : queue(q), id(i) {}
- void encode(framing::Buffer& b) const { b.put(queue); b.put(id); }
- void decode(framing::Buffer& b) { b.get(queue); b.get(id); }
- virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
- void print(std::ostream& o) const { o << queue.value << " " << id; }
-};
-
-struct TxDequeueEvent : public EventBase<TxDequeueEvent> {
- static const std::string KEY;
- framing::LongString queue;
- ReplicationId id;
-
- TxDequeueEvent(std::string q=std::string(), ReplicationId r=0) :
- queue(q), id(r) {}
- void encode(framing::Buffer& b) const { b.put(queue);b.put(id); }
- void decode(framing::Buffer& b) { b.get(queue);b.get(id); }
- virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
- void print(std::ostream& o) const { o << queue.value << " " << id; }
-};
-
-struct TxPrepareEvent : public EventBase<TxPrepareEvent> {
- static const std::string KEY;
- void encode(framing::Buffer&) const {}
- void decode(framing::Buffer&) {}
- virtual size_t encodedSize() const { return 0; }
- void print(std::ostream&) const {}
-};
-
-struct TxCommitEvent : public EventBase<TxCommitEvent> {
- static const std::string KEY;
- void encode(framing::Buffer&) const {}
- void decode(framing::Buffer&) {}
- virtual size_t encodedSize() const { return 0; }
- void print(std::ostream&) const {}
-};
-
-struct TxRollbackEvent : public EventBase<TxRollbackEvent> {
- static const std::string KEY;
- void encode(framing::Buffer&) const {}
- void decode(framing::Buffer&) {}
- virtual size_t encodedSize() const { return 0; }
- void print(std::ostream&) const {}
-};
-
-struct TxPrepareOkEvent : public EventBase<TxPrepareOkEvent> {
- static const std::string KEY;
- types::Uuid broker;
- TxPrepareOkEvent(const types::Uuid& b=types::Uuid()) : broker(b) {}
-
- void encode(framing::Buffer& b) const {
- b.putRawData(broker.data(), broker.size());
- }
-
- void decode(framing::Buffer& b) {
- std::string s;
- b.getRawData(s, broker.size());
- broker = types::Uuid(&s[0]);
- }
- virtual size_t encodedSize() const { return broker.size(); }
- void print(std::ostream& o) const { o << broker; }
-};
-
-struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> {
- static const std::string KEY;
- types::Uuid broker;
- TxPrepareFailEvent(const types::Uuid& b=types::Uuid()) : broker(b) {}
- void encode(framing::Buffer& b) const { b.putRawData(broker.data(), broker.size()); }
- void decode(framing::Buffer& b) {
- std::string s;
- b.getRawData(s, broker.size());
- broker = types::Uuid(&s[0]);
- }
- virtual size_t encodedSize() const { return broker.size(); }
- void print(std::ostream& o) const { o << broker; }
-};
-
-struct TxBackupsEvent : public EventBase<TxBackupsEvent> {
- static const std::string KEY;
- UuidSet backups;
- TxBackupsEvent(const UuidSet& s=UuidSet()) : backups(s) {}
- void encode(framing::Buffer& b) const { b.put(backups); }
- void decode(framing::Buffer& b) { b.get(backups); }
- size_t encodedSize() const { return backups.encodedSize(); }
- void print(std::ostream& o) const { o << backups; }
-};
-
}} // namespace qpid::ha
#endif /*!QPID_HA_EVENT_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 870e4723b2..ca92ad77dc 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -27,11 +27,11 @@
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
#include "QueueReplicator.h"
-#include "PrimaryTxObserver.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
@@ -77,8 +77,6 @@ class PrimaryBrokerObserver : public broker::BrokerObserver
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
- void startTx(const intrusive_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
- void startDtx(const intrusive_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
private:
Primary& primary;
@@ -268,38 +266,6 @@ void Primary::addReplica(ReplicatingSubscription& rs) {
replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
}
-void Primary::skipEnqueues(
- const types::Uuid& backup,
- const boost::shared_ptr<broker::Queue>& queue,
- const ReplicationIdSet& ids)
-{
- sys::Mutex::ScopedLock l(lock);
- ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
- if (i != replicas.end()) i->second->skipEnqueues(ids);
-}
-
-void Primary::skipDequeues(
- const types::Uuid& backup,
- const boost::shared_ptr<broker::Queue>& queue,
- const ReplicationIdSet& ids)
-{
- sys::Mutex::ScopedLock l(lock);
- ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
- if (i != replicas.end()) i->second->skipDequeues(ids);
-}
-
-// Called from ReplicatingSubscription::cancel
-void Primary::removeReplica(const ReplicatingSubscription& rs) {
- boost::shared_ptr<PrimaryTxObserver> tx;
- {
- sys::Mutex::ScopedLock l(lock);
- replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
- TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
- if (i != txMap.end()) tx = i->second.lock();
- }
- if (tx) tx->cancel(rs); // Outside of lock.
-}
-
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
// Set replication argument.
@@ -477,22 +443,4 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
backup->startCatchup();
}
-shared_ptr<PrimaryTxObserver> Primary::makeTxObserver(
- const boost::intrusive_ptr<broker::TxBuffer>& txBuffer)
-{
- shared_ptr<PrimaryTxObserver> observer =
- PrimaryTxObserver::create(*this, haBroker, txBuffer);
- sys::Mutex::ScopedLock l(lock);
- txMap[observer->getTxQueue()->getName()] = observer;
- return observer;
-}
-
-void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) {
- txBuffer->setObserver(makeTxObserver(txBuffer));
-}
-
-void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& ) {
- QPID_LOG(warning, "DTX transactions in a HA cluster are not yet atomic");
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 84d714fc01..58e6e684ea 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -44,8 +44,6 @@ class Connection;
class ConnectionObserver;
class BrokerObserver;
class SessionHandlerObserver;
-class TxBuffer;
-class DtxBuffer;
}
namespace sys {
@@ -58,7 +56,6 @@ class ReplicatingSubscription;
class RemoteBackup;
class QueueGuard;
class Membership;
-class PrimaryTxObserver;
/**
* State associated with a primary broker:
@@ -87,25 +84,12 @@ class Primary : public Role
void readyReplica(const ReplicatingSubscription&);
void addReplica(ReplicatingSubscription&);
- void removeReplica(const ReplicatingSubscription&);
-
- /** Skip replication of ids to queue on backup. */
- void skipEnqueues(const types::Uuid& backup,
- const boost::shared_ptr<broker::Queue>& queue,
- const ReplicationIdSet& ids);
-
- /** Skip replication of dequeue of ids to queue on backup. */
- void skipDequeues(const types::Uuid& backup,
- const boost::shared_ptr<broker::Queue>& queue,
- const ReplicationIdSet& ids);
// Called via BrokerObserver
void queueCreate(const QueuePtr&);
void queueDestroy(const QueuePtr&);
void exchangeCreate(const ExchangePtr&);
void exchangeDestroy(const ExchangePtr&);
- void startTx(const boost::intrusive_ptr<broker::TxBuffer>&);
- void startDtx(const boost::intrusive_ptr<broker::DtxBuffer>&);
// Called via ConnectionObserver
void opened(broker::Connection& connection);
@@ -126,9 +110,6 @@ class Primary : public Role
typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
Hasher<UuidQueue> > ReplicaMap;
- // Map of PrimaryTxObservers by tx-queue name
- typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> > TxMap;
-
RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
@@ -136,8 +117,6 @@ class Primary : public Role
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
void deduplicate();
- boost::shared_ptr<PrimaryTxObserver> makeTxObserver(
- const boost::intrusive_ptr<broker::TxBuffer>&);
mutable sys::Mutex lock;
HaBroker& haBroker;
@@ -161,7 +140,6 @@ class Primary : public Role
boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
ReplicaMap replicas;
- TxMap txMap;
PrimaryQueueLimits queueLimits;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
deleted file mode 100644
index 56815ef89d..0000000000
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Event.h"
-#include "HaBroker.h"
-#include "Primary.h"
-#include "PrimaryTxObserver.h"
-#include "QueueGuard.h"
-#include "RemoteBackup.h"
-#include "ReplicatingSubscription.h"
-
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/framing/reply_exceptions.h"
-#include <boost/lexical_cast.hpp>
-#include <algorithm>
-
-namespace qpid {
-namespace framing {
-class FieldTable;
-}
-namespace ha {
-
-using namespace std;
-using namespace sys;
-using namespace broker;
-using namespace framing;
-using types::Uuid;
-
-// Exchange to receive prepare OK events.
-class PrimaryTxObserver::Exchange : public broker::Exchange {
- public:
- Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) :
- broker::Exchange(tx_->getExchangeName()),
- tx(tx_)
- {
- args.setString(QPID_REPLICATE, printable(NONE).str()); // Set replication arg.
- dispatch[TxPrepareOkEvent::KEY] =
- boost::bind(&PrimaryTxObserver::txPrepareOkEvent, tx, _1);
- dispatch[TxPrepareFailEvent::KEY] =
- boost::bind(&PrimaryTxObserver::txPrepareFailEvent, tx, _1);
- }
-
- void route(Deliverable& deliverable) {
- const broker::Message& message(deliverable.getMessage());
- DispatchMap::iterator i = dispatch.find(message.getRoutingKey());
- if (i != dispatch.end()) i->second(message.getContent());
- }
-
- bool bind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; }
- bool unbind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; }
- bool isBound(boost::shared_ptr<Queue>, const string* const, const FieldTable* const) { return false; }
- bool hasBindings() { return false; }
- string getType() const { return TYPE_NAME; }
-
- private:
- static const string TYPE_NAME;
- typedef boost::function<void(const std::string&)> DispatchFn;
- typedef unordered_map<std::string, DispatchFn> DispatchMap;
-
- DispatchMap dispatch;
- boost::shared_ptr<PrimaryTxObserver> tx;
-};
-
-const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
-
-boost::shared_ptr<PrimaryTxObserver> PrimaryTxObserver::create(
- Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx) {
- boost::shared_ptr<PrimaryTxObserver> pto(new PrimaryTxObserver(p, hb, tx));
- pto->initialize();
- return pto;
-}
-
-
-PrimaryTxObserver::PrimaryTxObserver(
- Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
-) :
- state(SENDING),
- logPrefix(hb.logPrefix),
- primary(p), haBroker(hb), broker(hb.getBroker()),
- replicationTest(hb.getSettings().replicateDefault.get()),
- txBuffer(tx),
- id(true),
- exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
- empty(true)
-{
- logPrefix = "Primary TX "+shortStr(id)+": ";
-
- // The brokers known at this point are the ones that will be included
- // in the transaction. Brokers that join later are not included.
- //
- BrokerInfo::Set backups_(haBroker.getMembership().otherBackups());
- std::transform(backups_.begin(), backups_.end(), inserter(backups, backups.begin()),
- boost::bind(&BrokerInfo::getSystemId, _1));
-
- // Delay completion of TX untill all backups have responded to prepare.
- incomplete = backups;
- for (size_t i = 0; i < incomplete.size(); ++i)
- txBuffer->startCompleter();
-
- QPID_LOG(debug, logPrefix << "Started, backups " << backups);
-}
-
-void PrimaryTxObserver::initialize() {
- boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
- broker.getExchanges().registerExchange(ex);
- pair<QueuePtr, bool> result =
- broker.createQueue(
- exchangeName,
- QueueSettings(/*durable*/false, /*autodelete*/true),
- 0, // no owner regardless of exclusivity on primary
- string(), // No alternate exchange
- haBroker.getUserId(),
- string()); // Remote host.
- if (!result.second)
- throw InvalidArgumentException(
- QPID_MSG(logPrefix << "TX replication queue already exists."));
- txQueue = result.first;
- txQueue->markInUse(); // Prevent auto-delete till we are done.
- txQueue->deliver(TxBackupsEvent(backups).message());
-}
-
-
-PrimaryTxObserver::~PrimaryTxObserver() {}
-
-void PrimaryTxObserver::checkState(State expect, const std::string& msg) {
- if (state != expect)
- throw IllegalStateException(QPID_MSG(logPrefix << "Illegal state: " << msg));
-}
-
-void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
-{
- Mutex::ScopedLock l(lock);
- if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
- QPID_LOG(trace, logPrefix << "Enqueue: " << logMessageId(*q, m.getReplicationId()));
- checkState(SENDING, "Too late for enqueue");
- empty = false;
- enqueues[q] += m.getReplicationId();
- txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
- txQueue->deliver(m);
- }
-}
-
-void PrimaryTxObserver::dequeue(
- const QueuePtr& q, QueuePosition pos, ReplicationId id)
-{
- Mutex::ScopedLock l(lock);
- checkState(SENDING, "Too late for dequeue");
- if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
- QPID_LOG(trace, logPrefix << "Dequeue: " << logMessageId(*q, pos, id));
- empty = false;
- dequeues[q] += id;
- txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
- }
-}
-
-namespace {
-struct Skip {
- Uuid backup;
- boost::shared_ptr<broker::Queue> queue;
- ReplicationIdSet ids;
-
- Skip(const Uuid& backup_,
- const boost::shared_ptr<broker::Queue>& queue_,
- const ReplicationIdSet& ids_) :
- backup(backup_), queue(queue_), ids(ids_) {}
-
- void skipEnqueues(Primary& p) const { p.skipEnqueues(backup, queue, ids); }
- void skipDequeues(Primary& p) const { p.skipDequeues(backup, queue, ids); }
-};
-} // namespace
-
-void PrimaryTxObserver::skip(Mutex::ScopedLock&) {
- // Tell replicating subscriptions to skip IDs in the transaction.
- vector<Skip> skipEnq, skipDeq;
- for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) {
- for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
- skipEnq.push_back(Skip(*b, q->first, q->second));
- for (QueueIdsMap::iterator q = dequeues.begin(); q != dequeues.end(); ++q)
- skipDeq.push_back(Skip(*b, q->first, q->second));
- }
- Mutex::ScopedUnlock u(lock); // Outside lock
- for_each(skipEnq.begin(), skipEnq.end(), boost::bind(&Skip::skipEnqueues, _1, boost::ref(primary)));
- for_each(skipDeq.begin(), skipDeq.end(), boost::bind(&Skip::skipDequeues, _1, boost::ref(primary)));
-}
-
-bool PrimaryTxObserver::prepare() {
- QPID_LOG(debug, logPrefix << "Prepare " << backups);
- Mutex::ScopedLock l(lock);
- checkState(SENDING, "Too late for prepare");
- state = PREPARING;
- skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
- txQueue->deliver(TxPrepareEvent().message());
- return true;
-}
-
-void PrimaryTxObserver::commit() {
- QPID_LOG(debug, logPrefix << "Commit");
- Mutex::ScopedLock l(lock);
- checkState(PREPARING, "Cannot commit, not preparing");
- if (incomplete.size() == 0) {
- txQueue->deliver(TxCommitEvent().message());
- end(l);
- } else {
- txQueue->deliver(TxRollbackEvent().message());
- end(l);
- throw PreconditionFailedException(
- QPID_MSG(logPrefix << "Cannot commit, " << incomplete.size()
- << " incomplete backups"));
- }
-}
-
-void PrimaryTxObserver::rollback() {
- Mutex::ScopedLock l(lock);
- // Don't bleat about rolling back empty transactions, this happens all the time
- // when a session closes and rolls back its outstanding transaction.
- if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
- if (state != ENDED) {
- txQueue->deliver(TxRollbackEvent().message());
- end(l);
- }
-}
-
-void PrimaryTxObserver::end(Mutex::ScopedLock&) {
- if (state == ENDED) return;
- state = ENDED;
- // If there are no outstanding completions, break pointer cycle here.
- // Otherwise break it in cancel() when the remaining completions are done.
- if (incomplete.empty()) txBuffer = 0;
- txQueue->releaseFromUse(); // txQueue will auto-delete
- txQueue->scheduleAutoDelete();
- txQueue.reset();
- try {
- broker.getExchanges().destroy(getExchangeName());
- } catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Deleting TX exchange: " << e.what());
- }
-}
-
-bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) {
- if (incomplete.erase(id)) {
- txBuffer->finishCompleter();
- return true;
- }
- return false;
-}
-
-bool PrimaryTxObserver::error(const Uuid& id, const std::string& msg, Mutex::ScopedLock& l)
-{
- if (incomplete.find(id) != incomplete.end()) {
- // Note: setError before completed since completed may trigger completion.
- // Only use the TX part of the log prefix.
- txBuffer->setError(Msg() << logPrefix.get() << msg << shortStr(id) << ".");
- completed(id, l);
- return true;
- }
- return false;
-}
-
-void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
- Mutex::ScopedLock l(lock);
- types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
- if (completed(backup, l)) {
- QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
- } else {
- QPID_LOG(error, logPrefix << "Unexpected prepare-ok response from " << backup);
- }
-}
-
-void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
- Mutex::ScopedLock l(lock);
- types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
- if (error(backup, "Prepare failed on backup ", l)) {
- QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup);
- } else {
- QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup);
- }
-}
-
-void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
- Mutex::ScopedLock l(lock);
- types::Uuid backup = rs.getBrokerInfo().getSystemId();
- // Normally the backup should be completed before it is cancelled.
- if (completed(backup, l)) error(backup, "Unexpected disconnect:", l);
- // Break the pointer cycle if backups have completed and we are done with txBuffer.
- if (state == ENDED && incomplete.empty()) txBuffer = 0;
-}
-
-}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
deleted file mode 100644
index 6f445ee212..0000000000
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ /dev/null
@@ -1,133 +0,0 @@
-#ifndef QPID_HA_PRIMARYTXOBSERVER_H
-#define QPID_HA_PRIMARYTXOBSERVER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "types.h"
-#include "ReplicationTest.h"
-#include "LogPrefix.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/TransactionObserver.h"
-#include "qpid/log/Statement.h"
-#include "qpid/types/Uuid.h"
-#include "qpid/sys/unordered_map.h"
-#include "qpid/sys/Monitor.h"
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-
-namespace broker {
-class Broker;
-class Message;
-class Consumer;
-class AsyncCompletion;
-}
-
-namespace ha {
-class HaBroker;
-class ReplicatingSubscription;
-class Primary;
-
-/**
- * Observe events in the lifecycle of a transaction.
- *
- * The observer is called by TxBuffer for each transactional event.
- * It puts the events on a special tx-queue.
- * A TxReplicator on the backup replicates the tx-queue and creates
- * a TxBuffer on the backup equivalent to the one on the primary.
- *
- * Creates an exchange to receive prepare-ok/prepare-fail messages from backups.
- *
- * Monitors for tx-queue subscription cancellations.
- *
- * THREAD SAFE: called in user connection thread for TX events,
- * and in backup connection threads for prepare-completed events
- * and unsubscriptions.
- */
-class PrimaryTxObserver : public broker::TransactionObserver,
- public boost::enable_shared_from_this<PrimaryTxObserver>
-{
- public:
- static boost::shared_ptr<PrimaryTxObserver> create(
- Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
-
- ~PrimaryTxObserver();
-
- void enqueue(const QueuePtr&, const broker::Message&);
- void dequeue(const QueuePtr& queue, QueuePosition, ReplicationId);
- bool prepare();
- void commit();
- void rollback();
-
- types::Uuid getId() const { return id; }
- QueuePtr getTxQueue() const { return txQueue; }
- std::string getExchangeName() const { return exchangeName; }
-
- // Notify that a backup subscription has been cancelled.
- void cancel(const ReplicatingSubscription&);
-
- private:
- class Exchange;
- typedef qpid::sys::unordered_map<
- QueuePtr, ReplicationIdSet, Hasher<QueuePtr> > QueueIdsMap;
-
- enum State {
- SENDING, ///< Sending TX messages and acks
- PREPARING, ///< Prepare sent, waiting for response
- ENDED ///< Commit or rollback sent, local transaction ended.
- };
-
- PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
- void initialize();
-
- void skip(sys::Mutex::ScopedLock&);
- void checkState(State expect, const std::string& msg);
- void end(sys::Mutex::ScopedLock&);
- void txPrepareOkEvent(const std::string& data);
- void txPrepareFailEvent(const std::string& data);
- bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&);
- bool error(const types::Uuid& id, const std::string& msg, sys::Mutex::ScopedLock& l);
-
- sys::Monitor lock;
- State state;
- LogPrefix2 logPrefix;
- Primary& primary;
- HaBroker& haBroker;
- broker::Broker& broker;
- ReplicationTest replicationTest;
- // NOTE: There is an intrusive_ptr cycle between PrimaryTxObserver
- // and TxBuffer. The cycle is broken in PrimaryTxObserver::end()
- boost::intrusive_ptr<broker::TxBuffer> txBuffer;
-
- types::Uuid id;
- std::string exchangeName;
- QueuePtr txQueue;
- QueueIdsMap enqueues, dequeues;
- UuidSet backups; // All backups of transaction.
- UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
- bool empty; // True if the transaction is empty - no enqueues/dequeues.
-};
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_PRIMARYTXOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 3045829ce8..c0d2689685 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -82,28 +82,27 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
- : queueReplicator(qr), logPrefix(qr->logPrefix) {}
+ : queueReplicator(qr), logPrefix(qr->logPrefix.prePrefix, qr->logPrefix.get()) {}
void connectionException(framing::connection::CloseCode code, const std::string& msg) {
- QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ QPID_LOG(error, logPrefix << "Outgoing " << framing::createConnectionException(code, msg).what());
}
void channelException(framing::session::DetachCode code, const std::string& msg) {
- QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ QPID_LOG(error, logPrefix << "Outgoing " << framing::createChannelException(code, msg).what());
}
void executionException(framing::execution::ErrorCode code, const std::string& msg) {
- QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ QPID_LOG(error, logPrefix << "Outgoing " << framing::createSessionException(code, msg).what());
}
void incomingExecutionException(ErrorCode code, const std::string& msg) {
boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
- if (qr && !qr->deletedOnPrimary(code, msg))
- QPID_LOG(error, logPrefix << "Incoming "
- << framing::createSessionException(code, msg).what());
+ if (!(qr && qr->deletedOnPrimary(code, msg)))
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
void detach() {}
private:
boost::weak_ptr<QueueReplicator> queueReplicator;
- const LogPrefix& logPrefix;
+ LogPrefix2 logPrefix;
};
class QueueReplicator::QueueObserver : public broker::QueueObserver {
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index d511b5bd0e..cbf00d0a8f 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -20,7 +20,6 @@
*/
#include "RemoteBackup.h"
#include "QueueGuard.h"
-#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index ca4dd0099f..fb4cdd014c 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -24,7 +24,6 @@
#include "QueueGuard.h"
#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
-#include "TxReplicatingSubscription.h"
#include "Primary.h"
#include "HaBroker.h"
#include "qpid/assert.h"
@@ -52,7 +51,6 @@ const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION(QPID_HA+"rep
const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info");
const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids");
const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep");
-const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep");
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -76,12 +74,6 @@ ReplicatingSubscription::Factory::create(
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
}
- else if (type == QPID_TX_REPLICATOR) {
- rs.reset(new TxReplicatingSubscription(
- haBroker,
- parent, name, queue, ack, acquire, exclusive, tag,
- resumeId, resumeTtl, arguments));
- }
if (rs) rs->initialize();
return rs;
}
@@ -254,7 +246,6 @@ void ReplicatingSubscription::cancel()
cancelled = true;
}
QPID_LOG(debug, logPrefix << "Cancelled");
- if (primary) primary->removeReplica(*this);
getQueue()->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
guard->cancel();
@@ -280,8 +271,6 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
ReplicationIdSet oldDequeues = dequeues;
- dequeues -= skipDequeue; // Don't send skipped dequeues
- skipDequeue -= oldDequeues; // Forget dequeues that would have been sent.
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
sendEvent(DequeueEvent(dequeues), l);
@@ -333,14 +322,4 @@ bool ReplicatingSubscription::doDispatch()
}
}
-void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) {
- Mutex::ScopedLock l(lock);
- skipEnqueue += ids;
-}
-
-void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) {
- Mutex::ScopedLock l(lock);
- skipDequeue += ids;
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index d6d41dd2cf..c2e51971cc 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -97,7 +97,6 @@ class ReplicatingSubscription :
static const std::string QPID_ID_SET;
// Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
static const std::string QPID_QUEUE_REPLICATOR;
- static const std::string QPID_TX_REPLICATOR;
ReplicatingSubscription(HaBroker& haBroker,
broker::SemanticState* parent,
@@ -138,9 +137,6 @@ class ReplicatingSubscription :
BrokerInfo getBrokerInfo() const { return info; }
- void skipEnqueues(const ReplicationIdSet& ids);
- void skipDequeues(const ReplicationIdSet& ids);
-
protected:
bool doDispatch();
@@ -148,8 +144,7 @@ class ReplicatingSubscription :
LogPrefix2 logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.
- ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues.
+ ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool wasStopped;
bool ready;
diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
deleted file mode 100644
index 15b33fe89d..0000000000
--- a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "TxReplicatingSubscription.h"
-
-namespace qpid {
-namespace ha {
-using namespace std;
-using namespace broker;
-
-TxReplicatingSubscription::TxReplicatingSubscription(
- HaBroker& hb,
- SemanticState* parent,
- const string& name,
- boost::shared_ptr<Queue> queue,
- bool ack,
- bool acquire,
- bool exclusive,
- const string& tag,
- const string& resumeId,
- uint64_t resumeTtl,
- const framing::FieldTable& arguments
-) : ReplicatingSubscription(hb, parent, name, queue, ack, acquire, exclusive, tag,
- resumeId, resumeTtl, arguments)
-{}
-
-}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
deleted file mode 100644
index a363d262a0..0000000000
--- a/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
+++ /dev/null
@@ -1,50 +0,0 @@
-#ifndef QPID_HA_TXREPLICATINGSUBSCRIPTION_H
-#define QPID_HA_TXREPLICATINGSUBSCRIPTION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ReplicatingSubscription.h"
-
-namespace qpid {
-namespace ha {
-
-/**
- * Replicating subscription for a TX queue.
- */
-class TxReplicatingSubscription : public ReplicatingSubscription
-{
- public:
- TxReplicatingSubscription(HaBroker& haBroker,
- broker::SemanticState* parent,
- const std::string& name, boost::shared_ptr<broker::Queue> ,
- bool ack, bool acquire, bool exclusive, const std::string& tag,
- const std::string& resumeId, uint64_t resumeTtl,
- const framing::FieldTable& arguments);
-
- /** A TxReplicatingSubscription is counted for auto-delete so we can clean
- * up the TX queue when all backups are done.
- */
- bool isCounted() { return true; }
-};
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_TXREPLICATINGSUBSCRIPTION_H*/
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
deleted file mode 100644
index 33adc9780d..0000000000
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-#include "TxReplicator.h"
-#include "Role.h"
-#include "Backup.h"
-#include "BrokerReplicator.h"
-#include "Event.h"
-#include "HaBroker.h"
-#include "ReplicatingSubscription.h"
-#include "types.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Link.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/SessionHandler.h"
-#include "qpid/broker/TxBuffer.h"
-#include "qpid/broker/TxAccept.h"
-#include "qpid/broker/amqp_0_10/Connection.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/framing/BufferTypes.h"
-#include "qpid/log/Statement.h"
-#include "qpid/broker/amqp_0_10/MessageTransfer.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
-#include <sstream>
-
-namespace qpid {
-namespace ha {
-
-using namespace std;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using qpid::broker::amqp_0_10::MessageTransfer;
-using qpid::types::Uuid;
-
-namespace {
-const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
-} // namespace
-
-bool TxReplicator::isTxQueue(const string& q) {
- return startsWith(q, PREFIX);
-}
-
-Uuid TxReplicator::getTxId(const string& q) {
- if (TxReplicator::isTxQueue(q)) {
- std::istringstream is(q);
- is.seekg(PREFIX.size());
- Uuid id;
- is >> id;
- if (!is.fail()) return id;
- }
- throw Exception(QPID_MSG("Invalid tx queue: " << q));
-}
-
-string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
-
-boost::shared_ptr<TxReplicator> TxReplicator::create(
- HaBroker& hb,
- const boost::shared_ptr<broker::Queue>& txQueue,
- const boost::shared_ptr<broker::Link>& link)
-{
- boost::shared_ptr<TxReplicator> tr(new TxReplicator(hb, txQueue, link));
- tr->initialize();
- return tr;
-}
-
-TxReplicator::TxReplicator(
- HaBroker& hb,
- const boost::shared_ptr<broker::Queue>& txQueue,
- const boost::shared_ptr<broker::Link>& link) :
- QueueReplicator(hb, txQueue, link),
- logPrefix(hb.logPrefix),
- store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
- channel(link->nextChannel()),
- empty(true), ended(false),
- dequeueState(hb.getBroker().getQueues())
-{
- logPrefix = "Backup of TX "+shortStr(getTxId(txQueue->getName()))+": ";
- QPID_LOG(debug, logPrefix << "Started");
- if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
-
- // Dispatch transaction events.
- dispatch[TxEnqueueEvent::KEY] =
- boost::bind(&TxReplicator::enqueue, this, _1, _2);
- dispatch[TxDequeueEvent::KEY] =
- boost::bind(&TxReplicator::dequeue, this, _1, _2);
- dispatch[TxPrepareEvent::KEY] =
- boost::bind(&TxReplicator::prepare, this, _1, _2);
- dispatch[TxCommitEvent::KEY] =
- boost::bind(&TxReplicator::commit, this, _1, _2);
- dispatch[TxRollbackEvent::KEY] =
- boost::bind(&TxReplicator::rollback, this, _1, _2);
- dispatch[TxBackupsEvent::KEY] =
- boost::bind(&TxReplicator::backups, this, _1, _2);
-}
-
-TxReplicator::~TxReplicator() {
- link->returnChannel(channel);
-}
-
-// Send a message to the primary tx.
-void TxReplicator::sendMessage(const broker::Message& msg, sys::Mutex::ScopedLock&) {
- assert(sessionHandler);
- const MessageTransfer& transfer(MessageTransfer::get(msg));
- for (FrameSet::const_iterator i = transfer.getFrames().begin();
- i != transfer.getFrames().end();
- ++i)
- {
- sessionHandler->out.handle(const_cast<AMQFrame&>(*i));
- }
-}
-
-void TxReplicator::deliver(const broker::Message& m_) {
- boost::intrusive_ptr<broker::TxBuffer> txbuf;
- broker::Message m(m_);
- {
- sys::Mutex::ScopedLock l(lock);
- if (!txBuffer) return;
- txbuf = txBuffer;
- m.setReplicationId(enq.id); // Use enqueued replicated id.
- }
- // Deliver message to the target queue, not the tx-queue.
- boost::shared_ptr<broker::Queue> queue = haBroker.getBroker().getQueues().get(enq.queue);
- QPID_LOG(trace, logPrefix << "Deliver " << logMessageId(*queue, m.getReplicationId()));
- DeliverableMessage dm(m, txbuf.get());
- dm.deliverTo(queue);
-}
-
-void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
- sys::Mutex::ScopedLock l(lock);
- if (!txBuffer) return;
- TxEnqueueEvent e;
- decodeStr(data, e);
- QPID_LOG(trace, logPrefix << "Enqueue: " << e);
- enq = e;
- empty = false;
-}
-
-void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
- sys::Mutex::ScopedLock l(lock);
- if (!txBuffer) return;
- TxDequeueEvent e;
- decodeStr(data, e);
- QPID_LOG(trace, logPrefix << "Dequeue: " << e);
- // NOTE: Backup does not see transactional dequeues until the transaction is
- // prepared, then they are all receieved before the prepare event.
- // We collect the events here so we can do a single scan of the queue in prepare.
- dequeueState.add(e);
- empty = false;
-}
-
-void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
- events[event.queue] += event.id;
-}
-
-// Use this function as a seek() predicate to find the dequeued messages.
-bool TxReplicator::DequeueState::addRecord(
- const broker::Message& m, const boost::shared_ptr<Queue>& queue,
- const ReplicationIdSet& rids)
-{
- if (rids.contains(m.getReplicationId())) {
- DeliveryRecord dr(cursor, m.getSequence(), m.getReplicationId(), queue,
- string() /*tag*/,
- boost::shared_ptr<Consumer>(),
- true /*acquired*/,
- false /*accepted*/,
- false /*credit.isWindowMode()*/,
- 0 /*credit*/);
- // Generate record ids, unique within this transaction.
- dr.setId(nextId++);
- records.push_back(dr);
- recordIds += dr.getId();
- }
- return false;
-}
-
-void TxReplicator::DequeueState::addRecords(const EventMap::value_type& entry) {
- // Process all the dequeues for a single queue, in one pass of seek()
- boost::shared_ptr<broker::Queue> q = queues.get(entry.first);
- q->seek(cursor, boost::bind(&TxReplicator::DequeueState::addRecord,
- this, _1, q, entry.second));
-}
-
-boost::shared_ptr<TxAccept> TxReplicator::DequeueState::makeAccept() {
- for_each(events.begin(), events.end(),
- boost::bind(&TxReplicator::DequeueState::addRecords, this, _1));
- return boost::shared_ptr<TxAccept>(
- new TxAccept(boost::cref(recordIds), boost::ref(records)));
-}
-
-void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
- if (!txBuffer) return;
- txBuffer->enlist(dequeueState.makeAccept());
- context = store->begin();
- if (txBuffer->prepare(context.get())) {
- QPID_LOG(debug, logPrefix << "Local prepare OK");
- sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
- } else {
- QPID_LOG(error, logPrefix << "Local prepare failed");
- sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
- }
-}
-
-void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
- if (!txBuffer) return;
- QPID_LOG(debug, logPrefix << "Commit");
- if (context.get()) store->commit(*context);
- txBuffer->commit();
- end(l);
-}
-
-void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
- if (!txBuffer) return;
- // Don't bleat about rolling back empty transactions, this happens all the time
- // when a session closes and rolls back its outstanding transaction.
- if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
- if (context.get()) store->abort(*context);
- txBuffer->rollback();
- end(l);
-}
-
-void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) {
- TxBackupsEvent e;
- decodeStr(data, e);
- if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
- QPID_LOG(info, logPrefix << "Not participating");
- end(l);
- } else {
- QPID_LOG(debug, logPrefix << "Backups: " << e.backups);
- txBuffer = new broker::TxBuffer;
- }
-}
-
-void TxReplicator::end(sys::Mutex::ScopedLock&) {
- ended = true;
- txBuffer = 0;
- // QueueReplicator::destroy cancels subscription to the primary tx-queue
- // which allows the primary to clean up resources.
- sys::Mutex::ScopedUnlock u(lock);
- QueueReplicator::destroy();
-}
-
-// Called when the tx queue is deleted.
-void TxReplicator::destroy(sys::Mutex::ScopedLock& l) {
- if (!ended) {
- if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback");
- rollback(string(), l);
- }
- QueueReplicator::destroy(l);
-}
-
-}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
deleted file mode 100644
index c7599d21b1..0000000000
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ /dev/null
@@ -1,136 +0,0 @@
-#ifndef QPID_HA_TRANSACTIONREPLICATOR_H
-#define QPID_HA_TRANSACTIONREPLICATOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "LogPrefix.h"
-#include "QueueReplicator.h"
-#include "Event.h"
-#include "qpid/broker/DeliveryRecord.h"
-#include "qpid/broker/TransactionalStore.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/types/Uuid.h"
-
-namespace qpid {
-
-namespace broker {
-class TxBuffer;
-class TxAccept;
-class DtxBuffer;
-class Broker;
-class MessageStore;
-class Deliverable;
-}
-
-namespace ha {
-class BrokerReplicator;
-
-/**
- * Exchange created on a backup broker to replicate a transaction on the primary.
- *
- * Subscribes to a tx-queue like a normal queue but puts replicated messages and
- * transaction events into a local TxBuffer.
- *
- * THREAD SAFE: Called in different connection threads.
- */
-class TxReplicator : public QueueReplicator {
- public:
- typedef boost::shared_ptr<broker::Queue> QueuePtr;
- typedef boost::shared_ptr<broker::Link> LinkPtr;
-
- static bool isTxQueue(const std::string& queue);
- static types::Uuid getTxId(const std::string& queue);
-
- static boost::shared_ptr<TxReplicator> create(
- HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
-
- ~TxReplicator();
-
- std::string getType() const;
-
- // QueueReplicator overrides
- using QueueReplicator::destroy;
- void destroy(sys::Mutex::ScopedLock&);
-
- protected:
-
- void deliver(const broker::Message&);
-
- private:
-
- typedef void (TxReplicator::*DispatchFunction)(
- const std::string&, sys::Mutex::ScopedLock&);
- typedef qpid::sys::unordered_map<std::string, DispatchFunction> DispatchMap;
- typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> DequeueMap;
-
- TxReplicator(HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
- void sendMessage(const broker::Message&, sys::Mutex::ScopedLock&);
- void enqueue(const std::string& data, sys::Mutex::ScopedLock&);
- void dequeue(const std::string& data, sys::Mutex::ScopedLock&);
- void prepare(const std::string& data, sys::Mutex::ScopedLock&);
- void commit(const std::string& data, sys::Mutex::ScopedLock&);
- void rollback(const std::string& data, sys::Mutex::ScopedLock&);
- void backups(const std::string& data, sys::Mutex::ScopedLock&);
- void end(sys::Mutex::ScopedLock&);
-
- LogPrefix2 logPrefix;
- TxEnqueueEvent enq; // Enqueue data for next deliver.
- boost::intrusive_ptr<broker::TxBuffer> txBuffer;
- broker::MessageStore* store;
- std::auto_ptr<broker::TransactionContext> context;
- framing::ChannelId channel; // Channel to send prepare-complete.
- bool empty, ended;
-
- // Class to process dequeues and create DeliveryRecords to populate a
- // TxAccept.
- class DequeueState {
- public:
- DequeueState(broker::QueueRegistry& qr) : queues(qr) {}
- void add(const TxDequeueEvent&);
- boost::shared_ptr<broker::TxAccept> makeAccept();
-
- private:
- // Delivery record IDs are command IDs from the session.
- // On a backup we will just fake these Ids.
- typedef framing::SequenceNumber Id;
- typedef framing::SequenceSet IdSet;
- typedef qpid::sys::unordered_map<std::string, ReplicationIdSet> EventMap;
-
- bool addRecord(const broker::Message& m,
- const boost::shared_ptr<broker::Queue>&,
- const ReplicationIdSet& );
- void addRecords(const DequeueMap::value_type& entry);
-
- broker::QueueRegistry& queues;
- EventMap events;
- broker::DeliveryRecords records;
- broker::QueueCursor cursor;
- framing::SequenceNumber nextId;
- IdSet recordIds;
- };
- DequeueState dequeueState;
-};
-
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_TRANSACTIONREPLICATOR_H*/
diff --git a/qpid/cpp/src/qpid/ha/types.cpp b/qpid/cpp/src/qpid/ha/types.cpp
index 60cc0f27ce..3088661c95 100644
--- a/qpid/cpp/src/qpid/ha/types.cpp
+++ b/qpid/cpp/src/qpid/ha/types.cpp
@@ -39,7 +39,6 @@ const string QPID_HA_UUID("qpid.ha-uuid");
const char* QPID_HA_PREFIX = "qpid.ha-";
const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:";
-const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:";
bool startsWith(const string& name, const string& prefix) {
return name.compare(0, prefix.size(), prefix) == 0;
diff --git a/qpid/cpp/src/qpid/sys/SocketTransport.cpp b/qpid/cpp/src/qpid/sys/SocketTransport.cpp
index 86c9d301e9..36edcf24a3 100644
--- a/qpid/cpp/src/qpid/sys/SocketTransport.cpp
+++ b/qpid/cpp/src/qpid/sys/SocketTransport.cpp
@@ -48,7 +48,7 @@ namespace {
{
if (opts.tcpNoDelay) {
s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ QPID_LOG(debug, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
AsynchIO* aio = AsynchIO::create
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 82ca808cb1..ace225a509 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -267,6 +267,8 @@ acl allow all all
c = self.connect_admin()
try:
wait_address(c, queue)
+ if not "msg" in kwargs:
+ kwargs["msg"]=str(self)
assert_browse_retry(c.session(), queue, expected, **kwargs)
finally: c.close()
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 2ee2e291e2..0efb8182ec 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1327,28 +1327,25 @@ class TransactionTests(HaBrokerTest):
sb.close()
return tx
- def tx_subscriptions(self, broker):
- """Return list of queue names for tx subscriptions"""
- return [q for q in broker.agent.repsub_queues()
- if q.startswith("qpid.ha-tx")]
-
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True, wait=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
-
- # NOTE: backup does not process transactional dequeues until prepare
- cluster[1].assert_browse_backup("a", ["x","y","z"])
- cluster[1].assert_browse_backup("b", ['0', '1', '2'])
-
tx.acknowledge()
+ # Pre transaction - messages are acquired on primary but not yet dequeued
+ # so still there on backup.
+ cluster[0].assert_browse_backup("a", [])
+ cluster[1].assert_browse_backup("a", ['x', 'y', 'z'])
+ for b in cluster:
+ b.assert_browse_backup("b", ['0', '1', '2'])
tx.commit()
tx.sync()
tx.close()
+ # Post transaction: all synced.
for b in cluster:
- self.assert_simple_commit_outcome(b, tx_queues)
+ b.assert_browse_backup("a", [])
+ b.assert_browse_backup("b", ['0', '1', '2', "x","y","z"])
# Verify non-tx dequeue is replicated correctly
c = cluster.connect(0, protocol=self.tx_protocol)
@@ -1360,121 +1357,22 @@ class TransactionTests(HaBrokerTest):
c.close()
tx.connection.close()
-
- def check_enq_deq(self, cluster, queue, expect):
- for b in cluster:
- q = b.agent.getQueue(queue)
- self.assertEqual(
- (b.name,)+expect,
- (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues))
-
- def test_tx_enq_notx_deq(self):
- """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
- cluster = HaCluster(self, 2, test_store=True)
- c = cluster.connect(0, protocol=self.tx_protocol)
-
- tx = c.session(transactional=True)
- c.session().sender("qq;{create:always}").send("m1")
- tx.sender("qq;{create:always}").send("tx")
- tx.commit()
- tx.close()
- c.session().sender("qq;{create:always}").send("m2")
- self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0))
-
- notx = c.session()
- self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))])
- notx.acknowledge()
- self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0))
- for b in cluster: b.assert_browse_backup('qq', [], msg=b)
- for b in cluster: self.assert_tx_clean(b)
-
- def test_tx_enq_notx_deq_qpid_send(self):
- """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
- cluster = HaCluster(self, 2, test_store=True)
-
- self.popen(
- ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1',
- '--content-string=foo']
- ).assert_exit_ok()
- for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b)
- self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0))
-
- self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok()
- self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0))
- for b in cluster: b.assert_browse_backup('qq', [], msg=b)
- for b in cluster: self.assert_tx_clean(b)
-
- def assert_tx_clean(self, b):
- """Verify that there are no transaction artifacts
- (exchanges, queues, subscriptions) on b."""
- class FunctionCache: # Call a function and cache the result.
- def __init__(self, f): self.f, self.value = f, None
- def __call__(self): self.value = self.f(); return self.value
-
- txq= FunctionCache(b.agent.tx_queues)
- assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
- txsub = FunctionCache(lambda: self.tx_subscriptions(b))
- assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
- # TODO aconway 2013-10-15: TX exchanges don't show up in management.
-
- def assert_simple_commit_outcome(self, b, tx_queues):
- b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
- # Check for expected actions on the store
- expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-<begin tx 1>
-<dequeue a x tx=1>
-<dequeue a y tx=1>
-<dequeue a z tx=1>
-<commit tx=1>
-"""
- self.assertEqual(expect, open_read(b.store_log), msg=b)
- self.assert_tx_clean(b)
-
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
tx.rollback()
- tx.close() # For clean test.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+ for b in cluster:
+ b.assert_browse_backup("a", ["x","y","z"])
+ b.assert_browse_backup("b", ['0', '1', '2'])
+
+ tx.close()
tx.connection.close()
- def assert_simple_rollback_outcome(self, b, tx_queues):
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
- # Check for expected actions on the store
- expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-"""
- self.assertEqual(open_read(b.store_log), expect, msg=b)
- self.assert_tx_clean(b)
def test_tx_simple_failure(self):
- """Verify we throw TransactionAborted if there is a store error during a transaction"""
- cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.acknowledge()
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- cluster.bounce(0) # Should cause roll-back
- tx.connection.session() # Wait for reconnect
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
- self.assertRaises(qm.TransactionAborted, tx.sync)
- self.assertRaises(qm.TransactionAborted, tx.commit)
- try: tx.connection.close()
- except qm.TransactionAborted: pass # Occasionally get exception on close.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
- finally: l.restore()
-
- def test_tx_simple_failover(self):
"""Verify we throw TransactionAborted if there is a fail-over during a transaction"""
cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster)
@@ -1485,79 +1383,15 @@ class TransactionTests(HaBrokerTest):
try:
cluster.bounce(0) # Should cause roll-back
tx.connection.session() # Wait for reconnect
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
self.assertRaises(qm.TransactionAborted, tx.sync)
self.assertRaises(qm.TransactionAborted, tx.commit)
try: tx.connection.close()
except qm.TransactionAborted: pass # Occasionally get exception on close.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ for b in cluster:
+ b.assert_browse_backup("a", ["x","y","z"])
+ b.assert_browse_backup("b", ['0', '1', '2'])
finally: l.restore()
- def test_tx_unknown_failover(self):
- """Verify we throw TransactionUnknown if there is a failure during commit"""
- cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.acknowledge()
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response
- class CommitThread(Thread):
- def run(self):
- try: tx.commit()
- except Exception, e:
- self.error = e
- t = CommitThread()
- t.start() # Commit in progress
- t.join(timeout=0.01)
- self.assertTrue(t.isAlive())
- cluster.bounce(0)
- os.kill(cluster[2].pid, signal.SIGCONT)
- t.join()
- try: raise t.error
- except qm.TransactionUnknown: pass
- for b in cluster: self.assert_tx_clean(b)
- try: tx.connection.close()
- except qm.TransactionUnknown: pass # Occasionally get exception on close.
- finally: l.restore()
-
- def test_tx_no_backups(self):
- """Test the special case of a TX where there are no backups"""
-
- # Test commit
- cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.acknowledge()
- tx.commit()
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.close()
- self.assert_simple_commit_outcome(cluster[0], tx_queues)
-
- # Test rollback
- cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.acknowledge()
- tx.rollback()
- tx.sync()
- tx.close()
- self.assert_simple_rollback_outcome(cluster[0], tx_queues)
-
- def test_tx_backup_fail(self):
- cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
- c = cluster[0].connect(protocol=self.tx_protocol)
- tx = c.session(transactional=True)
- s = tx.sender("q;{create:always,node:{durable:true}}")
- for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
- def commit_sync(): tx.commit(); tx.sync()
- self.assertRaises(qm.TransactionAborted, commit_sync)
- for b in cluster: b.assert_browse_backup("q", [])
- self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
- self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
-
def test_tx_join_leave(self):
"""Test cluster members joining/leaving cluster.
Also check that tx-queues are cleaned up at end of transaction."""
@@ -1568,13 +1402,11 @@ class TransactionTests(HaBrokerTest):
tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("a", sync=True)
- self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
tx.commit()
tx.connection.close()
for b in [cluster[0],cluster[2]]:
- self.assert_tx_clean(b)
b.assert_browse_backup("q", ["a","b"], msg=b)
# Joining
tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
@@ -1583,7 +1415,6 @@ class TransactionTests(HaBrokerTest):
cluster.restart(1) # Not a part of the current transaction.
tx.commit()
tx.connection.close()
- for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
@@ -1596,7 +1427,6 @@ class TransactionTests(HaBrokerTest):
for s in sessions:
sn = s.sender("qq;{create:always,node:{durable:true}}")
sn.send(qm.Message("foo", durable=True))
- self.assertEqual(n, len(cluster[1].agent.tx_queues()))
threads = [ Thread(target=s.commit) for s in sessions]
for t in threads: t.start()
cluster[0].ready(timeout=1) # Check for deadlock