summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-03-07 14:42:02 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-03-07 14:42:02 +0000
commitd20ff23e2a578969b1cad084504cd72bcbb38581 (patch)
tree337cef9ddd9ce147715e0fb09121ef6213d220b7
parent1326b8d12d5691ff34f697f6bcfead29166e479f (diff)
downloadqpid-python-d20ff23e2a578969b1cad084504cd72bcbb38581.tar.gz
Removed redundant code (see QPID-838). As this is a major 0.10 change tck has been run prior to committing it.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@634696 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java52
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java50
2 files changed, 10 insertions, 92 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 8039e3a163..27feba694c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -249,24 +249,6 @@ public class AMQSession_0_10 extends AMQSession
getCurrentException();
}
- /**
- * We need to release message that may be pre-fetched in the local queue
- *
- * @throws JMSException
- */
- public void close() throws JMSException
- {
- super.close();
- // We need to release pre-fetched messages
- Iterator messages=_queue.iterator();
- while (messages.hasNext())
- {
- UnprocessedMessage message=(UnprocessedMessage) messages.next();
- messages.remove();
- rejectMessage(message, true);
- }
- }
-
/**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
@@ -426,7 +408,8 @@ public class AMQSession_0_10 extends AMQSession
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
- if(consumer.isStrated())
+ // only if not immediat prefetch
+ if(consumer.isStrated() || _immediatePrefetch)
{
// set the flow
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
@@ -654,21 +637,14 @@ public class AMQSession_0_10 extends AMQSession
void start() throws AMQException
{
- suspendChannel(false);
+ super.start();
for(BasicMessageConsumer c: _consumers.values())
{
c.start();
}
- // If the event dispatcher is not running then start it too.
- if (hasMessageListeners())
- {
- startDistpatcherIfNecessary();
- }
}
-
-
void stop() throws AMQException
{
super.stop();
@@ -678,27 +654,7 @@ public class AMQSession_0_10 extends AMQSession
}
}
- synchronized void startDistpatcherIfNecessary()
- {
- // If IMMEDIATE_PREFETCH is not set then we need to start fetching
- if (!_immediatePrefetch)
- {
- // We do this now if this is the first call on a started connection
- if (isSuspended() && _firstDispatcher.getAndSet(false))
- {
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- _logger.info("Unsuspending channel threw an exception:" + e);
- }
- }
- }
-
- startDistpatcherIfNecessary(false);
- }
+
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 97a631aa18..9d24fbf953 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -429,40 +429,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener == null)
+ if (messageListener != null && !_synchronousQueue.isEmpty())
{
- /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
- _0_10session.getQpidSession()
- .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
- 0xFFFFFFFF);
- _0_10session.getQpidSession().sync();
- */
- }
- else
- {
- if(! _synchronousQueue.isEmpty())
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
{
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
- {
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
- }
- }
- if (_connection.started())
- {
- _0_10session.getQpidSession()
- .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
- 0xFFFFFFFF);
- _0_10session.getQpidSession().sync();
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
}
}
}
@@ -482,16 +456,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_isStarted = false;
}
- public void close() throws JMSException
- {
- super.close();
- // release message that may be staged
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
- {
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
- }
- }
} \ No newline at end of file