diff options
| author | Stephen D. Huston <shuston@apache.org> | 2010-04-16 20:12:55 +0000 |
|---|---|---|
| committer | Stephen D. Huston <shuston@apache.org> | 2010-04-16 20:12:55 +0000 |
| commit | a2313ce0fc34fbe4864445595e1db1955c4918a1 (patch) | |
| tree | 4757029ffe7a3970cf38d32f8775a14bef7ea259 /cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | |
| parent | eb56a638b2aea7e56c55e49b267cfe2f673afe51 (diff) | |
| download | qpid-python-a2313ce0fc34fbe4864445595e1db1955c4918a1.tar.gz | |
Fix for QPID-2420 to correctly handle restoring and commit/abort prepared transactions.
The basic approach is documented in QPID-2420. This also makes improvements in the way changes are done to the tblMessageMap table which should perform much better, avoiding pulling the whole table into the broker just to add or edit or delete a single record. Also, some of the consistency checks and enforcements are moved into the database itself from the C++ code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@935068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
| -rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 312 |
1 files changed, 250 insertions, 62 deletions
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index a26df59df7..323bc94c5c 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -32,6 +32,7 @@ #include "BindingRecordset.h" #include "MessageMapRecordset.h" #include "MessageRecordset.h" +#include "TplRecordset.h" #include "DatabaseConnection.h" #include "Exception.h" #include "State.h" @@ -51,6 +52,7 @@ const std::string TblExchange("tblExchange"); const std::string TblMessage("tblMessage"); const std::string TblMessageMap("tblMessageMap"); const std::string TblQueue("tblQueue"); +const std::string TblTpl("tblTPL"); } namespace qpid { @@ -256,9 +258,7 @@ public: virtual void prepare(qpid::broker::TPCTransactionContext& txn); virtual void commit(qpid::broker::TransactionContext& txn); virtual void abort(qpid::broker::TransactionContext& txn); - - // @TODO This maybe should not be in TransactionalStore - virtual void collectPreparedXids(std::set<std::string>& xids) {} + virtual void collectPreparedXids(std::set<std::string>& xids); //@} virtual void recoverConfigs(qpid::broker::RecoveryManager& recoverer); @@ -272,6 +272,8 @@ public: virtual void recoverMessages(qpid::broker::RecoveryManager& recoverer, MessageMap& messageMap, MessageQueueMap& messageQueueMap); + virtual void recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap); private: struct ProviderOptions : public qpid::Options @@ -310,7 +312,7 @@ private: State *initState(); DatabaseConnection *initConnection(void); - void createDb(_ConnectionPtr conn, const std::string &name); + void createDb(DatabaseConnection *db, const std::string &name); }; static MSSqlProvider static_instance_registers_plugin; @@ -356,7 +358,7 @@ MSSqlProvider::earlyInitialize(Plugin::Target &target) // Database doesn't exist; create it QPID_LOG(notice, "MSSQL: Creating database " + options.catalogName); - createDb(conn, options.catalogName); + createDb(db.get(), options.catalogName); } else { QPID_LOG(notice, @@ -407,8 +409,9 @@ MSSqlProvider::create(PersistableQueue& queue, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating queue " + queue.getName(), e); + throw ADOException("Error creating queue " + queue.getName(), e, errs); } } @@ -418,14 +421,6 @@ MSSqlProvider::create(PersistableQueue& queue, void MSSqlProvider::destroy(PersistableQueue& queue) { - // MessageDeleter class for use with for_each, below. - class MessageDeleter { - BlobRecordset& msgs; - public: - explicit MessageDeleter(BlobRecordset& _msgs) : msgs(_msgs) {} - void operator()(uint64_t msgId) { msgs.remove(msgId); } - }; - DatabaseConnection *db = initConnection(); BlobRecordset rsQueues; BindingRecordset rsBindings; @@ -441,19 +436,18 @@ MSSqlProvider::destroy(PersistableQueue& queue) // under the references in the bindings table. Then remove the // message->queue entries for the queue, also because the queue can't // be deleted while there are references to it. If there are messages - // orphaned by removing the queue references, those messages can - // also be deleted. Lastly, the queue record can be removed. + // orphaned by removing the queue references, they're deleted by + // a trigger on the tblMessageMap table. Lastly, the queue record + // can be removed. rsBindings.removeForQueue(queue.getPersistenceId()); - std::vector<uint64_t> orphans; - rsMessageMaps.removeForQueue(queue.getPersistenceId(), orphans); - std::for_each(orphans.begin(), orphans.end(), - MessageDeleter(rsMessages)); + rsMessageMaps.removeForQueue(queue.getPersistenceId()); rsQueues.remove(queue); db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting queue " + queue.getName(), e); + throw ADOException("Error deleting queue " + queue.getName(), e, errs); } } @@ -473,8 +467,11 @@ MSSqlProvider::create(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating exchange " + exchange.getName(), e); + throw ADOException("Error creating exchange " + exchange.getName(), + e, + errs); } } @@ -498,8 +495,11 @@ MSSqlProvider::destroy(const PersistableExchange& exchange) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting exchange " + exchange.getName(), e); + throw ADOException("Error deleting exchange " + exchange.getName(), + e, + errs); } } @@ -524,9 +524,12 @@ MSSqlProvider::bind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error binding exchange " + exchange.getName() + - " to queue " + queue.getName(), e); + " to queue " + queue.getName(), + e, + errs); } } @@ -551,9 +554,12 @@ MSSqlProvider::unbind(const PersistableExchange& exchange, db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); throw ADOException("Error unbinding exchange " + exchange.getName() + - " from queue " + queue.getName(), e); + " from queue " + queue.getName(), + e, + errs); } } @@ -572,8 +578,9 @@ MSSqlProvider::create(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error creating config " + config.getName(), e); + throw ADOException("Error creating config " + config.getName(), e, errs); } } @@ -592,8 +599,9 @@ MSSqlProvider::destroy(const PersistableConfig& config) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting config " + config.getName(), e); + throw ADOException("Error deleting config " + config.getName(), e, errs); } } @@ -618,8 +626,9 @@ MSSqlProvider::stage(const boost::intrusive_ptr<PersistableMessage>& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error staging message", e); + throw ADOException("Error staging message", e, errs); } } @@ -641,8 +650,9 @@ MSSqlProvider::destroy(PersistableMessage& msg) db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error deleting message", e); + throw ADOException("Error deleting message", e, errs); } } @@ -662,8 +672,9 @@ MSSqlProvider::appendContent(const boost::intrusive_ptr<const PersistableMessage db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); db->rollbackTransaction(); - throw ADOException("Error appending to message", e); + throw ADOException("Error appending to message", e, errs); } } @@ -691,7 +702,8 @@ MSSqlProvider::loadContent(const qpid::broker::PersistableQueue& /*queue*/, rsMessages.loadContent(msg, data, offset, length); } catch(_com_error &e) { - throw ADOException("Error loading message content", e); + std::string errs = db->getErrors(); + throw ADOException("Error loading message content", e, errs); } } @@ -714,6 +726,7 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -721,12 +734,16 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional enqueue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } @@ -738,18 +755,19 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, rsMessages.add(msg); } rsMap.open(db, TblMessageMap); - rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); + rsMap.add(msg->getPersistenceId(), queue.getPersistenceId(), xid); if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, errs); } msg->enqueueComplete(); } @@ -773,6 +791,7 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, // this is not in the context of a transaction, then just use the thread's // DatabaseConnection with a ADO transaction. DatabaseConnection *db = 0; + std::string xid; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); if (atxn == 0) { db = initConnection(); @@ -780,36 +799,54 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, } else { (void)initState(); // Ensure this thread is initialized + // It's a transactional dequeue; if it's TPC, grab the xid. + AmqpTPCTransaction *tpcTxn = dynamic_cast<AmqpTPCTransaction*> (ctxt); + if (tpcTxn) + xid = tpcTxn->getXid(); db = atxn->dbConn(); try { - atxn->begin(); + atxn->sqlBegin(); } catch(_com_error &e) { - throw ADOException("Error queuing message", e); + throw ADOException("Error queuing message", e, db->getErrors()); } } MessageMapRecordset rsMap; - MessageRecordset rsMessages; try { rsMap.open(db, TblMessageMap); - bool more = rsMap.remove(msg->getPersistenceId(), - queue.getPersistenceId()); - if (!more) { - rsMessages.open(db, TblMessage); - rsMessages.remove(msg); + // TPC dequeues are just marked pending and will actually be removed + // when the transaction commits; Single-phase dequeues are removed + // now, relying on the SQL transaction to put it back if the + // transaction doesn't commit. + if (!xid.empty()) { + rsMap.pendingRemove(msg->getPersistenceId(), + queue.getPersistenceId(), + xid); + } + else { + rsMap.remove(msg->getPersistenceId(), + queue.getPersistenceId()); } if (atxn) - atxn->commit(); + atxn->sqlCommit(); else db->commitTransaction(); } + catch(ms_sql::Exception&) { + if (atxn) + atxn->sqlAbort(); + else + db->rollbackTransaction(); + throw; + } catch(_com_error &e) { + std::string errs = db->getErrors(); if (atxn) - atxn->abort(); + atxn->sqlAbort(); else db->rollbackTransaction(); - throw ADOException("Error dequeuing message", e); + throw ADOException("Error dequeuing message", e, errs); } msg->dequeueComplete(); } @@ -827,10 +864,10 @@ MSSqlProvider::begin() // it safe and handle this just like a TPC transaction, which actually // can be prepared and committed/aborted from different threads, // making it a bad idea to try using the thread-local DatabaseConnection. - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTransaction> tx(new AmqpTransaction(db)); - tx->begin(); + tx->sqlBegin(); std::auto_ptr<qpid::broker::TransactionContext> tc(tx); return tc; } @@ -839,10 +876,24 @@ std::auto_ptr<qpid::broker::TPCTransactionContext> MSSqlProvider::begin(const std::string& xid) { (void)initState(); // Ensure this thread is initialized - std::auto_ptr<DatabaseConnection> db(new DatabaseConnection); + boost::shared_ptr<DatabaseConnection> db(new DatabaseConnection); db->open(options.connectString, options.catalogName); std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(db, xid)); - tx->begin(); + tx->sqlBegin(); + + TplRecordset rsTpl; + try { + tx->sqlBegin(); + rsTpl.open(db.get(), TblTpl); + rsTpl.add(xid); + tx->sqlCommit(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + tx->sqlAbort(); + throw ADOException("Error adding TPL record", e, errs); + } + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); return tc; } @@ -850,28 +901,122 @@ MSSqlProvider::begin(const std::string& xid) void MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) { - // The inner transactions used for the components of the TPC are done; - // nothing else to do but wait for the commit. + // Commit all the marked-up enqueue/dequeue ops and the TPL record. + // On commit/rollback the TPL will be removed and the TPL markups + // on the message map will be cleaned up as well. + (void)initState(); // Ensure this thread is initialized + AmqpTPCTransaction *atxn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (atxn == 0) + throw qpid::broker::InvalidTransactionContextException(); + try { + atxn->sqlCommit(); + } + catch(_com_error &e) { + throw ADOException("Error preparing", e, atxn->dbConn()->getErrors()); + } + atxn->setPrepared(); } void MSSqlProvider::commit(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized - AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - atxn->commit(); + /* + * One-phase transactions simply commit the outer SQL transaction + * that was begun on begin(). Two-phase transactions are different - + * the SQL transaction started on begin() was committed on prepare() + * so all the SQL records reflecting the enqueue/dequeue actions for + * the transaction are recorded but with xid markups on them to reflect + * that they are prepared but not committed. Now go back and remove + * the markups, deleting those marked for removal. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlCommit(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.commitPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } } void MSSqlProvider::abort(qpid::broker::TransactionContext& txn) { (void)initState(); // Ensure this thread is initialized + /* + * One-phase and non-prepared two-phase transactions simply abort + * the outer SQL transaction that was begun on begin(). However, prepared + * two-phase transactions are different - the SQL transaction started + * on begin() was committed on prepare() so all the SQL records + * reflecting the enqueue/dequeue actions for the transaction are + * recorded but with xid markups on them to reflect that they are + * prepared but not committed. Now go back and remove the markups, + * deleting those marked for addition. + */ + AmqpTPCTransaction *p2txn = dynamic_cast<AmqpTPCTransaction*> (&txn); + if (p2txn == 0 || !p2txn->isPrepared()) { + AmqpTransaction *p1txn = dynamic_cast<AmqpTransaction*> (&txn); + if (p1txn == 0) + throw qpid::broker::InvalidTransactionContextException(); + p1txn->sqlAbort(); + return; + } + + DatabaseConnection *db(p2txn->dbConn()); + TplRecordset rsTpl; + MessageMapRecordset rsMessageMap; + try { + db->beginTransaction(); + rsTpl.open(db, TblTpl); + rsMessageMap.open(db, TblMessageMap); + rsMessageMap.abortPrepared(p2txn->getXid()); + rsTpl.remove(p2txn->getXid()); + db->commitTransaction(); + } + catch(_com_error &e) { + std::string errs = db->getErrors(); + db->rollbackTransaction(); + throw ADOException("Error committing transaction", e, errs); + } + + + (void)initState(); // Ensure this thread is initialized AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (&txn); if (atxn == 0) throw qpid::broker::InvalidTransactionContextException(); - atxn->abort(); + atxn->sqlAbort(); +} + +void +MSSqlProvider::collectPreparedXids(std::set<std::string>& xids) +{ + DatabaseConnection *db = initConnection(); + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error reading TPL", e, db->getErrors()); + } } // @TODO Much of this recovery code is way too similar... refactor to @@ -977,6 +1122,40 @@ MSSqlProvider::recoverMessages(qpid::broker::RecoveryManager& recoverer, rsMessageMaps.recover(messageQueueMap); } +void +MSSqlProvider::recoverTransactions(qpid::broker::RecoveryManager& recoverer, + PreparedTransactionMap& dtxMap) +{ + DatabaseConnection *db = initConnection(); + std::set<std::string> xids; + try { + TplRecordset rsTpl; + rsTpl.open(db, TblTpl); + rsTpl.recover(xids); + } + catch(_com_error &e) { + throw ADOException("Error recovering TPL records", e, db->getErrors()); + } + + try { + // Rebuild the needed RecoverableTransactions. + for (std::set<std::string>::const_iterator iXid = xids.begin(); + iXid != xids.end(); + ++iXid) { + boost::shared_ptr<DatabaseConnection> dbX(new DatabaseConnection); + dbX->open(options.connectString, options.catalogName); + std::auto_ptr<AmqpTPCTransaction> tx(new AmqpTPCTransaction(dbX, + *iXid)); + tx->setPrepared(); + std::auto_ptr<qpid::broker::TPCTransactionContext> tc(tx); + dtxMap[*iXid] = recoverer.recoverTransaction(*iXid, tc); + } + } + catch(_com_error &e) { + throw ADOException("Error recreating dtx connection", e); + } +} + ////////////// Internal Methods State * @@ -1003,7 +1182,7 @@ MSSqlProvider::initConnection(void) } void -MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) +MSSqlProvider::createDb(DatabaseConnection *db, const std::string &name) { const std::string dbCmd = "CREATE DATABASE " + name; const std::string useCmd = "USE " + name; @@ -1018,9 +1197,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) " fieldTableBlob varbinary(MAX))"; const std::string messageMapSpecs = " (messageId bigint REFERENCES tblMessage(persistenceId) NOT NULL," - " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL)"; + " queueId bigint REFERENCES tblQueue(persistenceId) NOT NULL," + " prepareStatus tinyint CHECK (prepareStatus IS NULL OR " + " prepareStatus IN (1, 2))," + " xid varbinary(512) REFERENCES tblTPL(xid)" + " CONSTRAINT CK_NoDups UNIQUE NONCLUSTERED (messageId, queueId) )"; + const std::string tplSpecs = " (xid varbinary(512) PRIMARY KEY NOT NULL)"; _variant_t unused; _bstr_t dbStr = dbCmd.c_str(); + _ConnectionPtr conn(*db); try { conn->Execute(dbStr, &unused, adExecuteNoRecords); _bstr_t useStr = useCmd.c_str(); @@ -1040,12 +1225,15 @@ MSSqlProvider::createDb(_ConnectionPtr conn, const std::string &name) makeTable = tableCmd + TblBinding + bindingSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); + makeTable = tableCmd + TblTpl + tplSpecs; + makeTableStr = makeTable.c_str(); + conn->Execute(makeTableStr, &unused, adExecuteNoRecords); makeTable = tableCmd + TblMessageMap + messageMapSpecs; makeTableStr = makeTable.c_str(); conn->Execute(makeTableStr, &unused, adExecuteNoRecords); } catch(_com_error &e) { - throw ADOException("MSSQL can't create " + name, e); + throw ADOException("MSSQL can't create " + name, e, db->getErrors()); } } |
