diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-23 22:15:29 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-23 22:15:29 +0000 |
| commit | afeb5065ec226e39bb0b6855db63952d9a1ba89c (patch) | |
| tree | 2341895495043707530f839a0be87774f804553e /qpid/java/broker/src | |
| parent | 1b4a67ccc4b501b2b7872881609ee9f0d8c2e3eb (diff) | |
| download | qpid-python-afeb5065ec226e39bb0b6855db63952d9a1ba89c.tar.gz | |
AMQP-24 : [Java Broker] Implement distributed transactions for AMQP 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1292984 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
28 files changed, 2245 insertions, 52 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index e5e755bd23..16a3036ea7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1615,7 +1615,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - @Override public int compareTo(AMQSessionModel session) { return getId().toString().compareTo(session.getID().toString()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties index fadc2e2098..9ef58df940 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties @@ -31,3 +31,9 @@ RECOVERY_START = TXN-1004 : Recovery Start[ : {0}] RECOVERED = TXN-1005 : Recovered {0,number} messages for queue {1} # 0 - queue name RECOVERY_COMPLETE = TXN-1006 : Recovery Complete[ : {0}] +# 0 - xid +# 1 - queue name +XA_INCOMPLETE_QUEUE = TXN-1007 : XA transaction recover for xid {0} incomplete as it references a queue {1} which was not durably retained +# 0 - xid format +# 1 - message id +XA_INCOMPLETE_MESSAGE = TXN-1008 : XA transaction recover for xid {0} incomplete as it references a message {1} which was not durably retained diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 2877e25645..2cc9a5423e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -92,7 +92,9 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor private static final String LINKS_TABLE_NAME = "QPID_LINKS"; private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES"; - + private static final String XID_TABLE_NAME = "QPID_XIDS"; + private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; + private static final int DB_VERSION = 3; @@ -190,6 +192,31 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor + "arguments )" + " values (?, ?, ?, ?, ?, ?)"; + private static final String CREATE_XIDS_TABLE = + "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null," + + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " + + "global_id, branch_id ))"; + private static final String INSERT_INTO_XIDS = + "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)"; + private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME + + " WHERE format = ? and global_id = ? and branch_id = ?"; + private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME; + + + private static final String CREATE_XID_ACTIONS_TABLE = + "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null," + + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " + + "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" + + ", PRIMARY KEY ( " + + "format, global_id, branch_id, action_type, queue_name, message_id))"; + private static final String INSERT_INTO_XID_ACTIONS = + "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " + + "queue_name, message_id ) values (?,?,?,?,?,?) "; + private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME + + " WHERE format = ? and global_id = ? and branch_id = ?"; + private static final String SELECT_ALL_FROM_XID_ACTIONS = + "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME + + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; @@ -295,7 +322,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor _configured = true; } - recoverQueueEntries(recoveryHandler); + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(recoveryHandler); + recoverXids(dtxrh); } @@ -350,7 +378,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor createMessageContentTable(conn); createLinkTable(conn); createBridgeTable(conn); - + createXidTable(conn); + createXidActionTable(conn); conn.close(); } @@ -519,8 +548,38 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } } + private void createXidTable(final Connection conn) throws SQLException + { + if(!tableExists(XID_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_XIDS_TABLE); + } + finally + { + stmt.close(); + } + } + } + private void createXidActionTable(final Connection conn) throws SQLException + { + if(!tableExists(XID_ACTIONS_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_XID_ACTIONS_TABLE); + } + finally + { + stmt.close(); + } + } + } private boolean tableExists(final String tableName, final Connection conn) throws SQLException { @@ -1726,6 +1785,126 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } + + private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId) + throws AMQStoreException + { + Connection conn = connWrapper.getConnection(); + + + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS); + try + { + stmt.setLong(1,format); + stmt.setBytes(2,globalId); + stmt.setBytes(3,branchId); + int results = stmt.executeUpdate(); + + + + if(results != 1) + { + throw new AMQStoreException("Unable to find message with xid"); + } + } + finally + { + stmt.close(); + } + + stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS); + try + { + stmt.setLong(1,format); + stmt.setBytes(2,globalId); + stmt.setBytes(3,branchId); + int results = stmt.executeUpdate(); + + } + finally + { + stmt.close(); + } + + } + catch (SQLException e) + { + _logger.error("Failed to dequeue: " + e.getMessage(), e); + throw new AMQStoreException("Error deleting enqueued message with xid", e); + } + + } + + + private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, + Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException + { + Connection conn = connWrapper.getConnection(); + + + try + { + + PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS); + try + { + stmt.setLong(1,format); + stmt.setBytes(2, globalId); + stmt.setBytes(3, branchId); + stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + + stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS); + + try + { + stmt.setLong(1,format); + stmt.setBytes(2, globalId); + stmt.setBytes(3, branchId); + + if(enqueues != null) + { + stmt.setString(4, "E"); + for(Transaction.Record record : enqueues) + { + stmt.setString(5, record.getQueue().getResourceName()); + stmt.setLong(6, record.getMessage().getMessageNumber()); + stmt.executeUpdate(); + } + } + + if(dequeues != null) + { + stmt.setString(4, "D"); + for(Transaction.Record record : dequeues) + { + stmt.setString(5, record.getQueue().getResourceName()); + stmt.setLong(6, record.getMessage().getMessageNumber()); + stmt.executeUpdate(); + } + } + + } + finally + { + stmt.close(); + } + + } + catch (SQLException e) + { + _logger.error("Failed to enqueue: " + e.getMessage(), e); + throw new AMQStoreException("Error writing xid ", e); + } + + } + private static final class ConnectionWrapper { private final Connection _connection; @@ -1919,7 +2098,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException + private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException { Connection conn = newAutoCommitConnection(); try @@ -1950,7 +2129,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor stmt.close(); } - queueEntryHandler.completeQueueEntryRecovery(); + return queueEntryHandler.completeQueueEntryRecovery(); } finally { @@ -1958,6 +2137,165 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } } + private static final class Xid + { + + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public Xid(long format, byte[] globalId, byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + public long getFormat() + { + return _format; + } + + public byte[] getGlobalId() + { + return _globalId; + } + + public byte[] getBranchId() + { + return _branchId; + } + } + + private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage + { + + private final String _queueName; + private long _messageNumber; + + public RecordImpl(String queueName, long messageNumber) + { + _queueName = queueName; + _messageNumber = messageNumber; + } + + public TransactionLogResource getQueue() + { + return this; + } + + public EnqueableMessage getMessage() + { + return this; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + throw new UnsupportedOperationException(); + } + + public String getResourceName() + { + return _queueName; + } + } + + private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + List<Xid> xids = new ArrayList<Xid>(); + + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); + try + { + while(rs.next()) + { + + long format = rs.getLong(1); + byte[] globalId = rs.getBytes(2); + byte[] branchId = rs.getBytes(3); + xids.add(new Xid(format, globalId, branchId)); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + + + for(Xid xid : xids) + { + List<RecordImpl> enqueues = new ArrayList<RecordImpl>(); + List<RecordImpl> dequeues = new ArrayList<RecordImpl>(); + + PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); + + pstmt.setLong(1, xid.getFormat()); + pstmt.setBytes(2, xid.getGlobalId()); + pstmt.setBytes(3, xid.getBranchId()); + try + { + ResultSet rs = pstmt.executeQuery(); + try + { + while(rs.next()) + { + + String actionType = rs.getString(1); + String queueName = rs.getString(2); + long messageId = rs.getLong(3); + + RecordImpl record = new RecordImpl(queueName, messageId); + List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; + records.add(record); + } + } + finally + { + rs.close(); + } + } + finally + { + pstmt.close(); + } + + dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), + enqueues.toArray(new RecordImpl[enqueues.size()]), + dequeues.toArray(new RecordImpl[dequeues.size()])); + } + + + dtxrh.completeDtxRecordRecovery(); + } + finally + { + conn.close(); + } + + } + StorableMessageMetaData getMetaData(long messageId) throws SQLException { @@ -2175,8 +2513,21 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor { DerbyMessageStore.this.abortTran(_connWrapper); } + + public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException + { + DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId); + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + throws AMQStoreException + { + DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues); + } } + + private class StoredDerbyMessage implements StoredMessage { @@ -2360,4 +2711,4 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } } -} +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 546a81a050..b01e5aa954 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -78,6 +78,14 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto { } + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } + }; public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 2fecd5d4be..00bb0449d6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; - import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.EnqueableMessage; @@ -126,7 +125,16 @@ public interface MessageStore void abortTran() throws AMQStoreException; + public static interface Record + { + TransactionLogResource getQueue(); + EnqueableMessage getMessage(); + } + + void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException; + void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + throws AMQStoreException; } public void configureTransactionLog(String name, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java index 802596ed1e..48ca72718b 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java @@ -28,6 +28,13 @@ public interface TransactionLogRecoveryHandler { void queueEntry(String queuename, long messageId); - void completeQueueEntryRecovery(); + DtxRecordRecoveryHandler completeQueueEntryRecovery(); + } + + public static interface DtxRecordRecoveryHandler + { + void dtxRecord(long format, byte[] globalId, byte[] branchId, MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues); + + void completeDtxRecordRecovery(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index c32f642eea..48e372f87e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import org.apache.qpid.server.txn.RollbackOnlyDtxException; +import org.apache.qpid.server.txn.TimeoutDtxException; import static org.apache.qpid.util.Serial.gt; import java.security.Principal; @@ -44,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; @@ -67,24 +70,19 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; +import org.apache.qpid.server.txn.DistributedTransaction; +import org.apache.qpid.server.txn.DtxNotSelectedException; +import org.apache.qpid.server.txn.IncorrectDtxStateException; +import org.apache.qpid.server.txn.JoinAndResumeDtxException; import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.NotAssociatedDtxException; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.SuspendAndFailDtxException; +import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlow; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageSetFlowMode; -import org.apache.qpid.transport.MessageStop; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.RangeSetFactory; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +107,6 @@ public class ServerSession extends Session private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - public static interface MessageDispositionChangeListener { public void onAccept(); @@ -359,7 +356,15 @@ public class ServerSession extends Session public void onClose() { - _transaction.rollback(); + if(_transaction instanceof LocalTransaction) + { + _transaction.rollback(); + } + else if(_transaction instanceof DistributedTransaction) + { + getVirtualHost().getDtxRegistry().endAssociations(this); + } + for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { listener.onRelease(true); @@ -455,6 +460,95 @@ public class ServerSession extends Session _txnStarts.incrementAndGet(); } + public void selectDtx() + { + _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost()); + + } + + + public void startDtx(Xid xid, boolean join, boolean resume) + throws JoinAndResumeDtxException, + UnknownDtxBranchException, + AlreadyKnownDtxException, + DtxNotSelectedException + { + DistributedTransaction distributedTransaction = assertDtxTransaction(); + distributedTransaction.start(xid, join, resume); + } + + + public void endDtx(Xid xid, boolean fail, boolean suspend) + throws NotAssociatedDtxException, + UnknownDtxBranchException, + DtxNotSelectedException, + SuspendAndFailDtxException, TimeoutDtxException + { + DistributedTransaction distributedTransaction = assertDtxTransaction(); + distributedTransaction.end(xid, fail, suspend); + } + + + public long getTimeoutDtx(Xid xid) + throws UnknownDtxBranchException + { + return getVirtualHost().getDtxRegistry().getTimeout(xid); + } + + + public void setTimeoutDtx(Xid xid, long timeout) + throws UnknownDtxBranchException + { + getVirtualHost().getDtxRegistry().setTimeout(xid, timeout); + } + + + public void prepareDtx(Xid xid) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().prepare(xid); + } + + public void commitDtx(Xid xid, boolean onePhase) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().commit(xid, onePhase); + } + + + public void rollbackDtx(Xid xid) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().rollback(xid); + } + + + public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException + { + getVirtualHost().getDtxRegistry().forget(xid); + } + + public List<Xid> recoverDtx() + { + return getVirtualHost().getDtxRegistry().recover(); + } + + private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException + { + if(_transaction instanceof DistributedTransaction) + { + return (DistributedTransaction) _transaction; + } + else + { + throw new DtxNotSelectedException(); + } + } + + public void commit() { _transaction.commit(); @@ -910,13 +1004,11 @@ public class ServerSession extends Session } } - protected void setClose(boolean close) { super.setClose(close); } - @Override public int compareTo(AMQSessionModel session) { return getId().toString().compareTo(session.getID().toString()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 4648a53b40..d18086808f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.transport; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -50,7 +51,16 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.txn.AlreadyKnownDtxException; +import org.apache.qpid.server.txn.DtxNotSelectedException; +import org.apache.qpid.server.txn.IncorrectDtxStateException; +import org.apache.qpid.server.txn.JoinAndResumeDtxException; +import org.apache.qpid.server.txn.NotAssociatedDtxException; +import org.apache.qpid.server.txn.RollbackOnlyDtxException; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.SuspendAndFailDtxException; +import org.apache.qpid.server.txn.TimeoutDtxException; +import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -429,6 +439,235 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).rollback(); } + @Override + public void dtxSelect(Session session, DtxSelect method) + { + // TODO - check current tx mode + ((ServerSession)session).selectDtx(); + } + + @Override + public void dtxStart(Session session, DtxStart method) + { + XaResult result = new XaResult(); + result.setStatus(DtxXaStatus.XA_OK); + try + { + ((ServerSession)session).startDtx(method.getXid(), method.getJoin(), method.getResume()); + session.executionResult(method.getId(), result); + } + catch(JoinAndResumeDtxException e) + { + exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage()); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Unknown xid " + method.getXid()); + } + catch(AlreadyKnownDtxException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Xid already started an neither join nor " + + "resume set" + method.getXid()); + } + catch(DtxNotSelectedException e) + { + exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage()); + } + + } + + @Override + public void dtxEnd(Session session, DtxEnd method) + { + XaResult result = new XaResult(); + result.setStatus(DtxXaStatus.XA_OK); + try + { + try + { + ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend()); + } + catch (TimeoutDtxException e) + { + result.setStatus(DtxXaStatus.XA_RBTIMEOUT); + } + session.executionResult(method.getId(), result); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); + } + catch(NotAssociatedDtxException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); + } + catch(DtxNotSelectedException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); + } + catch(SuspendAndFailDtxException e) + { + exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage()); + } + + } + + @Override + public void dtxCommit(Session session, DtxCommit method) + { + XaResult result = new XaResult(); + result.setStatus(DtxXaStatus.XA_OK); + try + { + try + { + ((ServerSession)session).commitDtx(method.getXid(), method.getOnePhase()); + } + catch (RollbackOnlyDtxException e) + { + result.setStatus(DtxXaStatus.XA_RBROLLBACK); + } + catch (TimeoutDtxException e) + { + result.setStatus(DtxXaStatus.XA_RBTIMEOUT); + } + session.executionResult(method.getId(), result); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); + } + catch(IncorrectDtxStateException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); + } + catch(AMQStoreException e) + { + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + + @Override + public void dtxForget(Session session, DtxForget method) + { + try + { + ((ServerSession)session).forgetDtx(method.getXid()); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); + } + catch(IncorrectDtxStateException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); + } + + } + + @Override + public void dtxGetTimeout(Session session, DtxGetTimeout method) + { + GetTimeoutResult result = new GetTimeoutResult(); + try + { + result.setTimeout(((ServerSession) session).getTimeoutDtx(method.getXid())); + session.executionResult(method.getId(), result); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); + } + } + + @Override + public void dtxPrepare(Session session, DtxPrepare method) + { + XaResult result = new XaResult(); + result.setStatus(DtxXaStatus.XA_OK); + try + { + try + { + ((ServerSession)session).prepareDtx(method.getXid()); + } + catch (RollbackOnlyDtxException e) + { + result.setStatus(DtxXaStatus.XA_RBROLLBACK); + } + catch (TimeoutDtxException e) + { + result.setStatus(DtxXaStatus.XA_RBTIMEOUT); + } + session.executionResult((int) method.getId(), result); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); + } + catch(IncorrectDtxStateException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); + } + catch(AMQStoreException e) + { + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + + @Override + public void dtxRecover(Session session, DtxRecover method) + { + RecoverResult result = new RecoverResult(); + List inDoubt = ((ServerSession)session).recoverDtx(); + result.setInDoubt(inDoubt); + session.executionResult(method.getId(), result); + } + + @Override + public void dtxRollback(Session session, DtxRollback method) + { + + XaResult result = new XaResult(); + result.setStatus(DtxXaStatus.XA_OK); + try + { + try + { + ((ServerSession)session).rollbackDtx(method.getXid()); + } + catch (TimeoutDtxException e) + { + result.setStatus(DtxXaStatus.XA_RBTIMEOUT); + } + session.executionResult(method.getId(), result); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); + } + catch(IncorrectDtxStateException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage()); + } + catch(AMQStoreException e) + { + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + + @Override + public void dtxSetTimeout(Session session, DtxSetTimeout method) + { + try + { + ((ServerSession)session).setTimeoutDtx(method.getXid(), method.getTimeout()); + } + catch(UnknownDtxBranchException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage()); + } + } @Override public void executionSync(final Session ssn, final ExecutionSync sync) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AlreadyKnownDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AlreadyKnownDtxException.java new file mode 100644 index 0000000000..faa4ec592f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AlreadyKnownDtxException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class AlreadyKnownDtxException extends DtxException +{ + public AlreadyKnownDtxException(Xid id) + { + super("Xid " + id + " cannot be started as it is already known"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java new file mode 100644 index 0000000000..36f5f7b58f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -0,0 +1,246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.Xid; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class DistributedTransaction implements ServerTransaction +{ + + private final AutoCommitTransaction _autoCommitTransaction; + + private volatile MessageStore.Transaction _transaction; + + private long _txnStartTime = 0L; + + private DtxBranch _branch; + private AMQSessionModel _session; + private VirtualHost _vhost; + + + public DistributedTransaction(AMQSessionModel session, MessageStore store, VirtualHost vhost) + { + _session = session; + _vhost = vhost; + _autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore()); + } + + public long getTransactionStartTime() + { + return _txnStartTime; + } + + public void addPostTransactionAction(Action postTransactionAction) + { + if(_branch != null) + { + _branch.addPostTransactionAcion(postTransactionAction); + } + else + { + _autoCommitTransaction.addPostTransactionAction(postTransactionAction); + } + } + + public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) + { + if(_branch != null) + { + _branch.dequeue(queue, message); + _branch.addPostTransactionAcion(postTransactionAction); + } + else + { + _autoCommitTransaction.dequeue(queue, message, postTransactionAction); + } + } + + public void dequeue(Collection<QueueEntry> messages, Action postTransactionAction) + { + if(_branch != null) + { + for(QueueEntry entry : messages) + { + _branch.dequeue(entry.getQueue(), entry.getMessage()); + } + _branch.addPostTransactionAcion(postTransactionAction); + } + else + { + _autoCommitTransaction.dequeue(messages, postTransactionAction); + } + } + + public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) + { + if(_branch != null) + { + _branch.enqueue(queue, message); + _branch.addPostTransactionAcion(postTransactionAction); + enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis()); + } + else + { + _autoCommitTransaction.enqueue(queue, message, postTransactionAction); + } + } + + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, + Action postTransactionAction, long currentTime) + { + if(_branch != null) + { + for(BaseQueue queue : queues) + { + _branch.enqueue(queue, message); + } + _branch.addPostTransactionAcion(postTransactionAction); + } + else + { + _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime); + } + } + + public void commit() + { + throw new IllegalStateException("Cannot call tx.commit() on a distributed transaction"); + } + + public void commit(Runnable immediatePostTransactionAction) + { + throw new IllegalStateException("Cannot call tx.commit() on a distributed transaction"); + } + + public void rollback() + { + throw new IllegalStateException("Cannot call tx.rollback() on a distributed transaction"); + } + + public boolean isTransactional() + { + return _branch != null; + } + + public void start(Xid id, boolean join, boolean resume) + throws UnknownDtxBranchException, AlreadyKnownDtxException, JoinAndResumeDtxException + { + if(join && resume) + { + throw new JoinAndResumeDtxException(id); + } + + DtxBranch branch = _vhost.getDtxRegistry().getBranch(id); + + if(branch == null) + { + if(join || resume) + { + throw new UnknownDtxBranchException(id); + } + branch = new DtxBranch(id,_vhost.getMessageStore(), _vhost); + if(_vhost.getDtxRegistry().registerBranch(branch)) + { + _branch = branch; + branch.associateSession(_session); + } + else + { + throw new AlreadyKnownDtxException(id); + } + } + else + { + if(join) + { + branch.associateSession(_session); + } + else if(resume) + { + branch.resumeSession(_session); + } + else + { + throw new AlreadyKnownDtxException(id); + } + _branch = branch; + } + } + + public void end(Xid id, boolean fail, boolean suspend) + throws UnknownDtxBranchException, NotAssociatedDtxException, SuspendAndFailDtxException, TimeoutDtxException + { + DtxBranch branch = _vhost.getDtxRegistry().getBranch(id); + + if(suspend && fail) + { + branch.disassociateSession(_session); + _branch = null; + throw new SuspendAndFailDtxException(id); + } + + + if(branch == null) + { + throw new UnknownDtxBranchException(id); + } + else + { + if(!branch.isAssociated(_session)) + { + throw new NotAssociatedDtxException(id); + } + if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT) + { + branch.disassociateSession(_session); + throw new TimeoutDtxException(id); + } + + if(suspend) + { + branch.suspendSession(_session); + } + else + { + if(fail) + { + branch.setState(DtxBranch.State.ROLLBACK_ONLY); + } + branch.disassociateSession(_session); + } + + _branch = null; + + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java new file mode 100644 index 0000000000..99bb639261 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -0,0 +1,348 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.Xid; + +public class DtxBranch +{ + private static final Logger _logger = Logger.getLogger(DtxBranch.class); + + private final Xid _xid; + private final List<ServerTransaction.Action> _postTransactionActions = new ArrayList<ServerTransaction.Action>(); + private State _state = State.ACTIVE; + private long _timeout; + private Map<AMQSessionModel, State> _associatedSessions = new HashMap<AMQSessionModel, State>(); + private final List<Record> _enqueueRecords = new ArrayList<Record>(); + private final List<Record> _dequeueRecords = new ArrayList<Record>(); + + private MessageStore.Transaction _transaction; + private long _expiration; + private VirtualHost _vhost; + private ScheduledFuture<?> _timeoutFuture; + private MessageStore _store; + + + public enum State + { + ACTIVE, + PREPARED, + TIMEDOUT, + SUSPENDED, + FORGOTTEN, + HEUR_COM, + HEUR_RB, + ROLLBACK_ONLY + } + + + public DtxBranch(Xid xid, MessageStore store, VirtualHost vhost) + { + _xid = xid; + _store = store; + _vhost = vhost; + } + + public Xid getXid() + { + return _xid; + } + + public State getState() + { + return _state; + } + + public void setState(State state) + { + _state = state; + } + + public long getTimeout() + { + return _timeout; + } + + public void setTimeout(long timeout) + { + if(_timeoutFuture != null) + { + _timeoutFuture.cancel(false); + } + _timeout = timeout; + _expiration = timeout == 0 ? 0 : System.currentTimeMillis() + timeout; + + if(_timeout == 0) + { + _timeoutFuture = null; + } + else + { + _timeoutFuture = _vhost.scheduleTask(_timeout, new Runnable() + { + public void run() + { + setState(State.TIMEDOUT); + try + { + rollback(); + } + catch (AMQStoreException e) + { + _logger.error("Unexpected error when attempting to rollback XA transaction ("+ + _xid + ") due to timeout", e); + throw new RuntimeException(e); + } + } + }); + } + } + + public boolean expired() + { + return _timeout != 0 && _expiration < System.currentTimeMillis(); + } + + public synchronized boolean isAssociated(AMQSessionModel session) + { + return _associatedSessions.containsKey(session); + } + + public synchronized boolean hasAssociatedSessions() + { + return !_associatedSessions.isEmpty(); + } + + + public synchronized boolean hasAssociatedActiveSessions() + { + if(hasAssociatedSessions()) + { + for(State state : _associatedSessions.values()) + { + if(state != State.SUSPENDED) + { + return true; + } + } + } + return false; + } + + public synchronized void clearAssociations() + { + _associatedSessions.clear(); + } + + synchronized boolean associateSession(AMQSessionModel associatedSession) + { + return _associatedSessions.put(associatedSession, State.ACTIVE) != null; + } + + synchronized boolean disassociateSession(AMQSessionModel associatedSession) + { + return _associatedSessions.remove(associatedSession) != null; + } + + public synchronized boolean resumeSession(AMQSessionModel session) + { + if(_associatedSessions.containsKey(session) && _associatedSessions.get(session) == State.SUSPENDED) + { + _associatedSessions.put(session, State.ACTIVE); + return true; + } + return false; + } + + public synchronized boolean suspendSession(AMQSessionModel session) + { + if(_associatedSessions.containsKey(session) && _associatedSessions.get(session) == State.ACTIVE) + { + _associatedSessions.put(session, State.SUSPENDED); + return true; + } + return false; + } + + public void prepare() throws AMQStoreException + { + + MessageStore.Transaction txn = _store.newTransaction(); + txn.recordXid(_xid.getFormat(), + _xid.getGlobalId(), + _xid.getBranchId(), + _enqueueRecords.toArray(new Record[_enqueueRecords.size()]), + _dequeueRecords.toArray(new Record[_dequeueRecords.size()])); + txn.commitTran(); + + prePrepareTransaction(); + } + + public synchronized void rollback() throws AMQStoreException + { + if(_timeoutFuture != null) + { + _timeoutFuture.cancel(false); + _timeoutFuture = null; + } + + + if(_transaction != null) + { + // prepare has previously been called + + MessageStore.Transaction txn = _store.newTransaction(); + txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId()); + txn.commitTran(); + + _transaction.abortTran(); + } + + for(ServerTransaction.Action action : _postTransactionActions) + { + action.onRollback(); + } + _postTransactionActions.clear(); + } + + public void commit() throws AMQStoreException + { + if(_timeoutFuture != null) + { + _timeoutFuture.cancel(false); + _timeoutFuture = null; + } + + if(_transaction == null) + { + prePrepareTransaction(); + } + else + { + _transaction.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId()); + } + _transaction.commitTran(); + + for(ServerTransaction.Action action : _postTransactionActions) + { + action.postCommit(); + } + _postTransactionActions.clear(); + } + + public void prePrepareTransaction() throws AMQStoreException + { + _transaction = _store.newTransaction(); + + for(Record enqueue : _enqueueRecords) + { + if(enqueue.isDurable()) + { + _transaction.enqueueMessage(enqueue.getQueue(), enqueue.getMessage()); + } + } + + + for(Record enqueue : _dequeueRecords) + { + if(enqueue.isDurable()) + { + _transaction.dequeueMessage(enqueue.getQueue(), enqueue.getMessage()); + } + } + } + + + public void addPostTransactionAcion(ServerTransaction.Action postTransactionAction) + { + _postTransactionActions.add(postTransactionAction); + } + + + public void dequeue(BaseQueue queue, EnqueableMessage message) + { + _dequeueRecords.add(new Record(queue, message)); + } + + + public void enqueue(BaseQueue queue, EnqueableMessage message) + { + _enqueueRecords.add(new Record(queue, message)); + } + + private static final class Record implements MessageStore.Transaction.Record + { + private final BaseQueue _queue; + private final EnqueableMessage _message; + + public Record(BaseQueue queue, EnqueableMessage message) + { + _queue = queue; + _message = message; + } + + public BaseQueue getQueue() + { + return _queue; + } + + public EnqueableMessage getMessage() + { + return _message; + } + + public boolean isDurable() + { + return _message.isPersistent() && _queue.isDurable(); + } + } + + + public void close() + { + if(_transaction != null) + { + try + { + _state = null; + _transaction.abortTran(); + } + catch(AMQStoreException e) + { + _logger.error("Error while closing XA branch", e); + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxException.java new file mode 100644 index 0000000000..d18d0cb68b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxException.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +public class DtxException extends Exception +{ + public DtxException() + { + } + + public DtxException(String message) + { + super(message); + } + + public DtxException(String message, Throwable cause) + { + super(message, cause); + } + + public DtxException(Throwable cause) + { + super(cause); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxNotSelectedException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxNotSelectedException.java new file mode 100644 index 0000000000..c1289b1fdd --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxNotSelectedException.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +public class DtxNotSelectedException extends DtxException +{ + public DtxNotSelectedException() + { + super("Distribution transactions have not been selected on this session"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java new file mode 100644 index 0000000000..5c54c1164f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java @@ -0,0 +1,333 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.transport.Xid; + +public class DtxRegistry +{ + private final Map<ComparableXid, DtxBranch> _branches = new HashMap<ComparableXid, DtxBranch>(); + + + private static final class ComparableXid + { + private final Xid _xid; + + private ComparableXid(Xid xid) + { + _xid = xid; + } + + @Override + public boolean equals(Object o) + { + if(this == o) + { + return true; + } + if(o == null || getClass() != o.getClass()) + { + return false; + } + + ComparableXid that = (ComparableXid) o; + + return compareBytes(_xid.getBranchId(), that._xid.getBranchId()) + && compareBytes(_xid.getGlobalId(), that._xid.getGlobalId()); + } + + private static boolean compareBytes(byte[] a, byte[] b) + { + if(a.length != b.length) + { + return false; + } + for(int i = 0; i < a.length; i++) + { + if(a[i] != b[i]) + { + return false; + } + } + return true; + } + + + @Override + public int hashCode() + { + int result = 0; + for(int i = 0; i < _xid.getGlobalId().length; i++) + { + result = 31 * result + (int) _xid.getGlobalId()[i]; + } + for(int i = 0; i < _xid.getBranchId().length; i++) + { + result = 31 * result + (int) _xid.getBranchId()[i]; + } + + return result; + } + } + + public synchronized DtxBranch getBranch(Xid xid) + { + return _branches.get(new ComparableXid(xid)); + } + + public synchronized boolean registerBranch(DtxBranch branch) + { + ComparableXid xid = new ComparableXid(branch.getXid()); + if(!_branches.containsKey(xid)) + { + _branches.put(xid, branch); + return true; + } + return false; + } + + synchronized boolean unregisterBranch(DtxBranch branch) + { + return (_branches.remove(new ComparableXid(branch.getXid())) != null); + } + + public void commit(Xid id, boolean onePhase) + throws IncorrectDtxStateException, UnknownDtxBranchException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + DtxBranch branch = getBranch(id); + if(branch != null) + { + synchronized (branch) + { + if(!branch.hasAssociatedActiveSessions()) + { + branch.clearAssociations(); + + if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT) + { + unregisterBranch(branch); + throw new TimeoutDtxException(id); + } + else if(branch.getState() == DtxBranch.State.ROLLBACK_ONLY) + { + throw new RollbackOnlyDtxException(id); + } + else if(onePhase && branch.getState() == DtxBranch.State.PREPARED) + { + throw new IncorrectDtxStateException("Cannot call one-phase commit on a prepared branch", id); + } + else if(!onePhase && branch.getState() != DtxBranch.State.PREPARED) + { + throw new IncorrectDtxStateException("Cannot call two-phase commit on a non-prepared branch", + id); + } + branch.commit(); + branch.setState(DtxBranch.State.FORGOTTEN); + unregisterBranch(branch); + } + else + { + throw new IncorrectDtxStateException("Branch was still associated with a session", id); + } + } + } + else + { + throw new UnknownDtxBranchException(id); + } + } + + public synchronized void prepare(Xid id) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + DtxBranch branch = getBranch(id); + if(branch != null) + { + synchronized (branch) + { + if(!branch.hasAssociatedActiveSessions()) + { + branch.clearAssociations(); + + if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT) + { + unregisterBranch(branch); + throw new TimeoutDtxException(id); + } + else if(branch.getState() != DtxBranch.State.ACTIVE + && branch.getState() != DtxBranch.State.ROLLBACK_ONLY) + { + throw new IncorrectDtxStateException("Cannot prepare a transaction in state " + + branch.getState(), id); + } + else + { + branch.prepare(); + branch.setState(DtxBranch.State.PREPARED); + } + } + else + { + throw new IncorrectDtxStateException("Branch still has associated sessions", id); + } + } + } + else + { + throw new UnknownDtxBranchException(id); + } + } + + public void rollback(Xid id) + throws IncorrectDtxStateException, + UnknownDtxBranchException, + AMQStoreException, TimeoutDtxException + { + + DtxBranch branch = getBranch(id); + if(branch != null) + { + synchronized (branch) + { + if(branch.expired() || branch.getState() == DtxBranch.State.TIMEDOUT) + { + unregisterBranch(branch); + throw new TimeoutDtxException(id); + } + if(!branch.hasAssociatedActiveSessions()) + { + branch.clearAssociations(); + branch.rollback(); + branch.setState(DtxBranch.State.FORGOTTEN); + unregisterBranch(branch); + } + else + { + throw new IncorrectDtxStateException("Branch was still associated with a session", id); + } + } + } + else + { + throw new UnknownDtxBranchException(id); + } + } + + + public void forget(Xid id) throws UnknownDtxBranchException, IncorrectDtxStateException + { + DtxBranch branch = getBranch(id); + if(branch != null) + { + synchronized (branch) + { + if(!branch.hasAssociatedSessions()) + { + if(branch.getState() != DtxBranch.State.HEUR_COM && branch.getState() != DtxBranch.State.HEUR_RB) + { + throw new IncorrectDtxStateException("Branch should not be forgotten - " + + "it is not heuristically complete", id); + } + branch.setState(DtxBranch.State.FORGOTTEN); + unregisterBranch(branch); + } + else + { + throw new IncorrectDtxStateException("Branch was still associated with a session", id); + } + } + } + else + { + throw new UnknownDtxBranchException(id); + } + } + + public long getTimeout(Xid id) throws UnknownDtxBranchException + { + DtxBranch branch = getBranch(id); + if(branch != null) + { + return branch.getTimeout(); + } + else + { + throw new UnknownDtxBranchException(id); + } + } + + public void setTimeout(Xid id, long timeout) throws UnknownDtxBranchException + { + DtxBranch branch = getBranch(id); + if(branch != null) + { + branch.setTimeout(timeout); + } + else + { + throw new UnknownDtxBranchException(id); + } + } + + public synchronized List<Xid> recover() + { + List<Xid> inDoubt = new ArrayList<Xid>(); + for(DtxBranch branch : _branches.values()) + { + if(branch.getState() == DtxBranch.State.PREPARED) + { + inDoubt.add(branch.getXid()); + } + } + return inDoubt; + } + + public synchronized void endAssociations(AMQSessionModel session) + { + for(DtxBranch branch : _branches.values()) + { + if(branch.isAssociated(session)) + { + branch.setState(DtxBranch.State.ROLLBACK_ONLY); + branch.disassociateSession(session); + } + } + } + + + public synchronized void close() + { + for(DtxBranch branch : _branches.values()) + { + branch.close(); + } + _branches.clear(); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/IncorrectDtxStateException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/IncorrectDtxStateException.java new file mode 100644 index 0000000000..45f094e4b9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/IncorrectDtxStateException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class IncorrectDtxStateException extends DtxException +{ + public IncorrectDtxStateException(String message, Xid id) + { + super(message + " (xid: " + id + ")"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JoinAndResumeDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JoinAndResumeDtxException.java new file mode 100644 index 0000000000..a25e5a4ed6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JoinAndResumeDtxException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class JoinAndResumeDtxException extends DtxException +{ + public JoinAndResumeDtxException(Xid id) + { + super("Cannot start a branch with both join and resume set " + id); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NotAssociatedDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NotAssociatedDtxException.java new file mode 100644 index 0000000000..de070546a7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NotAssociatedDtxException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class NotAssociatedDtxException extends DtxException +{ + public NotAssociatedDtxException(Xid id) + { + super("Xid " + id + " not associated with the current session"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/RollbackOnlyDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/RollbackOnlyDtxException.java new file mode 100644 index 0000000000..6cf12d8631 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/RollbackOnlyDtxException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class RollbackOnlyDtxException extends DtxException +{ + public RollbackOnlyDtxException(Xid id) + { + super("Transaction " + id + " may only be rolled back"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/SuspendAndFailDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/SuspendAndFailDtxException.java new file mode 100644 index 0000000000..228844fd63 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/SuspendAndFailDtxException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class SuspendAndFailDtxException extends DtxException +{ +public SuspendAndFailDtxException(Xid id) +{ + super("Cannot end a branch with both suspend and fail set " + id); +} +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TimeoutDtxException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TimeoutDtxException.java new file mode 100644 index 0000000000..50f7708d8a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TimeoutDtxException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class TimeoutDtxException extends DtxException +{ + public TimeoutDtxException(Xid id) + { + super("Transaction " + id + " has timed-out and may only be rolled back"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/UnknownDtxBranchException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/UnknownDtxBranchException.java new file mode 100644 index 0000000000..c23e518365 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/UnknownDtxBranchException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.txn; + +import org.apache.qpid.transport.Xid; + +public class UnknownDtxBranchException extends DtxException +{ + public UnknownDtxBranchException(Xid id) + { + super("Unknown xid " + id); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index afded3416e..cc573c0100 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.ConfigStore; @@ -37,9 +40,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; - -import java.util.Map; -import java.util.UUID; +import org.apache.qpid.server.txn.DtxRegistry; public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer { @@ -96,5 +97,9 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo ConfigStore getConfigStore(); + DtxRegistry getDtxRegistry(); + void removeBrokerConnection(BrokerLink brokerLink); + + ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 266d23af97..2202efb5e3 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -20,9 +20,20 @@ */ package org.apache.qpid.server.virtualhost; -import org.apache.log4j.Logger; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.BindingFactory; @@ -32,29 +43,27 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Xid; +import org.apache.qpid.transport.util.Functions; import org.apache.qpid.util.ByteBufferInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; - public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, ConfigurationRecoveryHandler.QueueRecoveryHandler, ConfigurationRecoveryHandler.ExchangeRecoveryHandler, @@ -63,7 +72,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, - TransactionLogRecoveryHandler.QueueEntryRecoveryHandler + TransactionLogRecoveryHandler.QueueEntryRecoveryHandler, + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler { private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); @@ -76,7 +86,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private MessageStore _store; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); + private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); @@ -158,7 +168,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - ServerMessage serverMessage; + AbstractServerMessageImpl serverMessage; switch(message.getMetaData().getType()) { case META_DATA_0_8: @@ -193,6 +203,164 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { } + public void dtxRecord(long format, byte[] globalId, byte[] branchId, + MessageStore.Transaction.Record[] enqueues, + MessageStore.Transaction.Record[] dequeues) + { + Xid id = new Xid(format, globalId, branchId); + DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); + DtxBranch branch = dtxRegistry.getBranch(id); + if(branch == null) + { + branch = new DtxBranch(id, _store, _virtualHost); + dtxRegistry.registerBranch(branch); + } + for(MessageStore.Transaction.Record record : enqueues) + { + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + message.incrementReference(); + + branch.enqueue(queue,message); + + branch.addPostTransactionAcion(new ServerTransaction.Action() + { + + public void postCommit() + { + try + { + + queue.enqueue(message, true, null); + message.decrementReference(); + } + catch (AMQException e) + { + _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into " + + "queue " + queue.getName() + " (from XA transaction)", e); + throw new RuntimeException(e); + } + } + + public void onRollback() + { + message.decrementReference(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + String messageNumberString = String.valueOf(message.getMessageNumber()); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + messageNumberString)); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + queue.getName())); + + } + } + for(MessageStore.Transaction.Record record : dequeues) + { + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final QueueEntry entry = queue.getMessageOnTheQueue(messageId); + + entry.acquire(); + + branch.dequeue(queue, message); + + branch.addPostTransactionAcion(new ServerTransaction.Action() + { + + public void postCommit() + { + entry.discard(); + } + + public void onRollback() + { + entry.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + String messageNumberString = String.valueOf(message.getMessageNumber()); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + messageNumberString)); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + queue.getName())); + } + + } + + try + { + branch.setState(DtxBranch.State.PREPARED); + branch.prePrepareTransaction(); + } + catch (AMQStoreException e) + { + _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " + + xidAsString(id), e); + throw new RuntimeException(e); + } + } + + private static StringBuilder xidAsString(Xid id) + { + return new StringBuilder("(") + .append(id.getFormat()) + .append(',') + .append(Functions.str(id.getGlobalId())) + .append(',') + .append(Functions.str(id.getBranchId())) + .append(')'); + } + + public void completeDtxRecordRecovery() + { + for(StoredMessage m : _unusedMessages.values()) + { + _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); + m.remove(); + } + CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + } + private static final class ProcessAction { private final AMQQueue _queue; @@ -349,15 +517,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public void completeQueueEntryRecovery() + public DtxRecordRecoveryHandler completeQueueEntryRecovery() { - for(StoredMessage m : _unusedMessages.values()) - { - _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); - m.remove(); - } - for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet()) { CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); @@ -365,7 +527,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); } - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + + + return this; } private static class DummyMessage implements EnqueableMessage diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 51b60f7980..061f06f1d0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.concurrent.ScheduledFuture; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; @@ -66,6 +67,7 @@ import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; @@ -95,6 +97,8 @@ public class VirtualHostImpl implements VirtualHost private MessageStore _messageStore; + private DtxRegistry _dtxRegistry; + private VirtualHostMBean _virtualHostMBean; private AMQBrokerManagerMBean _brokerMBean; @@ -118,6 +122,7 @@ public class VirtualHostImpl implements VirtualHost private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; + public IConnectionRegistry getConnectionRegistry() { return _connectionRegistry; @@ -188,6 +193,7 @@ public class VirtualHostImpl implements VirtualHost _broker = _appRegistry.getBroker(); _configuration = hostConfig; _name = _configuration.getName(); + _dtxRegistry = new DtxRegistry(); _id = _appRegistry.getConfigStore().createId(); @@ -351,6 +357,11 @@ public class VirtualHostImpl implements VirtualHost TimeUnit.MILLISECONDS); } + public ScheduledFuture<?> scheduleTask(long delay, Runnable task) + { + return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS); + } + public long getHouseKeepingTaskCount() { return _houseKeepingTasks.getTaskCount(); @@ -617,6 +628,11 @@ public class VirtualHostImpl implements VirtualHost } } + if(_dtxRegistry != null) + { + _dtxRegistry.close(); + } + //Close MessageStore if (_messageStore != null) { @@ -784,6 +800,11 @@ public class VirtualHostImpl implements VirtualHost return getApplicationRegistry().getConfigStore(); } + public DtxRegistry getDtxRegistry() + { + return _dtxRegistry; + } + /** * Temporary Startup RT class to record the creation of persistent queues / exchanges. * diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 88b9acffe0..09d865cb05 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -146,6 +146,14 @@ public class SkeletonMessageStore implements MessageStore { } + + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } }; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index e409734a17..104e06d29a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -100,6 +100,14 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public void abortTran() throws AMQStoreException { } + + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index fa0bb5be8b..801549e561 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -112,6 +112,14 @@ class MockStoreTransaction implements Transaction _state = TransactionState.ABORTED; } + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } + public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction) { return new MessageStore() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 5fe664acdd..af742532e2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.concurrent.ScheduledFuture; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.BrokerConfig; import org.apache.qpid.server.configuration.ConfigStore; @@ -39,6 +40,7 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; import java.util.Map; import java.util.UUID; @@ -94,6 +96,11 @@ public class MockVirtualHost implements VirtualHost return null; } + public DtxRegistry getDtxRegistry() + { + return null; + } + public VirtualHostConfiguration getConfiguration() { return null; @@ -170,6 +177,11 @@ public class MockVirtualHost implements VirtualHost } + public ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask) + { + return null; + } + public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) { |
