diff options
Diffstat (limited to 'cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp')
| -rw-r--r-- | cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp | 87 |
1 files changed, 62 insertions, 25 deletions
diff --git a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp index 4c2b9892ab..e06f8a750f 100644 --- a/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp +++ b/cpp/src/qpid/store/ms-sql/MSSqlProvider.cpp @@ -689,32 +689,49 @@ MSSqlProvider::enqueue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { + // If this enqueue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - (void)initState(); // Ensure this thread is initialized - try { - atxn->begin(); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); + } + else { + (void)initState(); // Ensure this thread is initialized + db = atxn->dbConn(); + try { + atxn->begin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e); + } } - catch(_com_error &e) { - throw ADOException("Error queuing message", e); - } try { if (msg->getPersistenceId() == 0) { // Message itself not yet saved MessageRecordset rsMessages; - rsMessages.open(atxn->dbConn(), TblMessage); + rsMessages.open(db, TblMessage); rsMessages.add(msg); } MessageMapRecordset rsMap; - rsMap.open(atxn->dbConn(), TblMessageMap); + rsMap.open(db, TblMessageMap); rsMap.add(msg->getPersistenceId(), queue.getPersistenceId()); - atxn->commit(); + if (atxn) + atxn->commit(); + else + db->commitTransaction(); } catch(_com_error &e) { - atxn->abort(); + if (atxn) + atxn->abort(); + else + db->rollbackTransaction(); throw ADOException("Error queuing message", e); - } + } + msg->enqueueComplete(); } /** @@ -731,32 +748,50 @@ MSSqlProvider::dequeue(qpid::broker::TransactionContext* ctxt, const boost::intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { + // If this dequeue is in the context of a transaction, use the specified + // transaction to nest a new transaction for this operation. However, if + // this is not in the context of a transaction, then just use the thread's + // DatabaseConnection with a ADO transaction. + DatabaseConnection *db = 0; AmqpTransaction *atxn = dynamic_cast<AmqpTransaction*> (ctxt); - if (atxn == 0) - throw qpid::broker::InvalidTransactionContextException(); - (void)initState(); // Ensure this thread is initialized - try { - atxn->begin(); + if (atxn == 0) { + db = initConnection(); + db->beginTransaction(); } - catch(_com_error &e) { - throw ADOException("Error queuing message", e); - } + else { + (void)initState(); // Ensure this thread is initialized + db = atxn->dbConn(); + try { + atxn->begin(); + } + catch(_com_error &e) { + throw ADOException("Error queuing message", e); + } + } + try { MessageMapRecordset rsMap; - rsMap.open(atxn->dbConn(), TblMessageMap); + rsMap.open(db, TblMessageMap); bool more = rsMap.remove(msg->getPersistenceId(), queue.getPersistenceId()); if (!more) { MessageRecordset rsMessages; - rsMessages.open(atxn->dbConn(), TblMessage); + rsMessages.open(db, TblMessage); rsMessages.remove(msg); } - atxn->commit(); + if (atxn) + atxn->commit(); + else + db->commitTransaction(); } catch(_com_error &e) { - atxn->abort(); + if (atxn) + atxn->abort(); + else + db->rollbackTransaction(); throw ADOException("Error dequeuing message", e); } + msg->dequeueComplete(); } std::auto_ptr<qpid::broker::TransactionContext> @@ -795,6 +830,8 @@ MSSqlProvider::begin(const std::string& xid) void MSSqlProvider::prepare(qpid::broker::TPCTransactionContext& txn) { + // The inner transactions used for the components of the TPC are done; + // nothing else to do but wait for the commit. } void |
