diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-20 20:27:59 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-20 20:27:59 +0000 |
| commit | bcc3eec81fed2e1f0acd0019e055d7f47303696b (patch) | |
| tree | 9b6c6af1be3cfe3dc31f55e8c01a1db20d3cac01 /java | |
| parent | 1a08c4097f7e18251f21ed0ea2a7660b24dface6 (diff) | |
| download | qpid-python-bcc3eec81fed2e1f0acd0019e055d7f47303696b.tar.gz | |
QPID-3774 : allow out of order completion of persistent enqueues / dequeues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1234111 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
10 files changed, 627 insertions, 101 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 2e2d2f0b11..045fe3b1f2 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -94,6 +95,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _log = Logger.getLogger(BDBMessageStore.class); + private static final int LOCK_RETRY_ATTEMPTS = 5; + static final int DATABASE_FORMAT_VERSION = 5; private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version"; public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; @@ -893,91 +896,161 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); - + boolean complete = false; com.sleepycat.je.Transaction tx = null; Cursor cursor = null; + Random rand = null; + int attempts = 0; try { - tx = _environment.beginTransaction(null, null); + do + { + tx = null; + cursor = null; + try + { + tx = _environment.beginTransaction(null, null); - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - LongBinding.longToEntry(messageId, key); + //remove the message meta data from the store + DatabaseEntry key = new DatabaseEntry(); + LongBinding.longToEntry(messageId, key); - if (_log.isDebugEnabled()) - { - _log.debug("Removing message id " + messageId); - } + if (_log.isDebugEnabled()) + { + _log.debug("Removing message id " + messageId); + } - OperationStatus status = _messageMetaDataDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " + - messageId); - } + OperationStatus status = _messageMetaDataDb.delete(tx, key); + if (status == OperationStatus.NOTFOUND) + { + _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " + + messageId); + } - if (_log.isDebugEnabled()) - { - _log.debug("Deleted metadata for message " + messageId); - } + if (_log.isDebugEnabled()) + { + _log.debug("Deleted metadata for message " + messageId); + } - //now remove the content data from the store if there is any. + //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); - TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); - contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); + contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); - //Use a partial record for the value to prevent retrieving the - //data itself as we only need the key to identify what to remove. - DatabaseEntry value = new DatabaseEntry(); - value.setPartial(0, 0, true); + //Use a partial record for the value to prevent retrieving the + //data itself as we only need the key to identify what to remove. + DatabaseEntry value = new DatabaseEntry(); + value.setPartial(0, 0, true); - cursor = _messageContentDb.openCursor(tx, null); + cursor = _messageContentDb.openCursor(tx, null); - status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); - while (status == OperationStatus.SUCCESS) - { - mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); + status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); + while (status == OperationStatus.SUCCESS) + { + mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); - if(mck.getMessageId() != messageId) - { - //we have exhausted all chunks for this message id, break - break; + if(mck.getMessageId() != messageId) + { + //we have exhausted all chunks for this message id, break + break; + } + else + { + status = cursor.delete(); + + if(status == OperationStatus.NOTFOUND) + { + cursor.close(); + cursor = null; + + tx.abort(); + throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); + } + + if (_log.isDebugEnabled()) + { + _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); + } + } + + status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); + } + + cursor.close(); + + cursor = null; + + commit(tx, sync); + complete = true; } - else + catch (LockConflictException e) { - status = cursor.delete(); + try + { + if(cursor != null) + { + cursor.close(); + } + } + catch(DatabaseException e1) + { + _log.warn("Unable to close cursor after LockConflictException", e1); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw e; + } + try + { + if(tx != null) + { + tx.abort(); + } + } + catch(DatabaseException e2) + { + _log.warn("Unable to abort transaction after LockConflictExcption", e2); + // rethrow the original log conflict exception, the secondary exception should already have + // been logged. + throw e; + } - if(status == OperationStatus.NOTFOUND) + + _log.warn("Lock timeout exception. Retrying (attempt " + + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e); + + if(++attempts < LOCK_RETRY_ATTEMPTS) { - cursor.close(); - cursor = null; + if(rand == null) + { + rand = new Random(); + } - tx.abort(); - throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); - } + try + { + Thread.sleep(500l + (long)(500l * rand.nextDouble())); + } + catch (InterruptedException e1) + { - if (_log.isDebugEnabled()) + } + } + else { - _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); + // rethrow the lock conflict exception since we could not solve by retrying + throw e; } } - - status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); } - - cursor.close(); - cursor = null; - - commit(tx, sync); + while(!complete); } catch (DatabaseException e) { - e.printStackTrace(); + _log.error("Unexpected BDB exception", e); if (tx != null) { @@ -1009,7 +1082,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } catch (DatabaseException e) { - throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e); + throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e); } } } @@ -2073,7 +2146,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { // RHM-7 Periodically wake up and check, just in case we // missed a notification. Don't want to lock the broker hard. - _lock.wait(250); + _lock.wait(1000); } catch (InterruptedException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7d06dd2c22..092fde0bc2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -27,6 +27,7 @@ import java.security.Principal; import java.text.MessageFormat; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.SortedMap; @@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; -import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -84,18 +85,20 @@ import org.apache.qpid.transport.SessionDelegate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject +public class ServerSession extends Session + implements AuthorizationHolder, SessionConfig, + AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; + private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; private final UUID _id; private ConnectionConfig _connectionConfig; private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction(); private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); @@ -147,7 +150,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { super(connection, delegate, name, expiry); _connectionConfig = connConfig; - _transaction = new AutoCommitTransaction(this.getMessageStore()); + _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); @@ -184,16 +187,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - PostEnqueueAction postTransactionAction; - if(isTransactional()) - { - postTransactionAction = new PostEnqueueAction(queues, message) ; - } - else - { - postTransactionAction = _postEnqueueAction; - postTransactionAction.setState(queues, message); - } + PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; _transaction.enqueue(queues,message, postTransactionAction, 0L); incrementOutstandingTxnsIfNecessary(); updateTransactionalActivity(); @@ -221,12 +215,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void accept(RangeSet ranges) { dispositionChange(ranges, new MessageDispositionAction() - { - public void performAction(MessageDispositionChangeListener listener) - { - listener.onAccept(); - } - }); + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onAccept(); + } + }); } @@ -444,10 +438,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public boolean isTransactional() { - // this does not look great but there should only be one "non-transactional" - // transactional context, while there could be several transactional ones in - // theory - return !(_transaction instanceof AutoCommitTransaction); + return _transaction.isTransactional(); } public boolean inTransaction() @@ -765,6 +756,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { subscription_0_10.flushCreditState(false); } + awaitCommandCompletion(); } private class PostEnqueueAction implements ServerTransaction.Action @@ -774,17 +766,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private ServerMessage _message; private final boolean _transactional; - public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message) + public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional) { - _transactional = true; + _transactional = transactional; setState(queues, message); } - public PostEnqueueAction() - { - _transactional = false; - } - public void setState(List<? extends BaseQueue> queues, ServerMessage message) { _message = message; @@ -830,4 +817,76 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { return _blocking.get(); } + + private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); + + public void completeAsyncCommands() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) + { + cmd.complete(); + _unfinishedCommandsQueue.poll(); + } + while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) + { + cmd = _unfinishedCommandsQueue.poll(); + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public void awaitCommandCompletion() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.poll()) != null) + { + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public Object getAsyncCommandMark() + { + return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); + } + + public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); + } + + private static class AsyncCommand + { + private final MessageStore.StoreFuture _future; + private ServerTransaction.Action _action; + + public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + { + _future = future; + _action = action; + } + + void awaitReadyForCompletion() + { + _future.waitForCompletion(); + } + + void complete() + { + if(!_future.isComplete()) + { + _future.waitForCompletion(); + } + _action.postCommit(); + _action = null; + } + + boolean isReadyForCompletion() + { + return _future.isComplete(); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index b6e142a5fd..2eab65cf8a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.transport; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -55,6 +54,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -81,9 +81,22 @@ public class ServerSessionDelegate extends SessionDelegate if(!session.isClosing()) { - super.command(session, method); + Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark(); + super.command(session, method, false); + Object newOutstanding = ((ServerSession)session).getAsyncCommandMark(); + if(newOutstanding == null || newOutstanding == asyncCommandMark) + { + session.processed(method); + } + + if(newOutstanding != null) + { + ((ServerSession)session).completeAsyncCommands(); + } + if (method.isSync()) { + ((ServerSession)session).awaitCommandCompletion(); session.flushProcessed(); } } @@ -98,7 +111,13 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageAccept(Session session, MessageAccept method) { - ((ServerSession)session).accept(method.getTransfers()); + final ServerSession serverSession = (ServerSession) session; + serverSession.accept(method.getTransfers()); + if(!serverSession.isTransactional()) + { + serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, + new CommandProcessedAction(serverSession, method)); + } } @Override @@ -252,7 +271,7 @@ public class ServerSessionDelegate extends SessionDelegate } @Override - public void messageTransfer(Session ssn, MessageTransfer xfr) + public void messageTransfer(Session ssn, final MessageTransfer xfr) { final Exchange exchange = getExchangeForMessage(ssn, xfr); @@ -294,12 +313,13 @@ public class ServerSessionDelegate extends SessionDelegate exchangeInUse = exchange; } + final ServerSession serverSession = (ServerSession) ssn; if(!queues.isEmpty()) { final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); - ((ServerSession) ssn).enqueue(message, queues); + MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); + serverSession.enqueue(message, queues); storeMessage.flushToStore(); } else @@ -313,13 +333,19 @@ public class ServerSessionDelegate extends SessionDelegate } else { - ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); + serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); } } - - ssn.processed(xfr); + if(serverSession.isTransactional()) + { + serverSession.processed(xfr); + } + else + { + serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); + } } private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, @@ -404,6 +430,13 @@ public class ServerSessionDelegate extends SessionDelegate @Override + public void executionSync(final Session ssn, final ExecutionSync sync) + { + ((ServerSession)ssn).awaitCommandCompletion(); + super.executionSync(ssn, sync); + } + + @Override public void exchangeDeclare(Session session, ExchangeDeclare method) { String exchangeName = method.getExchange(); @@ -1269,4 +1302,25 @@ public class ServerSessionDelegate extends SessionDelegate final ServerConnection scon = (ServerConnection) session.getConnection(); SecurityManager.setThreadSubject(scon.getAuthorizedSubject()); } + + private static class CommandProcessedAction implements ServerTransaction.Action + { + private final ServerSession _serverSession; + private final Method _method; + + public CommandProcessedAction(final ServerSession serverSession, final Method xfr) + { + _serverSession = serverSession; + _method = xfr; + } + + public void postCommit() + { + _serverSession.processed(_method); + } + + public void onRollback() + { + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java new file mode 100755 index 0000000000..11439b3c8a --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -0,0 +1,316 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStore.StoreFuture; + +import java.util.Collection; +import java.util.List; + +/** + * An implementation of ServerTransaction where each enqueue/dequeue + * operation takes place within it own transaction. + * + * Since there is no long-lived transaction, the commit and rollback methods of + * this implementation are empty. + */ +public class AsyncAutoCommitTransaction implements ServerTransaction +{ + protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class); + + private final MessageStore _messageStore; + private final FutureRecorder _futureRecorder; + + public static interface FutureRecorder + { + public void recordFuture(StoreFuture future, Action action); + + } + + public AsyncAutoCommitTransaction(MessageStore transactionLog, FutureRecorder recorder) + { + _messageStore = transactionLog; + _futureRecorder = recorder; + } + + public long getTransactionStartTime() + { + return 0L; + } + + /** + * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered + * by the caller are executed immediately. + */ + public void addPostTransactionAction(final Action immediateAction) + { + addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction); + + } + + public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) + { + MessageStore.Transaction txn = null; + try + { + MessageStore.StoreFuture future; + if(message.isPersistent() && queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } + + txn = _messageStore.newTransaction(); + txn.dequeueMessage(queue, message); + future = txn.commitTranAsync(); + + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + } + catch (AMQException e) + { + _logger.error("Error during message dequeue", e); + throw new RuntimeException("Error during message dequeue", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + } + + private void addFuture(final MessageStore.StoreFuture future, final Action action) + { + _futureRecorder.recordFuture(future, action); + } + + public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) + { + MessageStore.Transaction txn = null; + try + { + for(QueueEntry entry : queueEntries) + { + ServerMessage message = entry.getMessage(); + BaseQueue queue = entry.getQueue(); + + if(message.isPersistent() && queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } + + if(txn == null) + { + txn = _messageStore.newTransaction(); + } + + txn.dequeueMessage(queue, message); + } + + } + MessageStore.StoreFuture future; + if(txn != null) + { + future = txn.commitTranAsync(); + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + } + catch (AMQException e) + { + _logger.error("Error during message dequeues", e); + throw new RuntimeException("Error during message dequeues", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + } + + + public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) + { + MessageStore.Transaction txn = null; + try + { + MessageStore.StoreFuture future; + if(message.isPersistent() && queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } + + txn = _messageStore.newTransaction(); + txn.enqueueMessage(queue, message); + future = txn.commitTranAsync(); + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + } + catch (AMQException e) + { + _logger.error("Error during message enqueue", e); + throw new RuntimeException("Error during message enqueue", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + + } + + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) + { + MessageStore.Transaction txn = null; + try + { + + if(message.isPersistent()) + { + for(BaseQueue queue : queues) + { + if (queue.isDurable()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } + if (txn == null) + { + txn = _messageStore.newTransaction(); + } + + txn.enqueueMessage(queue, message); + + + } + } + + } + MessageStore.StoreFuture future; + if (txn != null) + { + future = txn.commitTranAsync(); + txn = null; + } + else + { + future = MessageStore.IMMEDIATE_FUTURE; + } + addFuture(future, postTransactionAction); + postTransactionAction = null; + + + } + catch (AMQException e) + { + _logger.error("Error during message enqueues", e); + throw new RuntimeException("Error during message enqueues", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + + } + + + public void commit(final Runnable immediatePostTransactionAction) + { + addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() + { + public void postCommit() + { + immediatePostTransactionAction.run(); + } + + public void onRollback() + { + } + }); + } + + public void commit() + { + } + + public void rollback() + { + } + + public boolean isTransactional() + { + return false; + } + + private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) + { + if (txn != null) + { + try + { + txn.abortTran(); + } + catch (AMQStoreException e) + { + _logger.error("Abort transaction failed", e); + // Deliberate decision not to re-throw this exception. Rationale: we can only reach here if a previous + // TransactionLog method has ended in Exception. If we were to re-throw here, we would prevent + // our caller from receiving the original exception (which is likely to be more revealing of the underlying error). + } + } + if (postTransactionAction != null) + { + postTransactionAction.onRollback(); + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index a67d4badd1..ad2a299108 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -242,6 +242,11 @@ public class AutoCommitTransaction implements ServerTransaction { } + public boolean isTransactional() + { + return false; + } + private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) { if (txn != null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 7f5b5fb8b2..34bac0411e 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -309,7 +309,12 @@ public class LocalTransaction implements ServerTransaction private void resetDetails() { _transaction = null; - _postTransactionActions.clear(); + _postTransactionActions.clear(); _txnStartTime = 0L; } + + public boolean isTransactional() + { + return true; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index 64fdc0ba9a..77efb21ddb 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -43,6 +43,8 @@ import java.util.List; */ public interface ServerTransaction { + + /** * Represents an action to be performed on transaction commit or rollback */ @@ -110,4 +112,6 @@ public interface ServerTransaction * be executed immediately after the underlying transaction has rolled-back. */ void rollback(); + + boolean isTransactional(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 028b912ba1..cabff1cd13 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -45,10 +45,15 @@ public class SessionDelegate method.dispatch(ssn, this); } - public void command(Session ssn, Method method) { + public void command(Session ssn, Method method) + { + command(ssn, method, !method.hasPayload()); + } + public void command(Session ssn, Method method, boolean processed) + { ssn.identify(method); method.dispatch(ssn, this); - if (!method.hasPayload()) + if (processed) { ssn.processed(method); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java index 12039caf25..e6461c8267 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -70,6 +70,11 @@ public class AcknowledgeTest extends QpidBrokerTestCase // These should all end up being prefetched by session sendMessage(_consumerSession, _queue, 1); + if(!transacted) + { + ((AMQSession)_consumerSession).sync(); + } + assertEquals("Wrong number of messages on queue", 1, ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index d3d9cf2984..e948aaffb3 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -721,7 +721,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); - + ((AMQSession)session).sync(); // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); |
