summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java56
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java18
6 files changed, 97 insertions, 33 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index f89b3715f3..5850c08b9b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -344,7 +344,7 @@ public class AMQSession_0_10 extends AMQSession
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
- getQpidSession().messageFlowMode(tag.toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -437,17 +437,30 @@ public class AMQSession_0_10 extends AMQSession
for (BasicMessageConsumer consumer : _consumers.values())
{
getQpidSession().messageStop(consumer.getConsumerTag().toString());
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+
}
}
else
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
- MAX_PREFETCH);
- // todo this
- getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ //only set if msg list is null
+ try
+ {
+ if (consumer.getMessageListener() != null)
+ {
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ MAX_PREFETCH);
+ // todo this
+ getQpidSession()
+ .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ }
+ }
+ catch(Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error while trying to get the listener",e);
+ }
}
}
// We need to sync so that we get notify of an error.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ca6312e79f..fa85afc6e8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -82,7 +82,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
* <p/> Argument true indicates we want strict FIFO semantics
*/
- private final ArrayBlockingQueue _synchronousQueue;
+ protected final ArrayBlockingQueue _synchronousQueue;
protected MessageFactoryRegistry _messageFactory;
@@ -354,15 +354,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
return null;
}
- Object o ;
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else
- {
- o = _synchronousQueue.take();
- }
+ Object o = getMessageFromQueue(l);
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -385,6 +377,8 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
+ public abstract Object getMessageFromQueue(long l) throws InterruptedException;
+
private boolean closeOnAutoClose() throws JMSException
{
if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index f1b8c2d3e7..b0cd749a2a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.*;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.filter.MessageFilter;
@@ -38,6 +39,7 @@ import javax.jms.JMSException;
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
/**
* This is a 0.10 message consumer.
@@ -65,7 +67,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
private boolean _preAcquire = true;
- //--- constructor
+ //--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -127,7 +129,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void onMessage(Message message)
- {
+ {
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -215,7 +217,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
- // TODO Use a tag for fiding out if message filtering is done here or by the broker.
+ // TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
if (getMessageSelector() != null)
@@ -334,15 +336,45 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (_connection.started())
+ if (messageListener == null)
+ {
+ _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+ _0_10session.getQpidSession().sync();
+ }
+ else
+ {
+ if (_connection.started())
+ {
+ _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
+ _0_10session.getQpidSession().sync();
+ }
+ }
+ }
+
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+
+ if (l > 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
- 0xFFFFFFFF);
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
+ o = _synchronousQueue.poll();
+ }
+ else
+ {
+ o = _synchronousQueue.take();
}
+ return null;
}
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index c82ed96e4c..1bca13ac02 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.util.concurrent.TimeUnit;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -84,4 +86,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
}
+
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ return null;
+ }
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
index 993b0870aa..4ea8df3273 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
@@ -147,6 +147,5 @@ public class BasicInteropTest implements ClosedListener
t.testSendMessage();
t.testMessageFlush();
t.close();
-
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
index 7f55dcbd67..9baade610b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,6 +28,7 @@ import javax.jms.Queue;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
import org.apache.qpidity.filter.JMSSelectorFilter;
@@ -159,7 +160,7 @@ public class MessageConsumerImpl extends MessageActor
// bind this queue with the topic exchange
getSession().getQpidSession()
.queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getRoutingKey(), null);
- // subscribe to this topic
+ // subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
@@ -174,12 +175,21 @@ public class MessageConsumerImpl extends MessageActor
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_MODE_CREDIT);
+ // Set unlimited byte credits
+ getSession().getQpidSession().messageFlow(getMessageActorID(), Session.MESSAGE_FLOW_UNIT_BYTE, -1);
// this will prevent the broker from sending more than one message
// When a messageListener is set the flow will be adjusted.
// until then we assume it's for synchronous message consumption
requestCredit(1);
- requestSync();
+ try
+ {
+ requestSync();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
// check for an exception
if (getSession().getCurrentException() != null)
{