diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-03-04 11:17:48 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-03-04 11:17:48 +0000 |
| commit | 9fb6e54757b6d25d6c07657eebdbf8754438049f (patch) | |
| tree | 4e52d562a814841865b1d6e0837b8e42d05a7492 /java/broker/src | |
| parent | ba39f78f2ca2976f4d12f00a892f06245b46fac9 (diff) | |
| download | qpid-python-9fb6e54757b6d25d6c07657eebdbf8754438049f.tar.gz | |
QPID-2379: add TxnStarts, TxnCommits, TxnRejects, TxnCount on Session delegate
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@918939 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
4 files changed, 111 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 3e155e104c..b6c06f7f34 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1300,26 +1300,22 @@ public class QMFService implements ConfigStore.ConfigEventListener public Long getTxnStarts() { - // TODO - return 0l; + return _obj.getTxnStarts(); } public Long getTxnCommits() { - // TODO - return 0l; + return _obj.getTxnCommits(); } public Long getTxnRejects() { - // TODO - return 0l; + return _obj.getTxnRejects(); } public Long getTxnCount() { - // TODO - return 0l; + return _obj.getTxnCount(); } public Long getClientCredit() diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1b03ee2334..15ac52305f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -84,6 +84,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class AMQChannel implements SessionConfig { @@ -132,6 +133,11 @@ public class AMQChannel implements SessionConfig private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; @@ -180,6 +186,7 @@ public class AMQChannel implements SessionConfig public void setLocalTransactional() { _transaction = new LocalTransaction(_messageStore); + _txnStarts.incrementAndGet(); } public boolean isTransactional() @@ -189,6 +196,40 @@ public class AMQChannel implements SessionConfig // theory return !(_transaction instanceof AutoCommitTransaction); } + + private void incrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + + private void decrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + + public Long getTxnStarts() + { + return _txnStarts.get(); + } + + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } public int getChannelId() { @@ -278,7 +319,7 @@ public class AMQChannel implements SessionConfig else { _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); - + incrementOutstandingTxnsIfNecessary(); } } } @@ -845,6 +886,9 @@ public class AMQChannel implements SessionConfig _transaction.commit(); + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } public void rollback() throws AMQException @@ -877,6 +921,10 @@ public class AMQChannel implements SessionConfig finally { _rollingBack = false; + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } postRollbackTask.run(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java index ae01ab25ea..e46e951588 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java @@ -38,4 +38,12 @@ public interface SessionConfig extends ConfiguredObject<SessionConfigType, Sessi Long getExpiryTime(); Long getMaxClientRate(); + + Long getTxnStarts(); + + Long getTxnCommits(); + + Long getTxnRejects(); + + Long getTxnCount(); }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 3e48ac2619..a65f3938a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -61,6 +61,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; public class ServerSession extends Session implements PrincipalHolder, SessionConfig { @@ -92,6 +93,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); private Principal _principal; @@ -160,7 +166,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } }); - + incrementOutstandingTxnsIfNecessary(); } @@ -391,13 +397,56 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void commit() { _transaction.commit(); + + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); } public void rollback() { _transaction.rollback(); + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + + private void incrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + + private void decrementOutstandingTxnsIfNecessary() + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + + public Long getTxnStarts() + { + return _txnStarts.get(); } + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } + public Principal getPrincipal() { return _principal; |
