From a2313ce0fc34fbe4864445595e1db1955c4918a1 Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Fri, 16 Apr 2010 20:12:55 +0000 Subject: Fix for QPID-2420 to correctly handle restoring and commit/abort prepared transactions. The basic approach is documented in QPID-2420. This also makes improvements in the way changes are done to the tblMessageMap table which should perform much better, avoiding pulling the whole table into the broker just to add or edit or delete a single record. Also, some of the consistency checks and enforcements are moved into the database itself from the C++ code. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935068 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/store/MessageStorePlugin.cpp | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) (limited to 'cpp/src/qpid/store/MessageStorePlugin.cpp') diff --git a/cpp/src/qpid/store/MessageStorePlugin.cpp b/cpp/src/qpid/store/MessageStorePlugin.cpp index 05b6ef4465..fdb947eef2 100644 --- a/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -405,11 +405,14 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) QueueMap queues; MessageMap messages; MessageQueueMap messageQueueMap; + std::vector xids; + PreparedTransactionMap dtxMap; provider->second->recoverConfigs(recoverer); provider->second->recoverExchanges(recoverer, exchanges); provider->second->recoverQueues(recoverer, queues); provider->second->recoverBindings(recoverer, exchanges, queues); + provider->second->recoverTransactions(recoverer, dtxMap); provider->second->recoverMessages(recoverer, messages, messageQueueMap); // Enqueue msgs where needed. for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); @@ -426,22 +429,33 @@ MessageStorePlugin::recover(broker::RecoveryManager& recoverer) broker::RecoverableMessage::shared_ptr msg = iMsg->second; // Now for each queue referenced in the queue map, locate it // and re-enqueue the message. - for (std::vector::const_iterator j = i->second.begin(); + for (std::vector::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { // Locate the queue corresponding to the current queue Id - QueueMap::const_iterator iQ = queues.find(*j); + QueueMap::const_iterator iQ = queues.find(j->queueId); if (iQ == queues.end()) { std::ostringstream oss; oss << "No matching queue trying to re-enqueue message " - << " on queue Id " << *j; + << " on queue Id " << j->queueId; THROW_STORE_EXCEPTION(oss.str()); } - iQ->second->recover(msg); + // Messages involved in prepared transactions have their status + // updated accordingly. First, though, restore a message that + // is expected to be on a queue, including non-transacted + // messages and those pending dequeue in a dtx. + if (j->tplStatus != QueueEntry::ADDING) + iQ->second->recover(msg); + switch(j->tplStatus) { + case QueueEntry::ADDING: + dtxMap[j->xid]->enqueue(iQ->second, msg); + break; + case QueueEntry::REMOVING: + dtxMap[j->xid]->dequeue(iQ->second, msg); + break; + } } } - - // recoverTransactions() and apply correctly while re-enqueuing } }} // namespace qpid::store -- cgit v1.2.1