From 08b3d439ce5cdcd127d14489ba4730ae3f2c7724 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Wed, 1 Feb 2012 15:19:32 +0000 Subject: QPID-3790: Add a method AMQSession.getQueueDepth(AMQDestionation, boolean) to sync session (if specified) before sending QueueQuery command Applied patch from Andrew MacBean and Oleksandr Rudyy. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1239166 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 38 +++++++++++++++++----- .../org/apache/qpid/client/AMQSession_0_10.java | 6 +++- .../org/apache/qpid/client/AMQSession_0_8.java | 2 +- .../apache/qpid/client/AMQSession_0_10Test.java | 27 +++++++++++++-- 4 files changed, 59 insertions(+), 14 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 25a2875b3f..82ba04ddd3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2752,18 +2752,38 @@ public abstract class AMQSession( - new FailoverProtectedOperation() - { - public Long execute() throws AMQException, FailoverException - { - return requestQueueDepth(amqd); - } - }, _connection).execute(); + return getQueueDepth(amqd, false); + } + /** + * Returns the number of messages currently queued by the given + * destination. Syncs session before receiving the queue depth if sync is + * set to true. + * + * @param amqd AMQ destination to get the depth value + * @param sync flag to sync session before receiving the queue depth + * @return queue depth + * @throws AMQException + */ + public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException + { + return new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Long execute() throws AMQException, FailoverException + { + try + { + return requestQueueDepth(amqd, sync); + } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } + } + }, _connection).execute(); } - protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException; /** * Declares the named exchange and type of exchange. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d38f4bbb2f..c092fa6ccb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -951,9 +951,13 @@ public class AMQSession_0_10 extends AMQSession