summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-20 20:27:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-20 20:27:59 +0000
commitbcc3eec81fed2e1f0acd0019e055d7f47303696b (patch)
tree9b6c6af1be3cfe3dc31f55e8c01a1db20d3cac01 /java
parent1a08c4097f7e18251f21ed0ea2a7660b24dface6 (diff)
downloadqpid-python-bcc3eec81fed2e1f0acd0019e055d7f47303696b.tar.gz
QPID-3774 : allow out of order completion of persistent enqueues / dequeues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1234111 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java187
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java121
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java72
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java316
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java5
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java7
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java9
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java2
10 files changed, 627 insertions, 101 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 2e2d2f0b11..045fe3b1f2 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -30,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -94,6 +95,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+
static final int DATABASE_FORMAT_VERSION = 5;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -893,91 +896,161 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
-
+ boolean complete = false;
com.sleepycat.je.Transaction tx = null;
Cursor cursor = null;
+ Random rand = null;
+ int attempts = 0;
try
{
- tx = _environment.beginTransaction(null, null);
+ do
+ {
+ tx = null;
+ cursor = null;
+ try
+ {
+ tx = _environment.beginTransaction(null, null);
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
- if (_log.isDebugEnabled())
- {
- _log.debug("Removing message id " + messageId);
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Removing message id " + messageId);
+ }
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
- messageId);
- }
+ OperationStatus status = _messageMetaDataDb.delete(tx, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
+ }
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted metadata for message " + messageId);
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted metadata for message " + messageId);
+ }
- //now remove the content data from the store if there is any.
+ //now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 0, true);
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 0, true);
- cursor = _messageContentDb.openCursor(tx, null);
+ cursor = _messageContentDb.openCursor(tx, null);
- status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
- while (status == OperationStatus.SUCCESS)
- {
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
+ {
+ mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
- if(mck.getMessageId() != messageId)
- {
- //we have exhausted all chunks for this message id, break
- break;
+ if(mck.getMessageId() != messageId)
+ {
+ //we have exhausted all chunks for this message id, break
+ break;
+ }
+ else
+ {
+ status = cursor.delete();
+
+ if(status == OperationStatus.NOTFOUND)
+ {
+ cursor.close();
+ cursor = null;
+
+ tx.abort();
+ throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ }
+ }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
+ }
+
+ cursor.close();
+
+ cursor = null;
+
+ commit(tx, sync);
+ complete = true;
}
- else
+ catch (LockConflictException e)
{
- status = cursor.delete();
+ try
+ {
+ if(cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ catch(DatabaseException e1)
+ {
+ _log.warn("Unable to close cursor after LockConflictException", e1);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw e;
+ }
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ _log.warn("Unable to abort transaction after LockConflictExcption", e2);
+ // rethrow the original log conflict exception, the secondary exception should already have
+ // been logged.
+ throw e;
+ }
- if(status == OperationStatus.NOTFOUND)
+
+ _log.warn("Lock timeout exception. Retrying (attempt "
+ + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +") " + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
{
- cursor.close();
- cursor = null;
+ if(rand == null)
+ {
+ rand = new Random();
+ }
- tx.abort();
- throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
- }
+ try
+ {
+ Thread.sleep(500l + (long)(500l * rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
- if (_log.isDebugEnabled())
+ }
+ }
+ else
{
- _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw e;
}
}
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
-
- cursor.close();
- cursor = null;
-
- commit(tx, sync);
+ while(!complete);
}
catch (DatabaseException e)
{
- e.printStackTrace();
+ _log.error("Unexpected BDB exception", e);
if (tx != null)
{
@@ -1009,7 +1082,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
catch (DatabaseException e)
{
- throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e);
+ throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
}
}
}
@@ -2073,7 +2146,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
// RHM-7 Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
- _lock.wait(250);
+ _lock.wait(1000);
}
catch (InterruptedException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 7d06dd2c22..092fde0bc2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -27,6 +27,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
@@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -84,18 +85,20 @@ import org.apache.qpid.transport.SessionDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
+ AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
+ private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
@@ -147,7 +150,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
- _transaction = new AutoCommitTransaction(this.getMessageStore());
+ _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
@@ -184,16 +187,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction;
- if(isTransactional())
- {
- postTransactionAction = new PostEnqueueAction(queues, message) ;
- }
- else
- {
- postTransactionAction = _postEnqueueAction;
- postTransactionAction.setState(queues, message);
- }
+ PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
_transaction.enqueue(queues,message, postTransactionAction, 0L);
incrementOutstandingTxnsIfNecessary();
updateTransactionalActivity();
@@ -221,12 +215,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void accept(RangeSet ranges)
{
dispositionChange(ranges, new MessageDispositionAction()
- {
- public void performAction(MessageDispositionChangeListener listener)
- {
- listener.onAccept();
- }
- });
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onAccept();
+ }
+ });
}
@@ -444,10 +438,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public boolean isTransactional()
{
- // this does not look great but there should only be one "non-transactional"
- // transactional context, while there could be several transactional ones in
- // theory
- return !(_transaction instanceof AutoCommitTransaction);
+ return _transaction.isTransactional();
}
public boolean inTransaction()
@@ -765,6 +756,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
subscription_0_10.flushCreditState(false);
}
+ awaitCommandCompletion();
}
private class PostEnqueueAction implements ServerTransaction.Action
@@ -774,17 +766,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private ServerMessage _message;
private final boolean _transactional;
- public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+ public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
{
- _transactional = true;
+ _transactional = transactional;
setState(queues, message);
}
- public PostEnqueueAction()
- {
- _transactional = false;
- }
-
public void setState(List<? extends BaseQueue> queues, ServerMessage message)
{
_message = message;
@@ -830,4 +817,76 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
return _blocking.get();
}
+
+ private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
+
+ public void completeAsyncCommands()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
+ {
+ cmd.complete();
+ _unfinishedCommandsQueue.poll();
+ }
+ while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+ {
+ cmd = _unfinishedCommandsQueue.poll();
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public void awaitCommandCompletion()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.poll()) != null)
+ {
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public Object getAsyncCommandMark()
+ {
+ return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
+ }
+
+ public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+ }
+
+ private static class AsyncCommand
+ {
+ private final MessageStore.StoreFuture _future;
+ private ServerTransaction.Action _action;
+
+ public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _future = future;
+ _action = action;
+ }
+
+ void awaitReadyForCompletion()
+ {
+ _future.waitForCompletion();
+ }
+
+ void complete()
+ {
+ if(!_future.isComplete())
+ {
+ _future.waitForCompletion();
+ }
+ _action.postCommit();
+ _action = null;
+ }
+
+ boolean isReadyForCompletion()
+ {
+ return _future.isComplete();
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index b6e142a5fd..2eab65cf8a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.transport;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
@@ -81,9 +81,22 @@ public class ServerSessionDelegate extends SessionDelegate
if(!session.isClosing())
{
- super.command(session, method);
+ Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
+ super.command(session, method, false);
+ Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
+ if(newOutstanding == null || newOutstanding == asyncCommandMark)
+ {
+ session.processed(method);
+ }
+
+ if(newOutstanding != null)
+ {
+ ((ServerSession)session).completeAsyncCommands();
+ }
+
if (method.isSync())
{
+ ((ServerSession)session).awaitCommandCompletion();
session.flushProcessed();
}
}
@@ -98,7 +111,13 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageAccept(Session session, MessageAccept method)
{
- ((ServerSession)session).accept(method.getTransfers());
+ final ServerSession serverSession = (ServerSession) session;
+ serverSession.accept(method.getTransfers());
+ if(!serverSession.isTransactional())
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, method));
+ }
}
@Override
@@ -252,7 +271,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
@Override
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
final Exchange exchange = getExchangeForMessage(ssn, xfr);
@@ -294,12 +313,13 @@ public class ServerSessionDelegate extends SessionDelegate
exchangeInUse = exchange;
}
+ final ServerSession serverSession = (ServerSession) ssn;
if(!queues.isEmpty())
{
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
- ((ServerSession) ssn).enqueue(message, queues);
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+ serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -313,13 +333,19 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
}
}
-
- ssn.processed(xfr);
+ if(serverSession.isTransactional())
+ {
+ serverSession.processed(xfr);
+ }
+ else
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
+ }
}
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
@@ -404,6 +430,13 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
+ public void executionSync(final Session ssn, final ExecutionSync sync)
+ {
+ ((ServerSession)ssn).awaitCommandCompletion();
+ super.executionSync(ssn, sync);
+ }
+
+ @Override
public void exchangeDeclare(Session session, ExchangeDeclare method)
{
String exchangeName = method.getExchange();
@@ -1269,4 +1302,25 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerConnection scon = (ServerConnection) session.getConnection();
SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
}
+
+ private static class CommandProcessedAction implements ServerTransaction.Action
+ {
+ private final ServerSession _serverSession;
+ private final Method _method;
+
+ public CommandProcessedAction(final ServerSession serverSession, final Method xfr)
+ {
+ _serverSession = serverSession;
+ _method = xfr;
+ }
+
+ public void postCommit()
+ {
+ _serverSession.processed(_method);
+ }
+
+ public void onRollback()
+ {
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
new file mode 100755
index 0000000000..11439b3c8a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -0,0 +1,316 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * An implementation of ServerTransaction where each enqueue/dequeue
+ * operation takes place within it own transaction.
+ *
+ * Since there is no long-lived transaction, the commit and rollback methods of
+ * this implementation are empty.
+ */
+public class AsyncAutoCommitTransaction implements ServerTransaction
+{
+ protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
+
+ private final MessageStore _messageStore;
+ private final FutureRecorder _futureRecorder;
+
+ public static interface FutureRecorder
+ {
+ public void recordFuture(StoreFuture future, Action action);
+
+ }
+
+ public AsyncAutoCommitTransaction(MessageStore transactionLog, FutureRecorder recorder)
+ {
+ _messageStore = transactionLog;
+ _futureRecorder = recorder;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return 0L;
+ }
+
+ /**
+ * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
+ * by the caller are executed immediately.
+ */
+ public void addPostTransactionAction(final Action immediateAction)
+ {
+ addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction);
+
+ }
+
+ public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ MessageStore.StoreFuture future;
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ txn = _messageStore.newTransaction();
+ txn.dequeueMessage(queue, message);
+ future = txn.commitTranAsync();
+
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message dequeue", e);
+ throw new RuntimeException("Error during message dequeue", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+ private void addFuture(final MessageStore.StoreFuture future, final Action action)
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+
+ public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ for(QueueEntry entry : queueEntries)
+ {
+ ServerMessage message = entry.getMessage();
+ BaseQueue queue = entry.getQueue();
+
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ if(txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+
+ txn.dequeueMessage(queue, message);
+ }
+
+ }
+ MessageStore.StoreFuture future;
+ if(txn != null)
+ {
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message dequeues", e);
+ throw new RuntimeException("Error during message dequeues", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+
+ public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ MessageStore.StoreFuture future;
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ txn = _messageStore.newTransaction();
+ txn.enqueueMessage(queue, message);
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message enqueue", e);
+ throw new RuntimeException("Error during message enqueue", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+
+ }
+
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+
+ if(message.isPersistent())
+ {
+ for(BaseQueue queue : queues)
+ {
+ if (queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+
+ txn.enqueueMessage(queue, message);
+
+
+ }
+ }
+
+ }
+ MessageStore.StoreFuture future;
+ if (txn != null)
+ {
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message enqueues", e);
+ throw new RuntimeException("Error during message enqueues", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+
+ public void commit(final Runnable immediatePostTransactionAction)
+ {
+ addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+ {
+ public void postCommit()
+ {
+ immediatePostTransactionAction.run();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+
+ public void commit()
+ {
+ }
+
+ public void rollback()
+ {
+ }
+
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
+ private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
+ {
+ if (txn != null)
+ {
+ try
+ {
+ txn.abortTran();
+ }
+ catch (AMQStoreException e)
+ {
+ _logger.error("Abort transaction failed", e);
+ // Deliberate decision not to re-throw this exception. Rationale: we can only reach here if a previous
+ // TransactionLog method has ended in Exception. If we were to re-throw here, we would prevent
+ // our caller from receiving the original exception (which is likely to be more revealing of the underlying error).
+ }
+ }
+ if (postTransactionAction != null)
+ {
+ postTransactionAction.onRollback();
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index a67d4badd1..ad2a299108 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -242,6 +242,11 @@ public class AutoCommitTransaction implements ServerTransaction
{
}
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
{
if (txn != null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 7f5b5fb8b2..34bac0411e 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -309,7 +309,12 @@ public class LocalTransaction implements ServerTransaction
private void resetDetails()
{
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
}
+
+ public boolean isTransactional()
+ {
+ return true;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index 64fdc0ba9a..77efb21ddb 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -43,6 +43,8 @@ import java.util.List;
*/
public interface ServerTransaction
{
+
+
/**
* Represents an action to be performed on transaction commit or rollback
*/
@@ -110,4 +112,6 @@ public interface ServerTransaction
* be executed immediately after the underlying transaction has rolled-back.
*/
void rollback();
+
+ boolean isTransactional();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 028b912ba1..cabff1cd13 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -45,10 +45,15 @@ public class SessionDelegate
method.dispatch(ssn, this);
}
- public void command(Session ssn, Method method) {
+ public void command(Session ssn, Method method)
+ {
+ command(ssn, method, !method.hasPayload());
+ }
+ public void command(Session ssn, Method method, boolean processed)
+ {
ssn.identify(method);
method.dispatch(ssn, this);
- if (!method.hasPayload())
+ if (processed)
{
ssn.processed(method);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
index 12039caf25..e6461c8267 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -70,6 +70,11 @@ public class AcknowledgeTest extends QpidBrokerTestCase
// These should all end up being prefetched by session
sendMessage(_consumerSession, _queue, 1);
+ if(!transacted)
+ {
+ ((AMQSession)_consumerSession).sync();
+ }
+
assertEquals("Wrong number of messages on queue", 1,
((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index d3d9cf2984..e948aaffb3 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -721,7 +721,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
msg.setBooleanProperty("Match", false);
producer.send(msg);
-
+ ((AMQSession)session).sync();
// should be 1 or 2 messages on queue now
// (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");