summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp66
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCommandCallback.h63
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp35
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp103
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h110
-rw-r--r--qpid/cpp/src/qpid/broker/TxAccept.cpp27
-rw-r--r--qpid/cpp/src/qpid/broker/TxAccept.h2
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.cpp63
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.h21
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h7
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp93
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h14
-rw-r--r--qpid/cpp/src/tests/TransactionObserverTest.cpp9
-rw-r--r--qpid/cpp/src/tests/TxBufferTest.cpp11
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py46
18 files changed, 496 insertions, 216 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index c018bcba76..aa93b855f7 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1224,6 +1224,8 @@ set (qpidbroker_SOURCES
${ssl_SOURCES}
qpid/amqp_0_10/Connection.h
qpid/amqp_0_10/Connection.cpp
+ qpid/broker/AsyncCommandCallback.h
+ qpid/broker/AsyncCommandCallback.cpp
qpid/broker/Broker.cpp
qpid/broker/Credit.cpp
qpid/broker/Exchange.cpp
diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
new file mode 100644
index 0000000000..2b1f3ad8e4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsyncCommandCallback.h"
+#include "SessionOutputException.h"
+
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f) :
+ AsyncCommandContext(ss), command(f), channel(ss.getChannel())
+{}
+
+void AsyncCommandCallback::completed(bool sync) {
+ if (sync)
+ doCommand(); // In initiating thread, execute now.
+ else
+ completerContext->schedule(
+ boost::bind(&AsyncCommandCallback::complete,
+ boost::intrusive_ptr<AsyncCommandCallback>(this)));
+}
+
+boost::intrusive_ptr<AsyncCompletion::Callback> AsyncCommandCallback::clone() {
+ return new AsyncCommandCallback(*this);
+}
+
+void AsyncCommandCallback::complete() {
+ try{
+ doCommand();
+ } catch (const SessionException& e) {
+ throw SessionOutputException(e, channel);
+ } catch (const std::exception& e) {
+ throw SessionOutputException(InternalErrorException(e.what()), channel);
+ }
+}
+
+void AsyncCommandCallback::doCommand() {
+ SessionState* session = completerContext->getSession();
+ if (session && session->isAttached())
+ session->completeCommand(id, false, requiresSync, command());
+ else
+ throw InternalErrorException("Cannot complete command, no session");
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
new file mode 100644
index 0000000000..dd8214683a
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
@@ -0,0 +1,63 @@
+#ifndef QPID_BROKER_ASYNCCOMMANDCALLBACK_H
+#define QPID_BROKER_ASYNCCOMMANDCALLBACK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/AsyncCompletion.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * An AsyncCompletion::Callback that executes the final part of an
+ * async-completed command in the proper context:
+ *
+ * - Complete synchronously: Called in the initiating thread.
+ * - Complete asynchronously: Scheduled on the IO thread.
+ *
+ * Errors thrown by the command are returned to the correct session on the client
+ * even if we are executed via an IO callback.
+ */
+class AsyncCommandCallback : public SessionState::AsyncCommandContext {
+ public:
+ /** Command function returns a string containing the encoded result of the
+ * command, or empty for no result. It may raise an exception.
+ */
+ typedef boost::function<std::string ()> Command;
+
+ AsyncCommandCallback(SessionState& ss, Command f);
+
+ void completed(bool sync);
+
+ boost::intrusive_ptr<AsyncCompletion::Callback> clone();
+
+ private:
+ void complete();
+ void doCommand();
+
+ Command command;
+ uint16_t channel;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_ASYNCCOMMANDCALLBACK_H*/
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 19cb2f30c3..5a11db81bb 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -52,7 +52,7 @@ class RecoverableQueueImpl : public RecoverableQueue
public:
RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
~RecoverableQueueImpl() {};
- void setPersistenceId(uint64_t id);
+ void setPersistenceId(uint64_t id);
uint64_t getPersistenceId() const;
const std::string& getName() const;
void setExternalQueueStore(ExternalQueueStore* inst);
@@ -126,7 +126,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
return m;
}
-RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn)
{
boost::intrusive_ptr<DtxBuffer> buffer(new DtxBuffer());
@@ -212,7 +212,7 @@ const std::string& RecoverableQueueImpl::getName() const
{
return queue->getName();
}
-
+
void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
{
queue->setExternalQueueStore(inst);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 469fe760a0..94e680c211 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/AsyncCommandCallback.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -110,6 +111,7 @@ void SemanticState::closed() {
cancel(i->second);
}
closeComplete = true;
+ if (txBuffer) txBuffer->rollback();
}
}
@@ -166,32 +168,47 @@ bool SemanticState::cancel(const string& tag)
void SemanticState::startTx()
{
+ accumulatedAck.clear();
txBuffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
session.getBroker().getBrokerObservers().startTx(txBuffer);
session.startTx(); //just to update statistics
}
+namespace {
+struct StartTxOnExit {
+ SemanticState& session;
+ StartTxOnExit(SemanticState& ss) : session(ss) {}
+ ~StartTxOnExit() { session.startTx(); }
+};
+} // namespace
+
void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw
- CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- session.commitTx(); //just to update statistics
+ CommandInvalidException(
+ QPID_MSG("Session has not been selected for use with transactions"));
+ // Start a new TX regardless of outcome of this one.
+ StartTxOnExit e(*this);
+ session.getCurrentCommand().setCompleteSync(false); // Async completion
+ txBuffer->begin(); // Begin async completion.
+ session.commitTx(); //just to update statistics
TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
- if (txBuffer->commitLocal(store)) {
- accumulatedAck.clear();
- } else {
- throw InternalErrorException(QPID_MSG("Commit failed"));
- }
+ // In a HA cluster, tx.commit may complete asynchronously.
+ txBuffer->startCommit(store);
+ AsyncCommandCallback callback(
+ session,
+ boost::bind(&TxBuffer::endCommit, txBuffer, store));
+ txBuffer->end(callback);
}
void SemanticState::rollback()
{
if (!txBuffer)
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- session.rollbackTx(); //just to update statistics
+ session.rollbackTx(); // Just to update statistics
txBuffer->rollback();
- accumulatedAck.clear();
+ startTx(); // Start a new TX automatically.
}
void SemanticState::selectDtx()
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 3995eb85dc..8e128ae0df 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -191,26 +191,21 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
- currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
- Invoker::Result invocation = invoke(adapter, *method);
- if (currentCommandComplete) receiverCompleted(id);
-
- if (!invocation.wasHandled()) {
+void SessionState::handleCommand(framing::AMQMethodBody* method) {
+ Invoker::Result result = invoke(adapter, *method);
+ if (!result.wasHandled())
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
- } else if (invocation.hasResult()) {
- getProxy().getExecution().result(id, invocation.getResult());
- }
-
- if (method->isSync() && currentCommandComplete) {
- sendAcceptAndCompletion();
- }
+ if (currentCommand.isCompleteSync())
+ completeCommand(
+ currentCommand.getId(), false/*needAccept*/, currentCommand.isSyncRequired(),
+ result.getResult());
}
-void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
+
+void SessionState::handleContent(AMQFrame& frame)
{
if (frame.getBof() && frame.getBos()) //start of frameset
- msgBuilder.start(id);
+ msgBuilder.start(currentCommand.getId());
intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
@@ -244,23 +239,27 @@ void SessionState::sendAcceptAndCompletion()
sendCompletion();
}
-/** Invoked when the given inbound message is finished being processed
- * by all interested parties (eg. it is done being enqueued to all queues,
- * its credit has been accounted for, etc). At this point, msg is considered
- * by this receiver as 'completed' (as defined by AMQP 0_10)
+/** Invoked when the given command is finished being processed by all interested
+ * parties (eg. it is done being enqueued to all queues, its credit has been
+ * accounted for, etc). At this point the command is considered by this
+ * receiver as 'completed' (as defined by AMQP 0_10)
*/
-void SessionState::completeRcvMsg(SequenceNumber id,
- bool requiresAccept,
- bool requiresSync)
+void SessionState::completeCommand(SequenceNumber id,
+ bool requiresAccept,
+ bool requiresSync,
+ const std::string& result=std::string())
{
bool callSendCompletion = false;
receiverCompleted(id);
if (requiresAccept)
- // will cause msg's seq to appear in the next message.accept we send.
+ // will cause cmd's seq to appear in the next message.accept we send.
accepted.add(id);
+ if (!result.empty())
+ getProxy().getExecution().result(id, result);
+
// Are there any outstanding Execution.Sync commands pending the
- // completion of this msg? If so, complete them.
+ // completion of this cmd? If so, complete them.
while (!pendingExecutionSyncs.empty() &&
receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
const SequenceNumber id = pendingExecutionSyncs.front();
@@ -277,14 +276,15 @@ void SessionState::completeRcvMsg(SequenceNumber id,
}
void SessionState::handleIn(AMQFrame& frame) {
- SequenceNumber commandId = receiverGetCurrent();
//TODO: make command handling more uniform, regardless of whether
//commands carry content.
AMQMethodBody* m = frame.getMethod();
+ currentCommand = CurrentCommand(receiverGetCurrent(), m && m->isSync());
+
if (m == 0 || m->isContentBearing()) {
- handleContent(frame, commandId);
+ handleContent(frame);
} else if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
+ handleCommand(frame.getMethod());
} else {
throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
@@ -345,9 +345,9 @@ void SessionState::setTimeout(uint32_t) { }
// (called via the invoker() in handleCommand() above)
void SessionState::addPendingExecutionSync()
{
- SequenceNumber syncCommandId = receiverGetCurrent();
+ SequenceNumber syncCommandId = currentCommand.getId();
if (receiverGetIncomplete().front() < syncCommandId) {
- currentCommandComplete = false;
+ currentCommand.setCompleteSync(false);
pendingExecutionSyncs.push(syncCommandId);
asyncCommandCompleter->flushPendingMessages();
QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
@@ -389,25 +389,16 @@ void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
*/
session = 0;
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+ completerContext->scheduleCommandCompletion(id, requiresAccept, requiresSync);
} else {
// this path runs directly from the ac->end() call in handleContent() above,
// so *session is definately valid.
if (session->isAttached()) {
QPID_LOG(debug, ": receive completed for msg seq=" << id);
- session->completeRcvMsg(id, requiresAccept, requiresSync);
+ session->completeCommand(id, requiresAccept, requiresSync);
}
}
- completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
-}
-
-
-/** Scheduled from an asynchronous command's completed callback to run on
- * the IO thread.
- */
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
-{
- ctxt->completeCommands();
+ completerContext.reset();
}
@@ -450,22 +441,27 @@ void SessionState::AsyncCommandCompleter::flushPendingMessages()
/** mark an ingress Message.Transfer command as completed.
* This method must be thread safe - it may run on any thread.
*/
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync)
+void SessionState::AsyncCommandCompleter::scheduleCommandCompletion(
+ SequenceNumber cmd,
+ bool requiresAccept,
+ bool requiresSync)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
if (session && isAttached) {
- MessageInfo msg(cmd, requiresAccept, requiresSync);
- completedMsgs.push_back(msg);
- if (completedMsgs.size() == 1) {
- session->getConnection().requestIOProcessing(boost::bind(&schedule,
- session->asyncCommandCompleter));
+ CommandInfo info(cmd, requiresAccept, requiresSync);
+ completedCmds.push_back(info);
+ if (completedCmds.size() == 1) {
+ session->getConnection().requestIOProcessing(
+ boost::bind(&AsyncCommandCompleter::completeCommands,
+ session->asyncCommandCompleter));
}
}
}
+void SessionState::AsyncCommandCompleter::schedule(boost::function<void()> f) {
+ if (session && isAttached) session->getConnection().requestIOProcessing(f);
+}
/** Cause the session to complete all completed commands.
* Executes on the IO thread.
@@ -476,12 +472,13 @@ void SessionState::AsyncCommandCompleter::completeCommands()
// when session is destroyed, it clears the session pointer via cancel().
if (session && session->isAttached()) {
- for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
- msg != completedMsgs.end(); ++msg) {
- session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
+ for (std::vector<CommandInfo>::iterator cmd = completedCmds.begin();
+ cmd != completedCmds.end(); ++cmd) {
+ session->completeCommand(
+ cmd->cmd, cmd->requiresAccept, cmd->requiresSync);
}
}
- completedMsgs.clear();
+ completedCmds.clear();
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 500a211a6f..ad29c4427b 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -132,13 +132,12 @@ class SessionState : public qpid::SessionState,
void commitTx();
void rollbackTx();
+ /** Send result and completion for a given command to the client. */
+ void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync,
+ const std::string& result);
private:
- void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
- void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-
- // indicate that the given ingress msg has been completely received by the
- // broker, and the msg's message.transfer command can be considered completed.
- void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
+ void handleCommand(framing::AMQMethodBody* method);
+ void handleContent(framing::AMQFrame& frame);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
@@ -160,7 +159,37 @@ class SessionState : public qpid::SessionState,
// sequence numbers for pending received Execution.Sync commands
std::queue<SequenceNumber> pendingExecutionSyncs;
- bool currentCommandComplete;
+
+ public:
+
+ /** Information about the currently executing command.
+ * Can only be used in the IO thread during command execution.
+ */
+ class CurrentCommand {
+ public:
+ CurrentCommand(
+ SequenceNumber id_=0, bool syncRequired_=false, bool completeSync_=true ) :
+ id(id_), syncRequired(syncRequired_), completeSync(completeSync_)
+ {}
+
+ SequenceNumber getId() const { return id; }
+
+ /**@return true if the sync flag was set for the command. */
+ bool isSyncRequired() const { return syncRequired; }
+
+ /**@return true if the command should be completed synchronously
+ * in the handling thread.
+ */
+ bool isCompleteSync() const { return completeSync; }
+ void setCompleteSync(bool b) { completeSync = b; }
+
+ private:
+ SequenceNumber id; ///< Command identifier.
+ bool syncRequired; ///< True if sync flag set for the command.
+ bool completeSync; ///< Will be completed by handCommand.
+ };
+
+ CurrentCommand& getCurrentCommand() { return currentCommand; }
/** This class provides a context for completing asynchronous commands in a thread
* safe manner. Asynchronous commands save their completion state in this class.
@@ -175,15 +204,17 @@ class SessionState : public qpid::SessionState,
bool isAttached;
qpid::sys::Mutex completerLock;
- // special-case message.transfer commands for optimization
- struct MessageInfo {
+ struct CommandInfo {
SequenceNumber cmd; // message.transfer command id
bool requiresAccept;
bool requiresSync;
- MessageInfo(SequenceNumber c, bool a, bool s)
- : cmd(c), requiresAccept(a), requiresSync(s) {}
+
+ CommandInfo(
+ SequenceNumber c, bool a, bool s)
+ : cmd(c), requiresAccept(a), requiresSync(s) {}
};
- std::vector<MessageInfo> completedMsgs;
+
+ std::vector<CommandInfo> completedCmds;
// If an ingress message does not require a Sync, we need to
// hold a reference to it in case an Execution.Sync command is received and we
// have to manually flush the message.
@@ -192,9 +223,6 @@ class SessionState : public qpid::SessionState,
/** complete all pending commands, runs in IO thread */
void completeCommands();
- /** for scheduling a run of "completeCommands()" on the IO thread */
- static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
-
public:
AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
~AsyncCommandCompleter() {};
@@ -203,15 +231,21 @@ class SessionState : public qpid::SessionState,
void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m);
void deletePendingMessage(SequenceNumber id);
void flushPendingMessages();
- /** schedule the processing of a completed ingress message.transfer command */
- void scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync);
+ /** schedule the processing of command completion. */
+ void scheduleCommandCompletion(SequenceNumber cmd,
+ bool requiresAccept,
+ bool requiresSync);
+ void schedule(boost::function<void()>);
void cancel(); // called by SessionState destructor.
void attached(); // called by SessionState on attach()
void detached(); // called by SessionState on detach()
+
+ SessionState* getSession() const { return session; }
};
- boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+
+ boost::intrusive_ptr<AsyncCommandCompleter> getAsyncCommandCompleter() {
+ return asyncCommandCompleter;
+ }
/** Abstract class that represents a single asynchronous command that is
* pending completion.
@@ -219,15 +253,29 @@ class SessionState : public qpid::SessionState,
class AsyncCommandContext : public AsyncCompletion::Callback
{
public:
- AsyncCommandContext( SessionState *ss, SequenceNumber _id )
- : id(_id), completerContext(ss->asyncCommandCompleter) {}
+ AsyncCommandContext(SessionState& ss )
+ : id(ss.getCurrentCommand().getId()),
+ requiresSync(ss.getCurrentCommand().isSyncRequired()),
+ completerContext(ss.getAsyncCommandCompleter())
+ {}
+
+ AsyncCommandContext(const AsyncCommandContext& x) :
+ id(x.id), requiresSync(x.requiresSync), completerContext(x.completerContext)
+ {}
+
virtual ~AsyncCommandContext() {}
protected:
SequenceNumber id;
+ bool requiresSync;
boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
};
+
+ private:
+ boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+ CurrentCommand currentCommand;
+
/** incomplete Message.transfer commands - inbound to broker from client
*/
class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
@@ -235,21 +283,17 @@ class SessionState : public qpid::SessionState,
public:
IncompleteIngressMsgXfer( SessionState *ss,
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m)
- : AsyncCommandContext(ss, m->getCommandId()),
+ : AsyncCommandContext(*ss),
session(ss),
msg(m),
requiresAccept(m->requiresAccept()),
requiresSync(m->getFrames().getMethod()->isSync()),
- pending(false) {}
- IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
- : AsyncCommandContext(x.session, x.msg->getCommandId()),
- session(x.session),
- msg(x.msg),
- requiresAccept(x.requiresAccept),
- requiresSync(x.requiresSync),
- pending(x.pending) {}
+ pending(false)
+ {
+ assert(id == m->getCommandId());
+ }
- virtual ~IncompleteIngressMsgXfer() {};
+ virtual ~IncompleteIngressMsgXfer() {}
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
@@ -262,7 +306,7 @@ class SessionState : public qpid::SessionState,
bool pending; // true if msg saved on pending list...
};
- friend class SessionManager;
+ friend class SessionManager;
};
diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp
index ee30dff957..496f8a2a42 100644
--- a/qpid/cpp/src/qpid/broker/TxAccept.cpp
+++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp
@@ -33,14 +33,20 @@ using qpid::framing::SequenceNumber;
TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
acked(_acked), unacked(_unacked)
-{
- for(SequenceSet::RangeIterator i = acked.rangesBegin(); i != acked.rangesEnd(); ++i)
- ranges.push_back(DeliveryRecord::findRange(unacked, i->first(), i->last()));
-}
+{}
void TxAccept::each(boost::function<void(DeliveryRecord&)> f) {
- for(AckRanges::iterator i = ranges.begin(); i != ranges.end(); ++i)
- for_each(i->start, i->end, f);
+ DeliveryRecords::iterator dr = unacked.begin();
+ SequenceSet::iterator seq = acked.begin();
+ while(dr != unacked.end() && seq != acked.end()) {
+ if (dr->getId() == *seq) {
+ f(*dr);
+ ++dr;
+ ++seq;
+ }
+ else if (dr->getId() < *seq) ++dr;
+ else if (dr->getId() > *seq) ++seq;
+ }
}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
@@ -63,12 +69,11 @@ void TxAccept::commit() throw()
each(bind(&DeliveryRecord::committed, _1));
each(bind(&DeliveryRecord::setEnded, _1));
//now remove if isRedundant():
- if (!ranges.empty()) {
- DeliveryRecords::iterator begin = ranges.front().start;
- DeliveryRecords::iterator end = ranges.back().end;
+ if (!acked.empty()) {
+ AckRange r = DeliveryRecord::findRange(unacked, acked.front(), acked.back());
DeliveryRecords::iterator removed =
- remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
- unacked.erase(removed, end);
+ remove_if(r.start, r.end, mem_fun_ref(&DeliveryRecord::isRedundant));
+ unacked.erase(removed, r.end);
}
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to commit: " << e.what());
diff --git a/qpid/cpp/src/qpid/broker/TxAccept.h b/qpid/cpp/src/qpid/broker/TxAccept.h
index b4d55155a8..97e82ffa3f 100644
--- a/qpid/cpp/src/qpid/broker/TxAccept.h
+++ b/qpid/cpp/src/qpid/broker/TxAccept.h
@@ -37,14 +37,12 @@ namespace broker {
* a transactional channel.
*/
class TxAccept : public TxOp {
- typedef std::vector<AckRange> AckRanges;
typedef boost::shared_ptr<TransactionObserver> ObserverPtr;
void each(boost::function<void(DeliveryRecord&)>);
framing::SequenceSet acked;
DeliveryRecords& unacked;
- AckRanges ranges;
public:
/**
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
index a8df4fb214..6dc2f5c2f4 100644
--- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
@@ -21,22 +21,28 @@
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/mem_fn.hpp>
#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker{
+
using boost::mem_fn;
-using namespace qpid::broker;
+using framing::InternalErrorException;
TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {}
bool TxBuffer::prepare(TransactionContext* const ctxt)
{
+ // The observer may call startCompleter to delay completion.
if (!observer->prepare()) return false;
for(op_iterator i = ops.begin(); i != ops.end(); i++){
- if(!(*i)->prepare(ctxt)){
- return false;
- }
+ if(!(*i)->prepare(ctxt)) return false;
}
+ // At this point prepare has succeeded locally but if completion is delayed,
+ // then completing threads may call setError to indicate an error.
return true;
}
@@ -60,24 +66,37 @@ void TxBuffer::enlist(TxOp::shared_ptr op)
ops.push_back(op);
}
-bool TxBuffer::commitLocal(TransactionalStore* const store)
+void TxBuffer::startCommit(TransactionalStore* const store)
{
- if (!store) return false;
- try {
- std::auto_ptr<TransactionContext> ctxt = store->begin();
- if (prepare(ctxt.get())) {
- store->commit(*ctxt);
- commit();
- return true;
- } else {
- store->abort(*ctxt);
- rollback();
- return false;
- }
- } catch (std::exception& e) {
- QPID_LOG(error, "Commit failed with exception: " << e.what());
- } catch (...) {
- QPID_LOG(error, "Commit failed with unknown exception");
+ if (!store) throw Exception("Can't commit transaction, no store.");
+ txContext.reset(store->begin().release());
+ if (!prepare(txContext.get()))
+ setError("Transaction prepare failed.");
+}
+
+// Called when async completion is complete.
+std::string TxBuffer::endCommit(TransactionalStore* const store) {
+ std::string e;
+ {
+ sys::Mutex::ScopedLock l(errorLock);
+ e = error;
+ }
+ if (!e.empty()) {
+ store->abort(*txContext);
+ rollback();
+ throw InternalErrorException(e);
}
- return false;
+ else {
+ store->commit(*txContext);
+ commit();
+ }
+ return std::string(); // There is no result from tx.commit
+}
+
+void TxBuffer::setError(const std::string& e) {
+ QPID_LOG(error, "Asynchronous transaction error: " << e);
+ sys::Mutex::ScopedLock l(errorLock);
+ error = e;
}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.h b/qpid/cpp/src/qpid/broker/TxBuffer.h
index f4cdcb3ae2..3a6db37d1d 100644
--- a/qpid/cpp/src/qpid/broker/TxBuffer.h
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.h
@@ -26,6 +26,7 @@
#include "qpid/broker/TransactionalStore.h"
#include "qpid/broker/TxOp.h"
#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/sys/Mutex.h"
#include <algorithm>
#include <functional>
#include <vector>
@@ -68,10 +69,13 @@ class TransactionObserver;
* asynchronously if the broker is part of a HA cluster.
*/
class TxBuffer : public AsyncCompletion {
- private:
+ private:
typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
std::vector<TxOp::shared_ptr> ops;
boost::shared_ptr<TransactionObserver> observer;
+ std::auto_ptr<TransactionContext> txContext;
+ std::string error;
+ sys::Mutex errorLock;
public:
QPID_BROKER_EXTERN TxBuffer();
@@ -114,11 +118,14 @@ class TxBuffer : public AsyncCompletion {
QPID_BROKER_EXTERN void rollback();
/**
- * Helper method for managing the process of server local
- * commit
+ * Start a local commit - may complete asynchronously.
*/
- QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
+ QPID_BROKER_EXTERN void startCommit(TransactionalStore* const store);
+ /** End a commit, called via async completion.
+ *@return encoded result, not used here.
+ */
+ QPID_BROKER_EXTERN std::string endCommit(TransactionalStore* const store);
QPID_BROKER_EXTERN void setObserver(boost::shared_ptr<TransactionObserver> o) {
observer = o;
@@ -127,6 +134,12 @@ class TxBuffer : public AsyncCompletion {
QPID_BROKER_EXTERN boost::shared_ptr<TransactionObserver> getObserver() const {
return observer;
}
+
+ /** Set an error to be raised from endCommit when the commit completes.
+ * Called from completer threads if we are doing async completion.
+ * This is the only TxBuffer function called outside the IO thread.
+ */
+ void setError(const std::string& message);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 3a7ab3b0fc..0c1858ceb1 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -232,15 +232,16 @@ void Primary::skip(
if (i != replicas.end()) i->second->addSkip(ids);
}
+// Called from ReplicatingSubscription::cancel
void Primary::removeReplica(const ReplicatingSubscription& rs) {
- sys::Mutex::ScopedLock l(lock);
- replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
-
- TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
- if (i != txMap.end()) {
- boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock();
- if (tx) tx->cancel(rs);
+ boost::shared_ptr<PrimaryTxObserver> tx;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+ TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
+ if (i != txMap.end()) tx = i->second.lock();
}
+ if (tx) tx->cancel(rs); // Outside of lock.
}
// NOTE: Called with queue registry lock held.
@@ -401,19 +402,22 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
backup->startCatchup();
}
-shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
- shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
+shared_ptr<PrimaryTxObserver> Primary::makeTxObserver(
+ const boost::intrusive_ptr<broker::TxBuffer>& txBuffer)
+{
+ shared_ptr<PrimaryTxObserver> observer(
+ new PrimaryTxObserver(*this, haBroker, txBuffer));
observer->initialize();
txMap[observer->getTxQueue()->getName()] = observer;
return observer;
}
-void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& tx) {
- tx->setObserver(makeTxObserver());
+void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) {
+ txBuffer->setObserver(makeTxObserver(txBuffer));
}
-void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& dtx) {
- dtx->setObserver(makeTxObserver());
+void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& ) {
+ QPID_LOG(notice, "DTX transactions in a HA cluster are not yet atomic");
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 97b7f956b7..7f98f06fec 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -63,6 +63,10 @@ class PrimaryTxObserver;
* - sets queue guards on new queues for each backup.
*
* THREAD SAFE: called concurrently in arbitrary connection threads.
+ *
+ * Locking rules: BrokerObserver create/destroy functions are called with
+ * the QueueRegistry lock held. Functions holding Primary::lock *must not*
+ * directly or indirectly call on the queue registry.
*/
class Primary : public Role
{
@@ -126,7 +130,8 @@ class Primary : public Role
void checkReady(RemoteBackupPtr);
void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
void deduplicate();
- boost::shared_ptr<PrimaryTxObserver> makeTxObserver();
+ boost::shared_ptr<PrimaryTxObserver> makeTxObserver(
+ const boost::intrusive_ptr<broker::TxBuffer>&);
mutable sys::Mutex lock;
HaBroker& haBroker;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 41494694de..ca833cf085 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -42,6 +42,7 @@ namespace ha {
using namespace std;
using namespace qpid::broker;
using namespace qpid::framing;
+using types::Uuid;
// Exchange to receive prepare OK events.
class PrimaryTxObserver::Exchange : public broker::Exchange {
@@ -78,12 +79,15 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
-PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()),
+PrimaryTxObserver::PrimaryTxObserver(
+ Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
+) :
+ primary(p), haBroker(hb), broker(hb.getBroker()),
replicationTest(hb.getSettings().replicateDefault.get()),
+ txBuffer(tx),
id(true),
exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
- failed(false), ended(false), complete(false)
+ complete(false)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -106,8 +110,9 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
txQueue->deliver(TxMembersEvent(members).message());
}
-PrimaryTxObserver::~PrimaryTxObserver() {}
-
+PrimaryTxObserver::~PrimaryTxObserver() {
+ QPID_LOG(debug, logPrefix << "Ended");
+}
void PrimaryTxObserver::initialize() {
boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this()));
@@ -141,37 +146,51 @@ void PrimaryTxObserver::dequeue(
}
}
-void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
- boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
- assert(primary);
- // Tell replicating subscriptions to skip IDs in the transaction.
- for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
- for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
- primary->skip(*b, q->first, q->second);
-}
+namespace {
+struct Skip {
+ Uuid backup;
+ boost::shared_ptr<broker::Queue> queue;
+ ReplicationIdSet ids;
+
+ Skip(const Uuid& backup_,
+ const boost::shared_ptr<broker::Queue>& queue_,
+ const ReplicationIdSet& ids_) :
+ backup(backup_), queue(queue_), ids(ids_) {}
+
+ void skip(Primary& p) const { p.skip(backup, queue, ids); }
+};
+} // namespace
bool PrimaryTxObserver::prepare() {
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "Prepare");
- deduplicate(l);
+ QPID_LOG(debug, logPrefix << "Prepare " << members);
+ vector<Skip> skips;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ for (size_t i = 0; i < members.size(); ++i) txBuffer->startCompleter();
+
+ // Tell replicating subscriptions to skip IDs in the transaction.
+ for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
+ for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
+ skips.push_back(Skip(*b, q->first, q->second));
+ }
+ // Outside lock
+ for_each(skips.begin(), skips.end(),
+ boost::bind(&Skip::skip, _1, boost::ref(primary)));
txQueue->deliver(TxPrepareEvent().message());
- // TODO aconway 2013-09-04: Blocks the current thread till backups respond.
- // Need a non-blocking approach (e.g. async completion or borrowing a thread)
- while (!unprepared.empty() && !failed) lock.wait();
- return !failed;
+ return true;
}
void PrimaryTxObserver::commit() {
- sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
+ sys::Mutex::ScopedLock l(lock);
txQueue->deliver(TxCommitEvent().message());
complete = true;
end(l);
}
void PrimaryTxObserver::rollback() {
- sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
+ sys::Mutex::ScopedLock l(lock);
txQueue->deliver(TxRollbackEvent().message());
complete = true;
end(l);
@@ -180,8 +199,8 @@ void PrimaryTxObserver::rollback() {
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
// Don't destroy the tx-queue until the transaction is complete and there
// are no connected subscriptions.
- if (!ended && complete && unfinished.empty()) {
- ended = true;
+ if (txBuffer && complete && unfinished.empty()) {
+ txBuffer.reset(); // Break pointer cycle.
try {
haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string());
} catch (const std::exception& e) {
@@ -198,29 +217,33 @@ void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
- QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
- unprepared.erase(backup);
- lock.notify();
+ if (unprepared.erase(backup)) {
+ QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
+ txBuffer->finishCompleter();
+ }
}
void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
- QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup);
- unprepared.erase(backup);
- failed = true;
- lock.notify();
+ if (unprepared.erase(backup)) {
+ QPID_LOG(error, logPrefix << "Prepare failed on backup: " << backup);
+ txBuffer->setError(
+ QPID_MSG(logPrefix << "Prepare failed on backup: " << backup));
+ txBuffer->finishCompleter();
+ }
}
void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
- if (unprepared.find(backup) != unprepared.end()) {
- complete = failed = true; // Canceled before prepared.
- unprepared.erase(backup); // Consider it prepared-fail
+ if (unprepared.erase(backup) ){
+ complete = true; // Cancelled before prepared.
+ txBuffer->setError(
+ QPID_MSG(logPrefix << "Backup disconnected: " << rs.getBrokerInfo()));
+ txBuffer->finishCompleter();
}
unfinished.erase(backup);
- lock.notify();
end(l);
}
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index fb9db25e85..cd6c88ad41 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -24,12 +24,14 @@
#include "types.h"
#include "ReplicationTest.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include "qpid/sys/unordered_map.h"
#include "qpid/sys/Monitor.h"
#include <boost/enable_shared_from_this.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -37,11 +39,13 @@ namespace broker {
class Broker;
class Message;
class Consumer;
+class AsyncCompletion;
}
namespace ha {
class HaBroker;
class ReplicatingSubscription;
+class Primary;
/**
* Observe events in the lifecycle of a transaction.
@@ -62,7 +66,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
public boost::enable_shared_from_this<PrimaryTxObserver>
{
public:
- PrimaryTxObserver(HaBroker&);
+ PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
~PrimaryTxObserver();
/** Call immediately after constructor, uses shared_from_this. */
@@ -87,7 +91,6 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueuePtr, ReplicationIdSet, Hasher<QueuePtr> > QueueIdsMap;
void membership(const BrokerInfo::Map&);
- void deduplicate(sys::Mutex::ScopedLock&);
void end(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
void txPrepareFailEvent(const std::string& data);
@@ -95,16 +98,19 @@ class PrimaryTxObserver : public broker::TransactionObserver,
sys::Monitor lock;
std::string logPrefix;
+ Primary& primary;
HaBroker& haBroker;
broker::Broker& broker;
ReplicationTest replicationTest;
+ // NOTE: There is an intrusive_ptr cycle between PrimaryTxObserver
+ // and TxBuffer. The cycle is broken in PrimaryTxObserver::end()
+ boost::intrusive_ptr<broker::TxBuffer> txBuffer;
types::Uuid id;
std::string exchangeName;
QueuePtr txQueue;
QueueIdsMap enqueues;
- bool failed, ended, complete;
-
+ bool complete;
UuidSet members; // All members of transaction.
UuidSet unprepared; // Members that have not yet responded to prepare.
UuidSet unfinished; // Members that have not yet disconnected.
diff --git a/qpid/cpp/src/tests/TransactionObserverTest.cpp b/qpid/cpp/src/tests/TransactionObserverTest.cpp
index 2a7d94b1ae..80ef494c21 100644
--- a/qpid/cpp/src/tests/TransactionObserverTest.cpp
+++ b/qpid/cpp/src/tests/TransactionObserverTest.cpp
@@ -79,8 +79,10 @@ struct MockBrokerObserver : public BrokerObserver {
MockBrokerObserver(bool prep_=true) : prep(prep_) {}
void startTx(const intrusive_ptr<TxBuffer>& buffer) {
- tx.reset(new MockTransactionObserver(prep));
- buffer->setObserver(tx);
+ if (!tx) { // Don't overwrite first tx with automatically started second tx.
+ tx.reset(new MockTransactionObserver(prep));
+ buffer->setObserver(tx);
+ }
}
};
@@ -94,7 +96,7 @@ Session simpleTxTransaction(MessagingFixture& fix) {
return txSession;
}
-QPID_AUTO_TEST_CASE(tesTxtCommit) {
+QPID_AUTO_TEST_CASE(testTxCommit) {
MessagingFixture fix;
shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver);
fix.broker->getBrokerObservers().add(brokerObserver);
@@ -114,6 +116,7 @@ QPID_AUTO_TEST_CASE(testTxFail) {
fix.broker->getBrokerObservers().add(brokerObserver);
Session txSession = simpleTxTransaction(fix);
try {
+ ScopedSuppressLogging sl; // Suppress messages for expected error.
txSession.commit();
BOOST_FAIL("Expected exception");
} catch(...) {}
diff --git a/qpid/cpp/src/tests/TxBufferTest.cpp b/qpid/cpp/src/tests/TxBufferTest.cpp
index 4807026ab7..3f052d213e 100644
--- a/qpid/cpp/src/tests/TxBufferTest.cpp
+++ b/qpid/cpp/src/tests/TxBufferTest.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/TxBuffer.h"
#include "unit_test.h"
+#include "test_tools.h"
#include <iostream>
#include <vector>
#include "TxMocks.h"
@@ -50,7 +51,8 @@ QPID_AUTO_TEST_CASE(testCommitLocal)
buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
buffer.enlist(static_pointer_cast<TxOp>(opC));
- BOOST_CHECK(buffer.commitLocal(&store));
+ buffer.startCommit(&store);
+ buffer.endCommit(&store);
store.check();
BOOST_CHECK(store.isCommitted());
opA->check();
@@ -75,7 +77,12 @@ QPID_AUTO_TEST_CASE(testFailOnCommitLocal)
buffer.enlist(static_pointer_cast<TxOp>(opB));
buffer.enlist(static_pointer_cast<TxOp>(opC));
- BOOST_CHECK(!buffer.commitLocal(&store));
+ try {
+ ScopedSuppressLogging sl; // Suppress messages for expected error.
+ buffer.startCommit(&store);
+ buffer.endCommit(&store);
+ BOOST_FAIL("Expected exception");
+ } catch (...) {}
BOOST_CHECK(store.isAborted());
store.check();
opA->check();
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index ad546afc62..79024d48e3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1346,24 +1346,19 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
+ tx.close()
for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
- def assert_tx_cleanup(self, b, tx_queues):
+ def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
(exchanges, queues, subscriptions) on b."""
-
- self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
- self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
-
- # TX exchanges don't show up in management so test for existence by name.
- s = b.connect_admin().session()
- try:
- for q in tx_queues:
- try:
- s.sender("%s;{node:{type:topic}}"%q)
- self.fail("Found tx exchange %s on %s "%(q,b))
- except NotFound: pass
- finally: s.connection.close()
+ queues=[]
+ def txq(): queues = b.agent().tx_queues(); return not queues
+ assert retry(txq), "%s: unexpected %s"%(b,queues)
+ subs=[]
+ def txs(): subs = self.tx_subscriptions(b); return not subs
+ assert retry(txs), "%s: unexpected %s"%(b,subs)
+ # TODO aconway 2013-10-15: TX exchanges don't show up in management.
def assert_simple_commit_outcome(self, b, tx_queues):
b.assert_browse_backup("a", [], msg=b)
@@ -1379,7 +1374,7 @@ class TransactionTests(HaBrokerTest):
<commit tx=1>
"""
self.assertEqual(expect, open_read(b.store_log), msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
@@ -1388,6 +1383,7 @@ class TransactionTests(HaBrokerTest):
tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
+ tx.close() # For clean test.
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
def assert_simple_rollback_outcome(self, b, tx_queues):
@@ -1399,7 +1395,7 @@ class TransactionTests(HaBrokerTest):
<enqueue a z>
"""
self.assertEqual(open_read(b.store_log), expect, msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
@@ -1423,6 +1419,7 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
tx_queues = cluster[0].agent().tx_queues()
+ tx.close()
self.assert_simple_commit_outcome(cluster[0], tx_queues)
# Test rollback
@@ -1433,6 +1430,7 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.rollback()
tx.sync()
+ tx.close()
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
def assert_commit_raises(self, tx):
@@ -1448,7 +1446,7 @@ class TransactionTests(HaBrokerTest):
for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
self.assert_commit_raises(tx)
for b in cluster: b.assert_browse_backup("q", [])
- self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
def test_tx_join_leave(self):
@@ -1465,14 +1463,15 @@ class TransactionTests(HaBrokerTest):
cluster[1].kill(final=False)
s.send("b")
self.assert_commit_raises(tx)
- self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
-
+ for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
cluster.restart(1)
tx.commit()
+ tx.close()
+ for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
@@ -1493,6 +1492,15 @@ class TransactionTests(HaBrokerTest):
for t in threads: t.join()
for s in sessions: s.connection.close()
+ def test_broker_tx_tests(self):
+ cluster = HaCluster(self, 3)
+ print "Running python broker tx tests"
+ p = subprocess.Popen(["qpid-python-test",
+ "-m", "qpid_tests.broker_0_10",
+ "-b", "localhost:%s"%(cluster[0].port()),
+ "*.tx.*"])
+ assert not p.wait()
+ print "Finished python broker tx tests"
if __name__ == "__main__":
outdir = "ha_tests.tmp"