summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-20 20:27:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-20 20:27:59 +0000
commitbcc3eec81fed2e1f0acd0019e055d7f47303696b (patch)
tree9b6c6af1be3cfe3dc31f55e8c01a1db20d3cac01 /java/broker
parent1a08c4097f7e18251f21ed0ea2a7660b24dface6 (diff)
downloadqpid-python-bcc3eec81fed2e1f0acd0019e055d7f47303696b.tar.gz
QPID-3774 : allow out of order completion of persistent enqueues / dequeues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1234111 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java121
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java72
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java316
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java5
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java7
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java4
6 files changed, 484 insertions, 41 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 7d06dd2c22..092fde0bc2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -27,6 +27,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
@@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -84,18 +85,20 @@ import org.apache.qpid.transport.SessionDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
+ AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
+ private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
@@ -147,7 +150,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
- _transaction = new AutoCommitTransaction(this.getMessageStore());
+ _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
@@ -184,16 +187,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction;
- if(isTransactional())
- {
- postTransactionAction = new PostEnqueueAction(queues, message) ;
- }
- else
- {
- postTransactionAction = _postEnqueueAction;
- postTransactionAction.setState(queues, message);
- }
+ PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
_transaction.enqueue(queues,message, postTransactionAction, 0L);
incrementOutstandingTxnsIfNecessary();
updateTransactionalActivity();
@@ -221,12 +215,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void accept(RangeSet ranges)
{
dispositionChange(ranges, new MessageDispositionAction()
- {
- public void performAction(MessageDispositionChangeListener listener)
- {
- listener.onAccept();
- }
- });
+ {
+ public void performAction(MessageDispositionChangeListener listener)
+ {
+ listener.onAccept();
+ }
+ });
}
@@ -444,10 +438,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public boolean isTransactional()
{
- // this does not look great but there should only be one "non-transactional"
- // transactional context, while there could be several transactional ones in
- // theory
- return !(_transaction instanceof AutoCommitTransaction);
+ return _transaction.isTransactional();
}
public boolean inTransaction()
@@ -765,6 +756,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
subscription_0_10.flushCreditState(false);
}
+ awaitCommandCompletion();
}
private class PostEnqueueAction implements ServerTransaction.Action
@@ -774,17 +766,12 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private ServerMessage _message;
private final boolean _transactional;
- public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+ public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
{
- _transactional = true;
+ _transactional = transactional;
setState(queues, message);
}
- public PostEnqueueAction()
- {
- _transactional = false;
- }
-
public void setState(List<? extends BaseQueue> queues, ServerMessage message)
{
_message = message;
@@ -830,4 +817,76 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
return _blocking.get();
}
+
+ private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
+
+ public void completeAsyncCommands()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
+ {
+ cmd.complete();
+ _unfinishedCommandsQueue.poll();
+ }
+ while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+ {
+ cmd = _unfinishedCommandsQueue.poll();
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public void awaitCommandCompletion()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.poll()) != null)
+ {
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public Object getAsyncCommandMark()
+ {
+ return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
+ }
+
+ public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+ }
+
+ private static class AsyncCommand
+ {
+ private final MessageStore.StoreFuture _future;
+ private ServerTransaction.Action _action;
+
+ public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ {
+ _future = future;
+ _action = action;
+ }
+
+ void awaitReadyForCompletion()
+ {
+ _future.waitForCompletion();
+ }
+
+ void complete()
+ {
+ if(!_future.isComplete())
+ {
+ _future.waitForCompletion();
+ }
+ _action.postCommit();
+ _action = null;
+ }
+
+ boolean isReadyForCompletion()
+ {
+ return _future.isComplete();
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index b6e142a5fd..2eab65cf8a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.transport;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
@@ -81,9 +81,22 @@ public class ServerSessionDelegate extends SessionDelegate
if(!session.isClosing())
{
- super.command(session, method);
+ Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
+ super.command(session, method, false);
+ Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
+ if(newOutstanding == null || newOutstanding == asyncCommandMark)
+ {
+ session.processed(method);
+ }
+
+ if(newOutstanding != null)
+ {
+ ((ServerSession)session).completeAsyncCommands();
+ }
+
if (method.isSync())
{
+ ((ServerSession)session).awaitCommandCompletion();
session.flushProcessed();
}
}
@@ -98,7 +111,13 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageAccept(Session session, MessageAccept method)
{
- ((ServerSession)session).accept(method.getTransfers());
+ final ServerSession serverSession = (ServerSession) session;
+ serverSession.accept(method.getTransfers());
+ if(!serverSession.isTransactional())
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, method));
+ }
}
@Override
@@ -252,7 +271,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
@Override
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
final Exchange exchange = getExchangeForMessage(ssn, xfr);
@@ -294,12 +313,13 @@ public class ServerSessionDelegate extends SessionDelegate
exchangeInUse = exchange;
}
+ final ServerSession serverSession = (ServerSession) ssn;
if(!queues.isEmpty())
{
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
- ((ServerSession) ssn).enqueue(message, queues);
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+ serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -313,13 +333,19 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
}
}
-
- ssn.processed(xfr);
+ if(serverSession.isTransactional())
+ {
+ serverSession.processed(xfr);
+ }
+ else
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
+ }
}
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
@@ -404,6 +430,13 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
+ public void executionSync(final Session ssn, final ExecutionSync sync)
+ {
+ ((ServerSession)ssn).awaitCommandCompletion();
+ super.executionSync(ssn, sync);
+ }
+
+ @Override
public void exchangeDeclare(Session session, ExchangeDeclare method)
{
String exchangeName = method.getExchange();
@@ -1269,4 +1302,25 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerConnection scon = (ServerConnection) session.getConnection();
SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
}
+
+ private static class CommandProcessedAction implements ServerTransaction.Action
+ {
+ private final ServerSession _serverSession;
+ private final Method _method;
+
+ public CommandProcessedAction(final ServerSession serverSession, final Method xfr)
+ {
+ _serverSession = serverSession;
+ _method = xfr;
+ }
+
+ public void postCommit()
+ {
+ _serverSession.processed(_method);
+ }
+
+ public void onRollback()
+ {
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
new file mode 100755
index 0000000000..11439b3c8a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -0,0 +1,316 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * An implementation of ServerTransaction where each enqueue/dequeue
+ * operation takes place within it own transaction.
+ *
+ * Since there is no long-lived transaction, the commit and rollback methods of
+ * this implementation are empty.
+ */
+public class AsyncAutoCommitTransaction implements ServerTransaction
+{
+ protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
+
+ private final MessageStore _messageStore;
+ private final FutureRecorder _futureRecorder;
+
+ public static interface FutureRecorder
+ {
+ public void recordFuture(StoreFuture future, Action action);
+
+ }
+
+ public AsyncAutoCommitTransaction(MessageStore transactionLog, FutureRecorder recorder)
+ {
+ _messageStore = transactionLog;
+ _futureRecorder = recorder;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return 0L;
+ }
+
+ /**
+ * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
+ * by the caller are executed immediately.
+ */
+ public void addPostTransactionAction(final Action immediateAction)
+ {
+ addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction);
+
+ }
+
+ public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ MessageStore.StoreFuture future;
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ txn = _messageStore.newTransaction();
+ txn.dequeueMessage(queue, message);
+ future = txn.commitTranAsync();
+
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message dequeue", e);
+ throw new RuntimeException("Error during message dequeue", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+ private void addFuture(final MessageStore.StoreFuture future, final Action action)
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+
+ public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ for(QueueEntry entry : queueEntries)
+ {
+ ServerMessage message = entry.getMessage();
+ BaseQueue queue = entry.getQueue();
+
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ if(txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+
+ txn.dequeueMessage(queue, message);
+ }
+
+ }
+ MessageStore.StoreFuture future;
+ if(txn != null)
+ {
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message dequeues", e);
+ throw new RuntimeException("Error during message dequeues", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+
+ public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+ MessageStore.StoreFuture future;
+ if(message.isPersistent() && queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ }
+
+ txn = _messageStore.newTransaction();
+ txn.enqueueMessage(queue, message);
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message enqueue", e);
+ throw new RuntimeException("Error during message enqueue", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+
+ }
+
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ {
+ MessageStore.Transaction txn = null;
+ try
+ {
+
+ if(message.isPersistent())
+ {
+ for(BaseQueue queue : queues)
+ {
+ if (queue.isDurable())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
+ }
+ if (txn == null)
+ {
+ txn = _messageStore.newTransaction();
+ }
+
+ txn.enqueueMessage(queue, message);
+
+
+ }
+ }
+
+ }
+ MessageStore.StoreFuture future;
+ if (txn != null)
+ {
+ future = txn.commitTranAsync();
+ txn = null;
+ }
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
+ postTransactionAction = null;
+
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error during message enqueues", e);
+ throw new RuntimeException("Error during message enqueues", e);
+ }
+ finally
+ {
+ rollbackIfNecessary(postTransactionAction, txn);
+ }
+
+ }
+
+
+ public void commit(final Runnable immediatePostTransactionAction)
+ {
+ addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+ {
+ public void postCommit()
+ {
+ immediatePostTransactionAction.run();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+
+ public void commit()
+ {
+ }
+
+ public void rollback()
+ {
+ }
+
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
+ private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
+ {
+ if (txn != null)
+ {
+ try
+ {
+ txn.abortTran();
+ }
+ catch (AMQStoreException e)
+ {
+ _logger.error("Abort transaction failed", e);
+ // Deliberate decision not to re-throw this exception. Rationale: we can only reach here if a previous
+ // TransactionLog method has ended in Exception. If we were to re-throw here, we would prevent
+ // our caller from receiving the original exception (which is likely to be more revealing of the underlying error).
+ }
+ }
+ if (postTransactionAction != null)
+ {
+ postTransactionAction.onRollback();
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index a67d4badd1..ad2a299108 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -242,6 +242,11 @@ public class AutoCommitTransaction implements ServerTransaction
{
}
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
{
if (txn != null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 7f5b5fb8b2..34bac0411e 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -309,7 +309,12 @@ public class LocalTransaction implements ServerTransaction
private void resetDetails()
{
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
}
+
+ public boolean isTransactional()
+ {
+ return true;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index 64fdc0ba9a..77efb21ddb 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -43,6 +43,8 @@ import java.util.List;
*/
public interface ServerTransaction
{
+
+
/**
* Represents an action to be performed on transaction commit or rollback
*/
@@ -110,4 +112,6 @@ public interface ServerTransaction
* be executed immediately after the underlying transaction has rolled-back.
*/
void rollback();
+
+ boolean isTransactional();
}