summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-10-08 16:15:07 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-10-08 16:15:07 +0000
commitcbc13dd71907a8c21bea9612c2b33a585a4db236 (patch)
treefb4ccb2f56f7b89418e468cf5c6a8db12c6ad5fc /qpid/java
parent2920d622cf21c1e31631527c636386dcbdc3e41e (diff)
downloadqpid-python-cbc13dd71907a8c21bea9612c2b33a585a4db236.tar.gz
Changed to use Window for asyn and credit mode for sync consume.
Also added logic to change the mode when suspend is called and when the message listener is set to null. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@582861 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java56
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java18
6 files changed, 97 insertions, 33 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index f89b3715f3..5850c08b9b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -344,7 +344,7 @@ public class AMQSession_0_10 extends AMQSession
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
- getQpidSession().messageFlowMode(tag.toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -437,17 +437,30 @@ public class AMQSession_0_10 extends AMQSession
for (BasicMessageConsumer consumer : _consumers.values())
{
getQpidSession().messageStop(consumer.getConsumerTag().toString());
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+
}
}
else
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
- MAX_PREFETCH);
- // todo this
- getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ //only set if msg list is null
+ try
+ {
+ if (consumer.getMessageListener() != null)
+ {
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ MAX_PREFETCH);
+ // todo this
+ getQpidSession()
+ .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ }
+ }
+ catch(Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error while trying to get the listener",e);
+ }
}
}
// We need to sync so that we get notify of an error.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ca6312e79f..fa85afc6e8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -82,7 +82,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
* <p/> Argument true indicates we want strict FIFO semantics
*/
- private final ArrayBlockingQueue _synchronousQueue;
+ protected final ArrayBlockingQueue _synchronousQueue;
protected MessageFactoryRegistry _messageFactory;
@@ -354,15 +354,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
return null;
}
- Object o ;
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else
- {
- o = _synchronousQueue.take();
- }
+ Object o = getMessageFromQueue(l);
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -385,6 +377,8 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
+ public abstract Object getMessageFromQueue(long l) throws InterruptedException;
+
private boolean closeOnAutoClose() throws JMSException
{
if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index f1b8c2d3e7..b0cd749a2a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.*;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.filter.MessageFilter;
@@ -38,6 +39,7 @@ import javax.jms.JMSException;
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
/**
* This is a 0.10 message consumer.
@@ -65,7 +67,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
private boolean _preAcquire = true;
- //--- constructor
+ //--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -127,7 +129,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void onMessage(Message message)
- {
+ {
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -215,7 +217,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
- // TODO Use a tag for fiding out if message filtering is done here or by the broker.
+ // TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
if (getMessageSelector() != null)
@@ -334,15 +336,45 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (_connection.started())
+ if (messageListener == null)
+ {
+ _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+ _0_10session.getQpidSession().sync();
+ }
+ else
+ {
+ if (_connection.started())
+ {
+ _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
+ _0_10session.getQpidSession().sync();
+ }
+ }
+ }
+
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+
+ if (l > 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
- 0xFFFFFFFF);
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
+ o = _synchronousQueue.poll();
+ }
+ else
+ {
+ o = _synchronousQueue.take();
}
+ return null;
}
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index c82ed96e4c..1bca13ac02 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.util.concurrent.TimeUnit;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -84,4 +86,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
}
+
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ return null;
+ }
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
index 993b0870aa..4ea8df3273 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
@@ -147,6 +147,5 @@ public class BasicInteropTest implements ClosedListener
t.testSendMessage();
t.testMessageFlush();
t.close();
-
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
index 7f55dcbd67..9baade610b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,6 +28,7 @@ import javax.jms.Queue;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
import org.apache.qpidity.filter.JMSSelectorFilter;
@@ -159,7 +160,7 @@ public class MessageConsumerImpl extends MessageActor
// bind this queue with the topic exchange
getSession().getQpidSession()
.queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
- // subscribe to this topic
+ // subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
@@ -174,12 +175,21 @@ public class MessageConsumerImpl extends MessageActor
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_MODE_CREDIT);
+ // Set unlimited byte credits
+ getSession().getQpidSession().messageFlow(getMessageActorID(), Session.MESSAGE_FLOW_UNIT_BYTE, -1);
// this will prevent the broker from sending more than one message
// When a messageListener is set the flow will be adjusted.
// until then we assume it's for synchronous message consumption
requestCredit(1);
- requestSync();
+ try
+ {
+ requestSync();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
// check for an exception
if (getSession().getCurrentException() != null)
{