From d20ff23e2a578969b1cad084504cd72bcbb38581 Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Fri, 7 Mar 2008 14:42:02 +0000 Subject: Removed redundant code (see QPID-838). As this is a major 0.10 change tck has been run prior to committing it. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@634696 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 52 ++-------------------- .../qpid/client/BasicMessageConsumer_0_10.java | 50 +++------------------ 2 files changed, 10 insertions(+), 92 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8039e3a163..27feba694c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -249,24 +249,6 @@ public class AMQSession_0_10 extends AMQSession getCurrentException(); } - /** - * We need to release message that may be pre-fetched in the local queue - * - * @throws JMSException - */ - public void close() throws JMSException - { - super.close(); - // We need to release pre-fetched messages - Iterator messages=_queue.iterator(); - while (messages.hasNext()) - { - UnprocessedMessage message=(UnprocessedMessage) messages.next(); - messages.remove(); - rejectMessage(message, true); - } - } - /** * Commit the receipt and the delivery of all messages exchanged by this session resources. @@ -426,7 +408,8 @@ public class AMQSession_0_10 extends AMQSession getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. - if(consumer.isStrated()) + // only if not immediat prefetch + if(consumer.isStrated() || _immediatePrefetch) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), @@ -654,21 +637,14 @@ public class AMQSession_0_10 extends AMQSession void start() throws AMQException { - suspendChannel(false); + super.start(); for(BasicMessageConsumer c: _consumers.values()) { c.start(); } - // If the event dispatcher is not running then start it too. - if (hasMessageListeners()) - { - startDistpatcherIfNecessary(); - } } - - void stop() throws AMQException { super.stop(); @@ -678,27 +654,7 @@ public class AMQSession_0_10 extends AMQSession } } - synchronized void startDistpatcherIfNecessary() - { - // If IMMEDIATE_PREFETCH is not set then we need to start fetching - if (!_immediatePrefetch) - { - // We do this now if this is the first call on a started connection - if (isSuspended() && _firstDispatcher.getAndSet(false)) - { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - _logger.info("Unsuspending channel threw an exception:" + e); - } - } - } - - startDistpatcherIfNecessary(false); - } + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 97a631aa18..9d24fbf953 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -429,40 +429,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer