From 380f3fa63615c12b6b002924d86b15441bbde65b Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 4 Mar 2010 11:17:48 +0000 Subject: QPID-2379: add TxnStarts, TxnCommits, TxnRejects, TxnCount on Session delegate git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918939 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/qmf/QMFService.java | 12 ++--- .../java/org/apache/qpid/server/AMQChannel.java | 50 ++++++++++++++++++++- .../qpid/server/configuration/SessionConfig.java | 8 ++++ .../qpid/server/transport/ServerSession.java | 51 +++++++++++++++++++++- 4 files changed, 111 insertions(+), 10 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 3e155e104c..b6c06f7f34 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1300,26 +1300,22 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getTxnStarts() { - // TODO - return 0l; + return _obj.getTxnStarts(); } public Long getTxnCommits() { - // TODO - return 0l; + return _obj.getTxnCommits(); } public Long getTxnRejects() { - // TODO - return 0l; + return _obj.getTxnRejects(); } public Long getTxnCount() { - // TODO - return 0l; + return _obj.getTxnCount(); } public Long getClientCredit() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1b03ee2334..15ac52305f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -84,6 +84,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class AMQChannel implements SessionConfig { @@ -132,6 +133,11 @@ public class AMQChannel implements SessionConfig private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; @@ -180,6 +186,7 @@ public class AMQChannel implements SessionConfig public void setLocalTransactional() { _transaction = new LocalTransaction(_messageStore); + _txnStarts.incrementAndGet(); } public boolean isTransactional() @@ -189,6 +196,40 @@ public class AMQChannel implements SessionConfig // theory return !(_transaction instanceof AutoCommitTransaction); } + + private void incrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + + private void decrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + + public Long getTxnStarts() + { + return _txnStarts.get(); + } + + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } public int getChannelId() { @@ -278,7 +319,7 @@ public class AMQChannel implements SessionConfig else { _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); - + incrementOutstandingTxnsIfNecessary(); } } } @@ -845,6 +886,9 @@ public class AMQChannel implements SessionConfig _transaction.commit(); + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } public void rollback() throws AMQException @@ -877,6 +921,10 @@ public class AMQChannel implements SessionConfig finally { _rollingBack = false; + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } postRollbackTask.run(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java index ae01ab25ea..e46e951588 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java @@ -38,4 +38,12 @@ public interface SessionConfig extends ConfiguredObject(); private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); private Principal _principal; @@ -160,7 +166,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } }); - + incrementOutstandingTxnsIfNecessary(); } @@ -391,13 +397,56 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void commit() { _transaction.commit(); + + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } public void rollback() { _transaction.rollback(); + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + + private void incrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + + private void decrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + + public Long getTxnStarts() + { + return _txnStarts.get(); } + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } + public Principal getPrincipal() { return _principal; -- cgit v1.2.1