summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-07-10 13:56:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-07-10 13:56:45 +0000
commit75a570de0545d7900392819ea0bb93e05c738ef6 (patch)
tree0c9ee4397df6fa4dc0bf6aa25dfadedac7630925 /qpid/java
parent7eeab801bf9055035d4d16a78d654fa874209bc7 (diff)
downloadqpid-python-75a570de0545d7900392819ea0bb93e05c738ef6.tar.gz
QPID-4125 : [Java Broker] allow coalescing of commits for multiple channels on same connection
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1359673 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java3
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java155
4 files changed, 185 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3f3269605f..71a4a84323 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -952,9 +952,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
public void commit() throws AMQException
{
- commit(null);
+ commit(null, false);
}
- public void commit(Runnable immediateAction) throws AMQException
+
+
+ public void commit(final Runnable immediateAction, boolean async) throws AMQException
{
if (!isTransactional())
@@ -962,11 +964,29 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _transaction.commit(immediateAction);
+ if(async && _transaction instanceof LocalTransaction)
+ {
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ ((LocalTransaction)_transaction).commitAsync(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ immediateAction.run();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+ });
+ }
+ else
+ {
+ _transaction.commit(immediateAction);
+
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
}
public void rollback() throws AMQException
@@ -1624,6 +1644,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
cmd.awaitReadyForCompletion();
cmd.complete();
}
+ if(_transaction instanceof LocalTransaction)
+ {
+ ((LocalTransaction)_transaction).sync();
+ }
}
private static class AsyncCommand
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index 6e8896a023..afa5fdb72a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -46,9 +46,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
}
- public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, TxCommitBody body, final int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -62,11 +62,19 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
throw body.getChannelNotFoundException(channelId);
}
- channel.commit();
+ channel.commit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }, true);
+
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 7ef5124cc4..de61a03aff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -779,6 +779,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
if(_closing.compareAndSet(false,true))
{
+ // force sync of outstanding async work
+ receiveComplete();
+
// REMOVE THIS SHOULD NOT BE HERE.
if (CurrentActor.get() == null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index a00691bde9..3fbcff7e2c 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.qpid.server.store.StoreFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@ public class LocalTransaction implements ServerTransaction
private volatile Transaction _transaction;
private MessageStore _transactionLog;
private long _txnStartTime = 0L;
+ private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -68,11 +70,13 @@ public class LocalTransaction implements ServerTransaction
public void addPostTransactionAction(Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -98,6 +102,7 @@ public class LocalTransaction implements ServerTransaction
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
try
@@ -131,10 +136,7 @@ public class LocalTransaction implements ServerTransaction
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -151,7 +153,7 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- resetDetails();
+ resetDetails();
}
}
@@ -176,6 +178,7 @@ public class LocalTransaction implements ServerTransaction
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -201,6 +204,7 @@ public class LocalTransaction implements ServerTransaction
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if (_txnStartTime == 0L)
@@ -239,11 +243,13 @@ public class LocalTransaction implements ServerTransaction
public void commit()
{
+ sync();
commit(null);
}
public void commit(Runnable immediateAction)
{
+ sync();
try
{
if(_transaction != null)
@@ -256,29 +262,137 @@ public class LocalTransaction implements ServerTransaction
immediateAction.run();
}
- for(int i = 0; i < _postTransactionActions.size(); i++)
+ doPostTransactionActions();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ private void doRollbackActions()
+ {
+ for(Action action : _postTransactionActions)
+ {
+ action.onRollback();
+ }
+ }
+
+ public StoreFuture commitAsync(final Runnable deferred)
+ {
+ sync();
+ try
+ {
+ StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ if(_transaction != null)
{
- _postTransactionActions.get(i).postCommit();
+ future = new StoreFuture()
+ {
+ private volatile boolean _completed = false;
+ private StoreFuture _underlying = _transaction.commitTranAsync();
+
+ @Override
+ public boolean isComplete()
+ {
+ return _completed || checkUnderlyingCompletion();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ if(!_completed)
+ {
+ _underlying.waitForCompletion();
+ checkUnderlyingCompletion();
+ }
+ }
+
+ private synchronized boolean checkUnderlyingCompletion()
+ {
+ if(!_completed && _underlying.isComplete())
+ {
+ completeDeferredWork();
+ _completed = true;
+ }
+ return _completed;
+
+ }
+
+ private void completeDeferredWork()
+ {
+ try
+ {
+ doPostTransactionActions();
+ deferred.run();
+
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ };
+ _asyncTran = future;
}
+ else
+ {
+ try
+ {
+ doPostTransactionActions();
+
+ deferred.run();
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ return future;
}
catch (Exception e)
{
_logger.error("Failed to commit transaction", e);
-
- for(Action action : _postTransactionActions)
+ try
{
- action.onRollback();
+ doRollbackActions();
+ }
+ finally
+ {
+ resetDetails();
}
throw new RuntimeException("Failed to commit transaction", e);
}
- finally
+
+
+ }
+
+ private void doPostTransactionActions()
+ {
+ for(int i = 0; i < _postTransactionActions.size(); i++)
{
- resetDetails();
+ _postTransactionActions.get(i).postCommit();
}
}
public void rollback()
{
+ sync();
try
{
if(_transaction != null)
@@ -295,10 +409,7 @@ public class LocalTransaction implements ServerTransaction
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -306,9 +417,19 @@ public class LocalTransaction implements ServerTransaction
}
}
}
-
+
+ public void sync()
+ {
+ if(_asyncTran != null)
+ {
+ _asyncTran.waitForCompletion();
+ _asyncTran = null;
+ }
+ }
+
private void resetDetails()
{
+ _asyncTran = null;
_transaction = null;
_postTransactionActions.clear();
_txnStartTime = 0L;