diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-07-10 13:56:45 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-07-10 13:56:45 +0000 |
| commit | 75a570de0545d7900392819ea0bb93e05c738ef6 (patch) | |
| tree | 0c9ee4397df6fa4dc0bf6aa25dfadedac7630925 /qpid/java/broker/src | |
| parent | 7eeab801bf9055035d4d16a78d654fa874209bc7 (diff) | |
| download | qpid-python-75a570de0545d7900392819ea0bb93e05c738ef6.tar.gz | |
QPID-4125 : [Java Broker] allow coalescing of commits for multiple channels on same connection
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1359673 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
4 files changed, 185 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3f3269605f..71a4a84323 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -952,9 +952,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public void commit() throws AMQException { - commit(null); + commit(null, false); } - public void commit(Runnable immediateAction) throws AMQException + + + public void commit(final Runnable immediateAction, boolean async) throws AMQException { if (!isTransactional()) @@ -962,11 +964,29 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm throw new AMQException("Fatal error: commit called on non-transactional channel"); } - _transaction.commit(immediateAction); + if(async && _transaction instanceof LocalTransaction) + { - _txnCommits.incrementAndGet(); - _txnStarts.incrementAndGet(); - decrementOutstandingTxnsIfNecessary(); + ((LocalTransaction)_transaction).commitAsync(new Runnable() + { + @Override + public void run() + { + immediateAction.run(); + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + }); + } + else + { + _transaction.commit(immediateAction); + + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } } public void rollback() throws AMQException @@ -1624,6 +1644,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm cmd.awaitReadyForCompletion(); cmd.complete(); } + if(_transaction instanceof LocalTransaction) + { + ((LocalTransaction)_transaction).sync(); + } } private static class AsyncCommand diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 6e8896a023..afa5fdb72a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -46,9 +46,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { } - public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException + public void methodReceived(AMQStateManager stateManager, TxCommitBody body, final int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); + final AMQProtocolSession session = stateManager.getProtocolSession(); try { @@ -62,11 +62,19 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { throw body.getChannelNotFoundException(channelId); } - channel.commit(); + channel.commit(new Runnable() + { + + @Override + public void run() + { + MethodRegistry methodRegistry = session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); + session.writeFrame(responseBody.generateFrame(channelId)); + } + }, true); + - MethodRegistry methodRegistry = session.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); } catch (AMQException e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 7ef5124cc4..de61a03aff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -779,6 +779,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { if(_closing.compareAndSet(false,true)) { + // force sync of outstanding async work + receiveComplete(); + // REMOVE THIS SHOULD NOT BE HERE. if (CurrentActor.get() == null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index a00691bde9..3fbcff7e2c 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.txn; +import org.apache.qpid.server.store.StoreFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,7 @@ public class LocalTransaction implements ServerTransaction private volatile Transaction _transaction; private MessageStore _transactionLog; private long _txnStartTime = 0L; + private StoreFuture _asyncTran; public LocalTransaction(MessageStore transactionLog) { @@ -68,11 +70,13 @@ public class LocalTransaction implements ServerTransaction public void addPostTransactionAction(Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); } public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); if(message.isPersistent() && queue.isDurable()) @@ -98,6 +102,7 @@ public class LocalTransaction implements ServerTransaction public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); try @@ -131,10 +136,7 @@ public class LocalTransaction implements ServerTransaction { try { - for(Action action : _postTransactionActions) - { - action.onRollback(); - } + doRollbackActions(); } finally { @@ -151,7 +153,7 @@ public class LocalTransaction implements ServerTransaction } finally { - resetDetails(); + resetDetails(); } } @@ -176,6 +178,7 @@ public class LocalTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); if(message.isPersistent() && queue.isDurable()) @@ -201,6 +204,7 @@ public class LocalTransaction implements ServerTransaction public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { + sync(); _postTransactionActions.add(postTransactionAction); if (_txnStartTime == 0L) @@ -239,11 +243,13 @@ public class LocalTransaction implements ServerTransaction public void commit() { + sync(); commit(null); } public void commit(Runnable immediateAction) { + sync(); try { if(_transaction != null) @@ -256,29 +262,137 @@ public class LocalTransaction implements ServerTransaction immediateAction.run(); } - for(int i = 0; i < _postTransactionActions.size(); i++) + doPostTransactionActions(); + } + catch (Exception e) + { + _logger.error("Failed to commit transaction", e); + + doRollbackActions(); + throw new RuntimeException("Failed to commit transaction", e); + } + finally + { + resetDetails(); + } + } + + private void doRollbackActions() + { + for(Action action : _postTransactionActions) + { + action.onRollback(); + } + } + + public StoreFuture commitAsync(final Runnable deferred) + { + sync(); + try + { + StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; + if(_transaction != null) { - _postTransactionActions.get(i).postCommit(); + future = new StoreFuture() + { + private volatile boolean _completed = false; + private StoreFuture _underlying = _transaction.commitTranAsync(); + + @Override + public boolean isComplete() + { + return _completed || checkUnderlyingCompletion(); + } + + @Override + public void waitForCompletion() + { + if(!_completed) + { + _underlying.waitForCompletion(); + checkUnderlyingCompletion(); + } + } + + private synchronized boolean checkUnderlyingCompletion() + { + if(!_completed && _underlying.isComplete()) + { + completeDeferredWork(); + _completed = true; + } + return _completed; + + } + + private void completeDeferredWork() + { + try + { + doPostTransactionActions(); + deferred.run(); + + } + catch (Exception e) + { + _logger.error("Failed to commit transaction", e); + + doRollbackActions(); + throw new RuntimeException("Failed to commit transaction", e); + } + finally + { + resetDetails(); + } + } + + }; + _asyncTran = future; } + else + { + try + { + doPostTransactionActions(); + + deferred.run(); + } + finally + { + resetDetails(); + } + } + + return future; } catch (Exception e) { _logger.error("Failed to commit transaction", e); - - for(Action action : _postTransactionActions) + try { - action.onRollback(); + doRollbackActions(); + } + finally + { + resetDetails(); } throw new RuntimeException("Failed to commit transaction", e); } - finally + + + } + + private void doPostTransactionActions() + { + for(int i = 0; i < _postTransactionActions.size(); i++) { - resetDetails(); + _postTransactionActions.get(i).postCommit(); } } public void rollback() { + sync(); try { if(_transaction != null) @@ -295,10 +409,7 @@ public class LocalTransaction implements ServerTransaction { try { - for(Action action : _postTransactionActions) - { - action.onRollback(); - } + doRollbackActions(); } finally { @@ -306,9 +417,19 @@ public class LocalTransaction implements ServerTransaction } } } - + + public void sync() + { + if(_asyncTran != null) + { + _asyncTran.waitForCompletion(); + _asyncTran = null; + } + } + private void resetDetails() { + _asyncTran = null; _transaction = null; _postTransactionActions.clear(); _txnStartTime = 0L; |
