diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp | 66 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCommandCallback.h | 63 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 35 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 103 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 110 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/TxAccept.cpp | 27 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/TxAccept.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/TxBuffer.cpp | 63 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/TxBuffer.h | 21 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 30 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 93 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/TransactionObserverTest.cpp | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/TxBufferTest.cpp | 11 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 46 |
18 files changed, 496 insertions, 216 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index c018bcba76..aa93b855f7 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1224,6 +1224,8 @@ set (qpidbroker_SOURCES ${ssl_SOURCES} qpid/amqp_0_10/Connection.h qpid/amqp_0_10/Connection.cpp + qpid/broker/AsyncCommandCallback.h + qpid/broker/AsyncCommandCallback.cpp qpid/broker/Broker.cpp qpid/broker/Credit.cpp qpid/broker/Exchange.cpp diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp new file mode 100644 index 0000000000..2b1f3ad8e4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsyncCommandCallback.h" +#include "SessionOutputException.h" + + +namespace qpid { +namespace broker { + +using namespace framing; + +AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f) : + AsyncCommandContext(ss), command(f), channel(ss.getChannel()) +{} + +void AsyncCommandCallback::completed(bool sync) { + if (sync) + doCommand(); // In initiating thread, execute now. + else + completerContext->schedule( + boost::bind(&AsyncCommandCallback::complete, + boost::intrusive_ptr<AsyncCommandCallback>(this))); +} + +boost::intrusive_ptr<AsyncCompletion::Callback> AsyncCommandCallback::clone() { + return new AsyncCommandCallback(*this); +} + +void AsyncCommandCallback::complete() { + try{ + doCommand(); + } catch (const SessionException& e) { + throw SessionOutputException(e, channel); + } catch (const std::exception& e) { + throw SessionOutputException(InternalErrorException(e.what()), channel); + } +} + +void AsyncCommandCallback::doCommand() { + SessionState* session = completerContext->getSession(); + if (session && session->isAttached()) + session->completeCommand(id, false, requiresSync, command()); + else + throw InternalErrorException("Cannot complete command, no session"); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h new file mode 100644 index 0000000000..dd8214683a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h @@ -0,0 +1,63 @@ +#ifndef QPID_BROKER_ASYNCCOMMANDCALLBACK_H +#define QPID_BROKER_ASYNCCOMMANDCALLBACK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/SessionState.h" +#include "qpid/broker/AsyncCompletion.h" + +namespace qpid { +namespace broker { + +/** + * An AsyncCompletion::Callback that executes the final part of an + * async-completed command in the proper context: + * + * - Complete synchronously: Called in the initiating thread. + * - Complete asynchronously: Scheduled on the IO thread. + * + * Errors thrown by the command are returned to the correct session on the client + * even if we are executed via an IO callback. + */ +class AsyncCommandCallback : public SessionState::AsyncCommandContext { + public: + /** Command function returns a string containing the encoded result of the + * command, or empty for no result. It may raise an exception. + */ + typedef boost::function<std::string ()> Command; + + AsyncCommandCallback(SessionState& ss, Command f); + + void completed(bool sync); + + boost::intrusive_ptr<AsyncCompletion::Callback> clone(); + + private: + void complete(); + void doCommand(); + + Command command; + uint16_t channel; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_ASYNCCOMMANDCALLBACK_H*/ diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 19cb2f30c3..5a11db81bb 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -52,7 +52,7 @@ class RecoverableQueueImpl : public RecoverableQueue public: RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {} ~RecoverableQueueImpl() {}; - void setPersistenceId(uint64_t id); + void setPersistenceId(uint64_t id); uint64_t getPersistenceId() const; const std::string& getName() const; void setExternalQueueStore(ExternalQueueStore* inst); @@ -126,7 +126,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff return m; } -RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, +RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) { boost::intrusive_ptr<DtxBuffer> buffer(new DtxBuffer()); @@ -212,7 +212,7 @@ const std::string& RecoverableQueueImpl::getName() const { return queue->getName(); } - + void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst) { queue->setExternalQueueStore(inst); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 469fe760a0..94e680c211 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/SessionState.h" +#include "qpid/broker/AsyncCommandCallback.h" #include "qpid/broker/Broker.h" #include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/DeliverableMessage.h" @@ -110,6 +111,7 @@ void SemanticState::closed() { cancel(i->second); } closeComplete = true; + if (txBuffer) txBuffer->rollback(); } } @@ -166,32 +168,47 @@ bool SemanticState::cancel(const string& tag) void SemanticState::startTx() { + accumulatedAck.clear(); txBuffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); session.getBroker().getBrokerObservers().startTx(txBuffer); session.startTx(); //just to update statistics } +namespace { +struct StartTxOnExit { + SemanticState& session; + StartTxOnExit(SemanticState& ss) : session(ss) {} + ~StartTxOnExit() { session.startTx(); } +}; +} // namespace + void SemanticState::commit(MessageStore* const store) { if (!txBuffer) throw - CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - session.commitTx(); //just to update statistics + CommandInvalidException( + QPID_MSG("Session has not been selected for use with transactions")); + // Start a new TX regardless of outcome of this one. + StartTxOnExit e(*this); + session.getCurrentCommand().setCompleteSync(false); // Async completion + txBuffer->begin(); // Begin async completion. + session.commitTx(); //just to update statistics TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); txBuffer->enlist(txAck); - if (txBuffer->commitLocal(store)) { - accumulatedAck.clear(); - } else { - throw InternalErrorException(QPID_MSG("Commit failed")); - } + // In a HA cluster, tx.commit may complete asynchronously. + txBuffer->startCommit(store); + AsyncCommandCallback callback( + session, + boost::bind(&TxBuffer::endCommit, txBuffer, store)); + txBuffer->end(callback); } void SemanticState::rollback() { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - session.rollbackTx(); //just to update statistics + session.rollbackTx(); // Just to update statistics txBuffer->rollback(); - accumulatedAck.clear(); + startTx(); // Start a new TX automatically. } void SemanticState::selectDtx() diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 3995eb85dc..8e128ae0df 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -191,26 +191,21 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { - currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). - Invoker::Result invocation = invoke(adapter, *method); - if (currentCommandComplete) receiverCompleted(id); - - if (!invocation.wasHandled()) { +void SessionState::handleCommand(framing::AMQMethodBody* method) { + Invoker::Result result = invoke(adapter, *method); + if (!result.wasHandled()) throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); - } else if (invocation.hasResult()) { - getProxy().getExecution().result(id, invocation.getResult()); - } - - if (method->isSync() && currentCommandComplete) { - sendAcceptAndCompletion(); - } + if (currentCommand.isCompleteSync()) + completeCommand( + currentCommand.getId(), false/*needAccept*/, currentCommand.isSyncRequired(), + result.getResult()); } -void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) + +void SessionState::handleContent(AMQFrame& frame) { if (frame.getBof() && frame.getBos()) //start of frameset - msgBuilder.start(id); + msgBuilder.start(currentCommand.getId()); intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage()); msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset @@ -244,23 +239,27 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -/** Invoked when the given inbound message is finished being processed - * by all interested parties (eg. it is done being enqueued to all queues, - * its credit has been accounted for, etc). At this point, msg is considered - * by this receiver as 'completed' (as defined by AMQP 0_10) +/** Invoked when the given command is finished being processed by all interested + * parties (eg. it is done being enqueued to all queues, its credit has been + * accounted for, etc). At this point the command is considered by this + * receiver as 'completed' (as defined by AMQP 0_10) */ -void SessionState::completeRcvMsg(SequenceNumber id, - bool requiresAccept, - bool requiresSync) +void SessionState::completeCommand(SequenceNumber id, + bool requiresAccept, + bool requiresSync, + const std::string& result=std::string()) { bool callSendCompletion = false; receiverCompleted(id); if (requiresAccept) - // will cause msg's seq to appear in the next message.accept we send. + // will cause cmd's seq to appear in the next message.accept we send. accepted.add(id); + if (!result.empty()) + getProxy().getExecution().result(id, result); + // Are there any outstanding Execution.Sync commands pending the - // completion of this msg? If so, complete them. + // completion of this cmd? If so, complete them. while (!pendingExecutionSyncs.empty() && receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) { const SequenceNumber id = pendingExecutionSyncs.front(); @@ -277,14 +276,15 @@ void SessionState::completeRcvMsg(SequenceNumber id, } void SessionState::handleIn(AMQFrame& frame) { - SequenceNumber commandId = receiverGetCurrent(); //TODO: make command handling more uniform, regardless of whether //commands carry content. AMQMethodBody* m = frame.getMethod(); + currentCommand = CurrentCommand(receiverGetCurrent(), m && m->isSync()); + if (m == 0 || m->isContentBearing()) { - handleContent(frame, commandId); + handleContent(frame); } else if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod(), commandId); + handleCommand(frame.getMethod()); } else { throw InternalErrorException("Cannot handle multi-frame command segments yet"); } @@ -345,9 +345,9 @@ void SessionState::setTimeout(uint32_t) { } // (called via the invoker() in handleCommand() above) void SessionState::addPendingExecutionSync() { - SequenceNumber syncCommandId = receiverGetCurrent(); + SequenceNumber syncCommandId = currentCommand.getId(); if (receiverGetIncomplete().front() < syncCommandId) { - currentCommandComplete = false; + currentCommand.setCompleteSync(false); pendingExecutionSyncs.push(syncCommandId); asyncCommandCompleter->flushPendingMessages(); QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); @@ -389,25 +389,16 @@ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) */ session = 0; QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); + completerContext->scheduleCommandCompletion(id, requiresAccept, requiresSync); } else { // this path runs directly from the ac->end() call in handleContent() above, // so *session is definately valid. if (session->isAttached()) { QPID_LOG(debug, ": receive completed for msg seq=" << id); - session->completeRcvMsg(id, requiresAccept, requiresSync); + session->completeCommand(id, requiresAccept, requiresSync); } } - completerContext = boost::intrusive_ptr<AsyncCommandCompleter>(); -} - - -/** Scheduled from an asynchronous command's completed callback to run on - * the IO thread. - */ -void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt) -{ - ctxt->completeCommands(); + completerContext.reset(); } @@ -450,22 +441,27 @@ void SessionState::AsyncCommandCompleter::flushPendingMessages() /** mark an ingress Message.Transfer command as completed. * This method must be thread safe - it may run on any thread. */ -void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd, - bool requiresAccept, - bool requiresSync) +void SessionState::AsyncCommandCompleter::scheduleCommandCompletion( + SequenceNumber cmd, + bool requiresAccept, + bool requiresSync) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); if (session && isAttached) { - MessageInfo msg(cmd, requiresAccept, requiresSync); - completedMsgs.push_back(msg); - if (completedMsgs.size() == 1) { - session->getConnection().requestIOProcessing(boost::bind(&schedule, - session->asyncCommandCompleter)); + CommandInfo info(cmd, requiresAccept, requiresSync); + completedCmds.push_back(info); + if (completedCmds.size() == 1) { + session->getConnection().requestIOProcessing( + boost::bind(&AsyncCommandCompleter::completeCommands, + session->asyncCommandCompleter)); } } } +void SessionState::AsyncCommandCompleter::schedule(boost::function<void()> f) { + if (session && isAttached) session->getConnection().requestIOProcessing(f); +} /** Cause the session to complete all completed commands. * Executes on the IO thread. @@ -476,12 +472,13 @@ void SessionState::AsyncCommandCompleter::completeCommands() // when session is destroyed, it clears the session pointer via cancel(). if (session && session->isAttached()) { - for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin(); - msg != completedMsgs.end(); ++msg) { - session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync); + for (std::vector<CommandInfo>::iterator cmd = completedCmds.begin(); + cmd != completedCmds.end(); ++cmd) { + session->completeCommand( + cmd->cmd, cmd->requiresAccept, cmd->requiresSync); } } - completedMsgs.clear(); + completedCmds.clear(); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 500a211a6f..ad29c4427b 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -132,13 +132,12 @@ class SessionState : public qpid::SessionState, void commitTx(); void rollbackTx(); + /** Send result and completion for a given command to the client. */ + void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync, + const std::string& result); private: - void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); - void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); - - // indicate that the given ingress msg has been completely received by the - // broker, and the msg's message.transfer command can be considered completed. - void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync); + void handleCommand(framing::AMQMethodBody* method); + void handleContent(framing::AMQFrame& frame); void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); @@ -160,7 +159,37 @@ class SessionState : public qpid::SessionState, // sequence numbers for pending received Execution.Sync commands std::queue<SequenceNumber> pendingExecutionSyncs; - bool currentCommandComplete; + + public: + + /** Information about the currently executing command. + * Can only be used in the IO thread during command execution. + */ + class CurrentCommand { + public: + CurrentCommand( + SequenceNumber id_=0, bool syncRequired_=false, bool completeSync_=true ) : + id(id_), syncRequired(syncRequired_), completeSync(completeSync_) + {} + + SequenceNumber getId() const { return id; } + + /**@return true if the sync flag was set for the command. */ + bool isSyncRequired() const { return syncRequired; } + + /**@return true if the command should be completed synchronously + * in the handling thread. + */ + bool isCompleteSync() const { return completeSync; } + void setCompleteSync(bool b) { completeSync = b; } + + private: + SequenceNumber id; ///< Command identifier. + bool syncRequired; ///< True if sync flag set for the command. + bool completeSync; ///< Will be completed by handCommand. + }; + + CurrentCommand& getCurrentCommand() { return currentCommand; } /** This class provides a context for completing asynchronous commands in a thread * safe manner. Asynchronous commands save their completion state in this class. @@ -175,15 +204,17 @@ class SessionState : public qpid::SessionState, bool isAttached; qpid::sys::Mutex completerLock; - // special-case message.transfer commands for optimization - struct MessageInfo { + struct CommandInfo { SequenceNumber cmd; // message.transfer command id bool requiresAccept; bool requiresSync; - MessageInfo(SequenceNumber c, bool a, bool s) - : cmd(c), requiresAccept(a), requiresSync(s) {} + + CommandInfo( + SequenceNumber c, bool a, bool s) + : cmd(c), requiresAccept(a), requiresSync(s) {} }; - std::vector<MessageInfo> completedMsgs; + + std::vector<CommandInfo> completedCmds; // If an ingress message does not require a Sync, we need to // hold a reference to it in case an Execution.Sync command is received and we // have to manually flush the message. @@ -192,9 +223,6 @@ class SessionState : public qpid::SessionState, /** complete all pending commands, runs in IO thread */ void completeCommands(); - /** for scheduling a run of "completeCommands()" on the IO thread */ - static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>); - public: AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {}; ~AsyncCommandCompleter() {}; @@ -203,15 +231,21 @@ class SessionState : public qpid::SessionState, void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m); void deletePendingMessage(SequenceNumber id); void flushPendingMessages(); - /** schedule the processing of a completed ingress message.transfer command */ - void scheduleMsgCompletion(SequenceNumber cmd, - bool requiresAccept, - bool requiresSync); + /** schedule the processing of command completion. */ + void scheduleCommandCompletion(SequenceNumber cmd, + bool requiresAccept, + bool requiresSync); + void schedule(boost::function<void()>); void cancel(); // called by SessionState destructor. void attached(); // called by SessionState on attach() void detached(); // called by SessionState on detach() + + SessionState* getSession() const { return session; } }; - boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter; + + boost::intrusive_ptr<AsyncCommandCompleter> getAsyncCommandCompleter() { + return asyncCommandCompleter; + } /** Abstract class that represents a single asynchronous command that is * pending completion. @@ -219,15 +253,29 @@ class SessionState : public qpid::SessionState, class AsyncCommandContext : public AsyncCompletion::Callback { public: - AsyncCommandContext( SessionState *ss, SequenceNumber _id ) - : id(_id), completerContext(ss->asyncCommandCompleter) {} + AsyncCommandContext(SessionState& ss ) + : id(ss.getCurrentCommand().getId()), + requiresSync(ss.getCurrentCommand().isSyncRequired()), + completerContext(ss.getAsyncCommandCompleter()) + {} + + AsyncCommandContext(const AsyncCommandContext& x) : + id(x.id), requiresSync(x.requiresSync), completerContext(x.completerContext) + {} + virtual ~AsyncCommandContext() {} protected: SequenceNumber id; + bool requiresSync; boost::intrusive_ptr<AsyncCommandCompleter> completerContext; }; + + private: + boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter; + CurrentCommand currentCommand; + /** incomplete Message.transfer commands - inbound to broker from client */ class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext @@ -235,21 +283,17 @@ class SessionState : public qpid::SessionState, public: IncompleteIngressMsgXfer( SessionState *ss, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m) - : AsyncCommandContext(ss, m->getCommandId()), + : AsyncCommandContext(*ss), session(ss), msg(m), requiresAccept(m->requiresAccept()), requiresSync(m->getFrames().getMethod()->isSync()), - pending(false) {} - IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x ) - : AsyncCommandContext(x.session, x.msg->getCommandId()), - session(x.session), - msg(x.msg), - requiresAccept(x.requiresAccept), - requiresSync(x.requiresSync), - pending(x.pending) {} + pending(false) + { + assert(id == m->getCommandId()); + } - virtual ~IncompleteIngressMsgXfer() {}; + virtual ~IncompleteIngressMsgXfer() {} virtual void completed(bool); virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone(); @@ -262,7 +306,7 @@ class SessionState : public qpid::SessionState, bool pending; // true if msg saved on pending list... }; - friend class SessionManager; + friend class SessionManager; }; diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp index ee30dff957..496f8a2a42 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.cpp +++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp @@ -33,14 +33,20 @@ using qpid::framing::SequenceNumber; TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) : acked(_acked), unacked(_unacked) -{ - for(SequenceSet::RangeIterator i = acked.rangesBegin(); i != acked.rangesEnd(); ++i) - ranges.push_back(DeliveryRecord::findRange(unacked, i->first(), i->last())); -} +{} void TxAccept::each(boost::function<void(DeliveryRecord&)> f) { - for(AckRanges::iterator i = ranges.begin(); i != ranges.end(); ++i) - for_each(i->start, i->end, f); + DeliveryRecords::iterator dr = unacked.begin(); + SequenceSet::iterator seq = acked.begin(); + while(dr != unacked.end() && seq != acked.end()) { + if (dr->getId() == *seq) { + f(*dr); + ++dr; + ++seq; + } + else if (dr->getId() < *seq) ++dr; + else if (dr->getId() > *seq) ++seq; + } } bool TxAccept::prepare(TransactionContext* ctxt) throw() @@ -63,12 +69,11 @@ void TxAccept::commit() throw() each(bind(&DeliveryRecord::committed, _1)); each(bind(&DeliveryRecord::setEnded, _1)); //now remove if isRedundant(): - if (!ranges.empty()) { - DeliveryRecords::iterator begin = ranges.front().start; - DeliveryRecords::iterator end = ranges.back().end; + if (!acked.empty()) { + AckRange r = DeliveryRecord::findRange(unacked, acked.front(), acked.back()); DeliveryRecords::iterator removed = - remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant)); - unacked.erase(removed, end); + remove_if(r.start, r.end, mem_fun_ref(&DeliveryRecord::isRedundant)); + unacked.erase(removed, r.end); } } catch (const std::exception& e) { QPID_LOG(error, "Failed to commit: " << e.what()); diff --git a/qpid/cpp/src/qpid/broker/TxAccept.h b/qpid/cpp/src/qpid/broker/TxAccept.h index b4d55155a8..97e82ffa3f 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.h +++ b/qpid/cpp/src/qpid/broker/TxAccept.h @@ -37,14 +37,12 @@ namespace broker { * a transactional channel. */ class TxAccept : public TxOp { - typedef std::vector<AckRange> AckRanges; typedef boost::shared_ptr<TransactionObserver> ObserverPtr; void each(boost::function<void(DeliveryRecord&)>); framing::SequenceSet acked; DeliveryRecords& unacked; - AckRanges ranges; public: /** diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp index a8df4fb214..6dc2f5c2f4 100644 --- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp +++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp @@ -21,22 +21,28 @@ #include "qpid/broker/TxBuffer.h" #include "qpid/broker/TransactionObserver.h" #include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/mem_fn.hpp> #include <boost/bind.hpp> + +namespace qpid { +namespace broker{ + using boost::mem_fn; -using namespace qpid::broker; +using framing::InternalErrorException; TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {} bool TxBuffer::prepare(TransactionContext* const ctxt) { + // The observer may call startCompleter to delay completion. if (!observer->prepare()) return false; for(op_iterator i = ops.begin(); i != ops.end(); i++){ - if(!(*i)->prepare(ctxt)){ - return false; - } + if(!(*i)->prepare(ctxt)) return false; } + // At this point prepare has succeeded locally but if completion is delayed, + // then completing threads may call setError to indicate an error. return true; } @@ -60,24 +66,37 @@ void TxBuffer::enlist(TxOp::shared_ptr op) ops.push_back(op); } -bool TxBuffer::commitLocal(TransactionalStore* const store) +void TxBuffer::startCommit(TransactionalStore* const store) { - if (!store) return false; - try { - std::auto_ptr<TransactionContext> ctxt = store->begin(); - if (prepare(ctxt.get())) { - store->commit(*ctxt); - commit(); - return true; - } else { - store->abort(*ctxt); - rollback(); - return false; - } - } catch (std::exception& e) { - QPID_LOG(error, "Commit failed with exception: " << e.what()); - } catch (...) { - QPID_LOG(error, "Commit failed with unknown exception"); + if (!store) throw Exception("Can't commit transaction, no store."); + txContext.reset(store->begin().release()); + if (!prepare(txContext.get())) + setError("Transaction prepare failed."); +} + +// Called when async completion is complete. +std::string TxBuffer::endCommit(TransactionalStore* const store) { + std::string e; + { + sys::Mutex::ScopedLock l(errorLock); + e = error; + } + if (!e.empty()) { + store->abort(*txContext); + rollback(); + throw InternalErrorException(e); } - return false; + else { + store->commit(*txContext); + commit(); + } + return std::string(); // There is no result from tx.commit +} + +void TxBuffer::setError(const std::string& e) { + QPID_LOG(error, "Asynchronous transaction error: " << e); + sys::Mutex::ScopedLock l(errorLock); + error = e; } + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.h b/qpid/cpp/src/qpid/broker/TxBuffer.h index f4cdcb3ae2..3a6db37d1d 100644 --- a/qpid/cpp/src/qpid/broker/TxBuffer.h +++ b/qpid/cpp/src/qpid/broker/TxBuffer.h @@ -26,6 +26,7 @@ #include "qpid/broker/TransactionalStore.h" #include "qpid/broker/TxOp.h" #include "qpid/broker/AsyncCompletion.h" +#include "qpid/sys/Mutex.h" #include <algorithm> #include <functional> #include <vector> @@ -68,10 +69,13 @@ class TransactionObserver; * asynchronously if the broker is part of a HA cluster. */ class TxBuffer : public AsyncCompletion { - private: + private: typedef std::vector<TxOp::shared_ptr>::iterator op_iterator; std::vector<TxOp::shared_ptr> ops; boost::shared_ptr<TransactionObserver> observer; + std::auto_ptr<TransactionContext> txContext; + std::string error; + sys::Mutex errorLock; public: QPID_BROKER_EXTERN TxBuffer(); @@ -114,11 +118,14 @@ class TxBuffer : public AsyncCompletion { QPID_BROKER_EXTERN void rollback(); /** - * Helper method for managing the process of server local - * commit + * Start a local commit - may complete asynchronously. */ - QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store); + QPID_BROKER_EXTERN void startCommit(TransactionalStore* const store); + /** End a commit, called via async completion. + *@return encoded result, not used here. + */ + QPID_BROKER_EXTERN std::string endCommit(TransactionalStore* const store); QPID_BROKER_EXTERN void setObserver(boost::shared_ptr<TransactionObserver> o) { observer = o; @@ -127,6 +134,12 @@ class TxBuffer : public AsyncCompletion { QPID_BROKER_EXTERN boost::shared_ptr<TransactionObserver> getObserver() const { return observer; } + + /** Set an error to be raised from endCommit when the commit completes. + * Called from completer threads if we are doing async completion. + * This is the only TxBuffer function called outside the IO thread. + */ + void setError(const std::string& message); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 3a7ab3b0fc..0c1858ceb1 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -232,15 +232,16 @@ void Primary::skip( if (i != replicas.end()) i->second->addSkip(ids); } +// Called from ReplicatingSubscription::cancel void Primary::removeReplica(const ReplicatingSubscription& rs) { - sys::Mutex::ScopedLock l(lock); - replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); - - TxMap::const_iterator i = txMap.find(rs.getQueue()->getName()); - if (i != txMap.end()) { - boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock(); - if (tx) tx->cancel(rs); + boost::shared_ptr<PrimaryTxObserver> tx; + { + sys::Mutex::ScopedLock l(lock); + replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); + TxMap::const_iterator i = txMap.find(rs.getQueue()->getName()); + if (i != txMap.end()) tx = i->second.lock(); } + if (tx) tx->cancel(rs); // Outside of lock. } // NOTE: Called with queue registry lock held. @@ -401,19 +402,22 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) backup->startCatchup(); } -shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() { - shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); +shared_ptr<PrimaryTxObserver> Primary::makeTxObserver( + const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) +{ + shared_ptr<PrimaryTxObserver> observer( + new PrimaryTxObserver(*this, haBroker, txBuffer)); observer->initialize(); txMap[observer->getTxQueue()->getName()] = observer; return observer; } -void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& tx) { - tx->setObserver(makeTxObserver()); +void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) { + txBuffer->setObserver(makeTxObserver(txBuffer)); } -void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& dtx) { - dtx->setObserver(makeTxObserver()); +void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& ) { + QPID_LOG(notice, "DTX transactions in a HA cluster are not yet atomic"); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 97b7f956b7..7f98f06fec 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -63,6 +63,10 @@ class PrimaryTxObserver; * - sets queue guards on new queues for each backup. * * THREAD SAFE: called concurrently in arbitrary connection threads. + * + * Locking rules: BrokerObserver create/destroy functions are called with + * the QueueRegistry lock held. Functions holding Primary::lock *must not* + * directly or indirectly call on the queue registry. */ class Primary : public Role { @@ -126,7 +130,8 @@ class Primary : public Role void checkReady(RemoteBackupPtr); void setCatchupQueues(const RemoteBackupPtr&, bool createGuards); void deduplicate(); - boost::shared_ptr<PrimaryTxObserver> makeTxObserver(); + boost::shared_ptr<PrimaryTxObserver> makeTxObserver( + const boost::intrusive_ptr<broker::TxBuffer>&); mutable sys::Mutex lock; HaBroker& haBroker; diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 41494694de..ca833cf085 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -42,6 +42,7 @@ namespace ha { using namespace std; using namespace qpid::broker; using namespace qpid::framing; +using types::Uuid; // Exchange to receive prepare OK events. class PrimaryTxObserver::Exchange : public broker::Exchange { @@ -78,12 +79,15 @@ class PrimaryTxObserver::Exchange : public broker::Exchange { const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer"); -PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : - haBroker(hb), broker(hb.getBroker()), +PrimaryTxObserver::PrimaryTxObserver( + Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx +) : + primary(p), haBroker(hb), broker(hb.getBroker()), replicationTest(hb.getSettings().replicateDefault.get()), + txBuffer(tx), id(true), exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), - failed(false), ended(false), complete(false) + complete(false) { logPrefix = "Primary transaction "+shortStr(id)+": "; @@ -106,8 +110,9 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : txQueue->deliver(TxMembersEvent(members).message()); } -PrimaryTxObserver::~PrimaryTxObserver() {} - +PrimaryTxObserver::~PrimaryTxObserver() { + QPID_LOG(debug, logPrefix << "Ended"); +} void PrimaryTxObserver::initialize() { boost::shared_ptr<Exchange> ex(new Exchange(shared_from_this())); @@ -141,37 +146,51 @@ void PrimaryTxObserver::dequeue( } } -void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) { - boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())); - assert(primary); - // Tell replicating subscriptions to skip IDs in the transaction. - for (UuidSet::iterator b = members.begin(); b != members.end(); ++b) - for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) - primary->skip(*b, q->first, q->second); -} +namespace { +struct Skip { + Uuid backup; + boost::shared_ptr<broker::Queue> queue; + ReplicationIdSet ids; + + Skip(const Uuid& backup_, + const boost::shared_ptr<broker::Queue>& queue_, + const ReplicationIdSet& ids_) : + backup(backup_), queue(queue_), ids(ids_) {} + + void skip(Primary& p) const { p.skip(backup, queue, ids); } +}; +} // namespace bool PrimaryTxObserver::prepare() { - sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "Prepare"); - deduplicate(l); + QPID_LOG(debug, logPrefix << "Prepare " << members); + vector<Skip> skips; + { + sys::Mutex::ScopedLock l(lock); + for (size_t i = 0; i < members.size(); ++i) txBuffer->startCompleter(); + + // Tell replicating subscriptions to skip IDs in the transaction. + for (UuidSet::iterator b = members.begin(); b != members.end(); ++b) + for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) + skips.push_back(Skip(*b, q->first, q->second)); + } + // Outside lock + for_each(skips.begin(), skips.end(), + boost::bind(&Skip::skip, _1, boost::ref(primary))); txQueue->deliver(TxPrepareEvent().message()); - // TODO aconway 2013-09-04: Blocks the current thread till backups respond. - // Need a non-blocking approach (e.g. async completion or borrowing a thread) - while (!unprepared.empty() && !failed) lock.wait(); - return !failed; + return true; } void PrimaryTxObserver::commit() { - sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); + sys::Mutex::ScopedLock l(lock); txQueue->deliver(TxCommitEvent().message()); complete = true; end(l); } void PrimaryTxObserver::rollback() { - sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); + sys::Mutex::ScopedLock l(lock); txQueue->deliver(TxRollbackEvent().message()); complete = true; end(l); @@ -180,8 +199,8 @@ void PrimaryTxObserver::rollback() { void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { // Don't destroy the tx-queue until the transaction is complete and there // are no connected subscriptions. - if (!ended && complete && unfinished.empty()) { - ended = true; + if (txBuffer && complete && unfinished.empty()) { + txBuffer.reset(); // Break pointer cycle. try { haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string()); } catch (const std::exception& e) { @@ -198,29 +217,33 @@ void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { void PrimaryTxObserver::txPrepareOkEvent(const string& data) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker; - QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup); - unprepared.erase(backup); - lock.notify(); + if (unprepared.erase(backup)) { + QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup); + txBuffer->finishCompleter(); + } } void PrimaryTxObserver::txPrepareFailEvent(const string& data) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker; - QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup); - unprepared.erase(backup); - failed = true; - lock.notify(); + if (unprepared.erase(backup)) { + QPID_LOG(error, logPrefix << "Prepare failed on backup: " << backup); + txBuffer->setError( + QPID_MSG(logPrefix << "Prepare failed on backup: " << backup)); + txBuffer->finishCompleter(); + } } void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = rs.getBrokerInfo().getSystemId(); - if (unprepared.find(backup) != unprepared.end()) { - complete = failed = true; // Canceled before prepared. - unprepared.erase(backup); // Consider it prepared-fail + if (unprepared.erase(backup) ){ + complete = true; // Cancelled before prepared. + txBuffer->setError( + QPID_MSG(logPrefix << "Backup disconnected: " << rs.getBrokerInfo())); + txBuffer->finishCompleter(); } unfinished.erase(backup); - lock.notify(); end(l); } diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index fb9db25e85..cd6c88ad41 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -24,12 +24,14 @@ #include "types.h" #include "ReplicationTest.h" +#include "qpid/broker/SessionState.h" #include "qpid/broker/TransactionObserver.h" #include "qpid/log/Statement.h" #include "qpid/types/Uuid.h" #include "qpid/sys/unordered_map.h" #include "qpid/sys/Monitor.h" #include <boost/enable_shared_from_this.hpp> +#include <boost/intrusive_ptr.hpp> namespace qpid { @@ -37,11 +39,13 @@ namespace broker { class Broker; class Message; class Consumer; +class AsyncCompletion; } namespace ha { class HaBroker; class ReplicatingSubscription; +class Primary; /** * Observe events in the lifecycle of a transaction. @@ -62,7 +66,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, public boost::enable_shared_from_this<PrimaryTxObserver> { public: - PrimaryTxObserver(HaBroker&); + PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&); ~PrimaryTxObserver(); /** Call immediately after constructor, uses shared_from_this. */ @@ -87,7 +91,6 @@ class PrimaryTxObserver : public broker::TransactionObserver, QueuePtr, ReplicationIdSet, Hasher<QueuePtr> > QueueIdsMap; void membership(const BrokerInfo::Map&); - void deduplicate(sys::Mutex::ScopedLock&); void end(sys::Mutex::ScopedLock&); void txPrepareOkEvent(const std::string& data); void txPrepareFailEvent(const std::string& data); @@ -95,16 +98,19 @@ class PrimaryTxObserver : public broker::TransactionObserver, sys::Monitor lock; std::string logPrefix; + Primary& primary; HaBroker& haBroker; broker::Broker& broker; ReplicationTest replicationTest; + // NOTE: There is an intrusive_ptr cycle between PrimaryTxObserver + // and TxBuffer. The cycle is broken in PrimaryTxObserver::end() + boost::intrusive_ptr<broker::TxBuffer> txBuffer; types::Uuid id; std::string exchangeName; QueuePtr txQueue; QueueIdsMap enqueues; - bool failed, ended, complete; - + bool complete; UuidSet members; // All members of transaction. UuidSet unprepared; // Members that have not yet responded to prepare. UuidSet unfinished; // Members that have not yet disconnected. diff --git a/qpid/cpp/src/tests/TransactionObserverTest.cpp b/qpid/cpp/src/tests/TransactionObserverTest.cpp index 2a7d94b1ae..80ef494c21 100644 --- a/qpid/cpp/src/tests/TransactionObserverTest.cpp +++ b/qpid/cpp/src/tests/TransactionObserverTest.cpp @@ -79,8 +79,10 @@ struct MockBrokerObserver : public BrokerObserver { MockBrokerObserver(bool prep_=true) : prep(prep_) {} void startTx(const intrusive_ptr<TxBuffer>& buffer) { - tx.reset(new MockTransactionObserver(prep)); - buffer->setObserver(tx); + if (!tx) { // Don't overwrite first tx with automatically started second tx. + tx.reset(new MockTransactionObserver(prep)); + buffer->setObserver(tx); + } } }; @@ -94,7 +96,7 @@ Session simpleTxTransaction(MessagingFixture& fix) { return txSession; } -QPID_AUTO_TEST_CASE(tesTxtCommit) { +QPID_AUTO_TEST_CASE(testTxCommit) { MessagingFixture fix; shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver); fix.broker->getBrokerObservers().add(brokerObserver); @@ -114,6 +116,7 @@ QPID_AUTO_TEST_CASE(testTxFail) { fix.broker->getBrokerObservers().add(brokerObserver); Session txSession = simpleTxTransaction(fix); try { + ScopedSuppressLogging sl; // Suppress messages for expected error. txSession.commit(); BOOST_FAIL("Expected exception"); } catch(...) {} diff --git a/qpid/cpp/src/tests/TxBufferTest.cpp b/qpid/cpp/src/tests/TxBufferTest.cpp index 4807026ab7..3f052d213e 100644 --- a/qpid/cpp/src/tests/TxBufferTest.cpp +++ b/qpid/cpp/src/tests/TxBufferTest.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/TxBuffer.h" #include "unit_test.h" +#include "test_tools.h" #include <iostream> #include <vector> #include "TxMocks.h" @@ -50,7 +51,8 @@ QPID_AUTO_TEST_CASE(testCommitLocal) buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice buffer.enlist(static_pointer_cast<TxOp>(opC)); - BOOST_CHECK(buffer.commitLocal(&store)); + buffer.startCommit(&store); + buffer.endCommit(&store); store.check(); BOOST_CHECK(store.isCommitted()); opA->check(); @@ -75,7 +77,12 @@ QPID_AUTO_TEST_CASE(testFailOnCommitLocal) buffer.enlist(static_pointer_cast<TxOp>(opB)); buffer.enlist(static_pointer_cast<TxOp>(opC)); - BOOST_CHECK(!buffer.commitLocal(&store)); + try { + ScopedSuppressLogging sl; // Suppress messages for expected error. + buffer.startCommit(&store); + buffer.endCommit(&store); + BOOST_FAIL("Expected exception"); + } catch (...) {} BOOST_CHECK(store.isAborted()); store.check(); opA->check(); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index ad546afc62..79024d48e3 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1346,24 +1346,19 @@ class TransactionTests(HaBrokerTest): tx.commit() tx.sync() + tx.close() for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) - def assert_tx_cleanup(self, b, tx_queues): + def assert_tx_clean(self, b): """Verify that there are no transaction artifacts (exchanges, queues, subscriptions) on b.""" - - self.assertEqual(0, len(b.agent().tx_queues()), msg=b) - self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b) - - # TX exchanges don't show up in management so test for existence by name. - s = b.connect_admin().session() - try: - for q in tx_queues: - try: - s.sender("%s;{node:{type:topic}}"%q) - self.fail("Found tx exchange %s on %s "%(q,b)) - except NotFound: pass - finally: s.connection.close() + queues=[] + def txq(): queues = b.agent().tx_queues(); return not queues + assert retry(txq), "%s: unexpected %s"%(b,queues) + subs=[] + def txs(): subs = self.tx_subscriptions(b); return not subs + assert retry(txs), "%s: unexpected %s"%(b,subs) + # TODO aconway 2013-10-15: TX exchanges don't show up in management. def assert_simple_commit_outcome(self, b, tx_queues): b.assert_browse_backup("a", [], msg=b) @@ -1379,7 +1374,7 @@ class TransactionTests(HaBrokerTest): <commit tx=1> """ self.assertEqual(expect, open_read(b.store_log), msg=b) - self.assert_tx_cleanup(b, tx_queues) + self.assert_tx_clean(b) def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) @@ -1388,6 +1383,7 @@ class TransactionTests(HaBrokerTest): tx_queues = cluster[0].agent().tx_queues() tx.acknowledge() tx.rollback() + tx.close() # For clean test. for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) def assert_simple_rollback_outcome(self, b, tx_queues): @@ -1399,7 +1395,7 @@ class TransactionTests(HaBrokerTest): <enqueue a z> """ self.assertEqual(open_read(b.store_log), expect, msg=b) - self.assert_tx_cleanup(b, tx_queues) + self.assert_tx_clean(b) def test_tx_simple_failover(self): cluster = HaCluster(self, 3, test_store=True) @@ -1423,6 +1419,7 @@ class TransactionTests(HaBrokerTest): tx.commit() tx.sync() tx_queues = cluster[0].agent().tx_queues() + tx.close() self.assert_simple_commit_outcome(cluster[0], tx_queues) # Test rollback @@ -1433,6 +1430,7 @@ class TransactionTests(HaBrokerTest): tx.acknowledge() tx.rollback() tx.sync() + tx.close() self.assert_simple_rollback_outcome(cluster[0], tx_queues) def assert_commit_raises(self, tx): @@ -1448,7 +1446,7 @@ class TransactionTests(HaBrokerTest): for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) self.assert_commit_raises(tx) for b in cluster: b.assert_browse_backup("q", []) - self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n") + self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n") self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n") def test_tx_join_leave(self): @@ -1465,14 +1463,15 @@ class TransactionTests(HaBrokerTest): cluster[1].kill(final=False) s.send("b") self.assert_commit_raises(tx) - self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]]) - + for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b) # Joining tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") cluster.restart(1) tx.commit() + tx.close() + for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) @@ -1493,6 +1492,15 @@ class TransactionTests(HaBrokerTest): for t in threads: t.join() for s in sessions: s.connection.close() + def test_broker_tx_tests(self): + cluster = HaCluster(self, 3) + print "Running python broker tx tests" + p = subprocess.Popen(["qpid-python-test", + "-m", "qpid_tests.broker_0_10", + "-b", "localhost:%s"%(cluster[0].port()), + "*.tx.*"]) + assert not p.wait() + print "Finished python broker tx tests" if __name__ == "__main__": outdir = "ha_tests.tmp" |
