diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-03-07 14:42:02 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-03-07 14:42:02 +0000 |
| commit | d20ff23e2a578969b1cad084504cd72bcbb38581 (patch) | |
| tree | 337cef9ddd9ce147715e0fb09121ef6213d220b7 | |
| parent | 1326b8d12d5691ff34f697f6bcfead29166e479f (diff) | |
| download | qpid-python-d20ff23e2a578969b1cad084504cd72bcbb38581.tar.gz | |
Removed redundant code (see QPID-838). As this is a major 0.10 change tck has been run prior to committing it.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@634696 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 52 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 50 |
2 files changed, 10 insertions, 92 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8039e3a163..27feba694c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -249,24 +249,6 @@ public class AMQSession_0_10 extends AMQSession getCurrentException(); } - /** - * We need to release message that may be pre-fetched in the local queue - * - * @throws JMSException - */ - public void close() throws JMSException - { - super.close(); - // We need to release pre-fetched messages - Iterator messages=_queue.iterator(); - while (messages.hasNext()) - { - UnprocessedMessage message=(UnprocessedMessage) messages.next(); - messages.remove(); - rejectMessage(message, true); - } - } - /** * Commit the receipt and the delivery of all messages exchanged by this session resources. @@ -426,7 +408,8 @@ public class AMQSession_0_10 extends AMQSession getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. - if(consumer.isStrated()) + // only if not immediat prefetch + if(consumer.isStrated() || _immediatePrefetch) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), @@ -654,21 +637,14 @@ public class AMQSession_0_10 extends AMQSession void start() throws AMQException { - suspendChannel(false); + super.start(); for(BasicMessageConsumer c: _consumers.values()) { c.start(); } - // If the event dispatcher is not running then start it too. - if (hasMessageListeners()) - { - startDistpatcherIfNecessary(); - } } - - void stop() throws AMQException { super.stop(); @@ -678,27 +654,7 @@ public class AMQSession_0_10 extends AMQSession } } - synchronized void startDistpatcherIfNecessary() - { - // If IMMEDIATE_PREFETCH is not set then we need to start fetching - if (!_immediatePrefetch) - { - // We do this now if this is the first call on a started connection - if (isSuspended() && _firstDispatcher.getAndSet(false)) - { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - _logger.info("Unsuspending channel threw an exception:" + e); - } - } - } - - startDistpatcherIfNecessary(false); - } + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 97a631aa18..9d24fbf953 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -429,40 +429,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (messageListener == null) + if (messageListener != null && !_synchronousQueue.isEmpty()) { - /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); - _0_10session.getQpidSession() - .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, - 0xFFFFFFFF); - _0_10session.getQpidSession().sync(); - */ - } - else - { - if(! _synchronousQueue.isEmpty()) + Iterator messages=_synchronousQueue.iterator(); + while (messages.hasNext()) { - Iterator messages=_synchronousQueue.iterator(); - while (messages.hasNext()) - { - AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); - messages.remove(); - _session.rejectMessage(message, true); - } - } - if (_connection.started()) - { - _0_10session.getQpidSession() - .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, - 0xFFFFFFFF); - _0_10session.getQpidSession().sync(); + AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); + messages.remove(); + _session.rejectMessage(message, true); } } } @@ -482,16 +456,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _isStarted = false; } - public void close() throws JMSException - { - super.close(); - // release message that may be staged - Iterator messages=_synchronousQueue.iterator(); - while (messages.hasNext()) - { - AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); - messages.remove(); - _session.rejectMessage(message, true); - } - } }
\ No newline at end of file |
