summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-02-06 12:56:20 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-02-06 12:56:20 +0000
commit94bde36b1306f1ebbc2cc23f136ac3d4e7b1f164 (patch)
treeb9199d059d67528206d2a9d626e24e067dedab3c /java
parent9e6f569828967edb9430f6b8cbdda9323a9cf8c5 (diff)
downloadqpid-python-94bde36b1306f1ebbc2cc23f136ac3d4e7b1f164.tar.gz
QPID-777 and QPID-778
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@618986 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java92
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java112
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java71
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java168
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java19
7 files changed, 266 insertions, 234 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a5c2688511..3a59163f9b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -24,7 +24,6 @@ package org.apache.qpid.client;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -78,8 +77,6 @@ import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
@@ -183,14 +180,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* keeps a record of subscriptions which have been created in the current instance. It does not remember
* subscriptions between executions of the client.
*/
- private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
+ protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
/**
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+ protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
@@ -271,10 +268,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
protected final boolean _immediatePrefetch;
/** Indicates that warnings should be generated on violations of the strict AMQP. */
- private final boolean _strictAMQP;
+ protected final boolean _strictAMQP;
/** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
- private final boolean _strictAMQPFATAL;
+ protected final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/**
@@ -459,8 +456,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
if (_logger.isInfoEnabled())
{
- _logger.info("Closing session: " + this + ":"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.info("Closing session: " + this );//+ ":"
+ // + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
synchronized (_messageDeliveryLock)
@@ -673,6 +670,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false, false);
}
+ public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ false, false);
+ }
+
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -723,70 +728,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false);
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- }
- else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
+ public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
/** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
@@ -1800,7 +1742,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private AMQTopic checkValidTopic(Topic topic) throws JMSException
+ protected AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
@@ -2060,7 +2002,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
- private void deleteQueue(final AMQShortString queueName) throws JMSException
+ protected void deleteQueue(final AMQShortString queueName) throws JMSException
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index a63d94b4ca..3ffe92d139 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -38,14 +38,11 @@ import org.apache.qpidity.transport.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
-import java.util.HashMap;
-
/**
* This is a 0.10 Session
*/
@@ -146,6 +143,25 @@ public class AMQSession_0_10 extends AMQSession
//------- overwritten methods of class AMQSession
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkNotClosed();
+ checkValidTopic(topic);
+ if( _subscriptions.containsKey(name))
+ {
+ _subscriptions.get(name).close();
+ }
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
/**
* Acknowledge one or many messages.
*
@@ -362,6 +378,14 @@ public class AMQSession_0_10 extends AMQSession
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
+ if(consumer.isStrated())
+ {
+ // set the flow
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+
+ }
getQpidSession().sync();
getCurrentException();
}
@@ -462,11 +486,11 @@ public class AMQSession_0_10 extends AMQSession
//only set if msg list is null
try
{
- if (consumer.getMessageListener() != null)
- {
+ // if (consumer.getMessageListener() != null)
+ // {
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
MAX_PREFETCH);
- }
+ // }
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
}
@@ -579,8 +603,7 @@ public class AMQSession_0_10 extends AMQSession
void start() throws AMQException
{
-
- super.suspendChannel(false);
+ suspendChannel(false);
for(BasicMessageConsumer c: _consumers.values())
{
c.start();
@@ -601,7 +624,7 @@ public class AMQSession_0_10 extends AMQSession
}
}
- synchronized void startDistpatcherIfNecessary()
+ synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
if (!_immediatePrefetch)
@@ -622,4 +645,71 @@ public class AMQSession_0_10 extends AMQSession
startDistpatcherIfNecessary(false);
}
+
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic=checkValidTopic(topic);
+ AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
+
+ TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName=((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName=new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 8740410bea..0e5786da1e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,9 +21,8 @@
package org.apache.qpid.client;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
@@ -333,4 +332,70 @@ public class AMQSession_0_8 extends AMQSession
return new AMQTemporaryQueue(this);
}
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 423a323d43..e2ae18a21f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -71,6 +71,13 @@ public class AMQTopic extends AMQDestination implements Topic
queueName, isDurable);
}
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ {
+ super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
+ }
+
+
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
@@ -79,6 +86,13 @@ public class AMQTopic extends AMQDestination implements Topic
true);
}
+ public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
+ getDurableTopicQueueName(subscriptionName, connection), false);
+ }
+
public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
return new AMQShortString(connection.getClientID() + ":" + subscriptionName);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ba31a6102f..d9d91f1ebe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -385,7 +385,23 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
- public abstract Object getMessageFromQueue(long l) throws InterruptedException;
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else if (l < 0)
+ {
+ o = _synchronousQueue.poll();
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ return o;
+ }
private boolean closeOnAutoClose() throws JMSException
{
@@ -979,6 +995,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
// do nothing as this is a 0_10 feature
}
+ public boolean isStrated()
+ {
+ // do nothing as this is a 0_10 feature
+ return false;
+ }
+
public AMQShortString getQueuename()
{
return _queuename;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 80b63c75c8..8828f3553f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,10 +19,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
@@ -41,7 +38,6 @@ import javax.jms.JMSException;
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -50,12 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
implements org.apache.qpidity.nclient.util.MessageListener
{
- /**
- * A counter for keeping the number of available messages for this consumer
- */
- private final AtomicLong _messageCounter = new AtomicLong(0);
-
- /**
+ /**
* Number of received message so far
*/
private final AtomicLong _messagesReceived = new AtomicLong(0);
@@ -117,11 +108,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// ----- Interface org.apache.qpidity.client.util.MessageListener
/**
+ *
+ * This is invoked by the session thread when emptying the session message queue.
+ * We first check if the message is valid (match the selector) and then deliver it to the
+ * message listener or to the sync consumer queue.
+ *
* @param jmsMessage this message has already been processed so can't redo preDeliver
* @param channelId
*/
public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
{
+ _messagesReceived.incrementAndGet();
boolean messageOk = false;
try
{
@@ -136,12 +133,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
catch (Exception e1)
{
- // the receiver may be waiting for a message
- if (_messageCounter.get() >= 0)
- {
- _messageCounter.decrementAndGet();
- _synchronousQueue.add(new NullTocken());
- }
// we should silently log thie exception as it only hanppens when the connection is closed
_logger.error("Exception when receiving message", e1);
}
@@ -152,20 +143,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
}
- public void onMessage(Message message)
+ /**
+ * Require more credit for this consumer
+ */
+ private void requireMoreCreditIfNecessary()
{
- if (isMessageListenerSet())
+ if (_isStarted && _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
{
- _messagesReceived.incrementAndGet();
- if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
- {
- // require more credit
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _messagesReceived.set(0);
- }
+ // require more credit
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _messagesReceived.set(0);
}
+ }
+
+ /**
+ * This method is invoked by the transport layer when a message is delivered for this
+ * consumer. The message is transformed and pass to the session.
+ * @param message an 0.10 message
+ */
+ public void onMessage(Message message)
+ {
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -207,8 +206,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
newMessage.setReplyToURL(replyToUrl);
}
newMessage.setContentHeader(headers);
- // increase the counter of messages
- _messageCounter.incrementAndGet();
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -242,10 +239,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
// notify the session
((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
+ if (isMessageListenerSet())
+ {
+ requireMoreCreditIfNecessary();
+ }
+ else if (_synchronousQueue.isEmpty())
+ {
+ requireMoreCreditIfNecessary();
+ }
//if (!Boolean.getBoolean("noAck"))
//{
super.postDeliver(msg);
//}
+
+
}
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
@@ -351,50 +358,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
messageOk = acquireMessage(message);
}
- if (!messageOk)
- {
- requestCreditIfCreditMode();
- }
return messageOk;
}
- private void requestCreditIfCreditMode()
- {
- try
- {
- // the current message received is not good, so we need to get a message.
- if (getMessageListener() == null)
- {
- int oldval = _messageCounter.intValue();
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.intValue() <= oldval)
- {
- // we haven't received a message so tell the receiver to return null
- _synchronousQueue.add(new NullTocken());
- }
- else
- {
- _messageCounter.decrementAndGet();
- }
- }
- // we now need to check if we have received a message
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Error getting message listener, couldn't request credit after releasing a message that failed the selector test",
- e);
- }
- }
/**
* Acknowledge a message
@@ -469,16 +435,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super.setMessageListener(messageListener);
if (messageListener == null)
{
- _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+ /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
_0_10session.getQpidSession()
.messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
+ */
}
else
{
+ //TODO: empty the list of sync messages.
if (_connection.started())
{
_0_10session.getQpidSession()
@@ -491,65 +459,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
_messagesReceived.set(0);
- ;
}
}
}
- public Object getMessageFromQueue(long l) throws InterruptedException
+ public boolean isStrated()
{
- if (!_isStarted)
- {
- return null;
- }
- Object o;
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
- if (l == 0)
- {
- o = _synchronousQueue.take();
- }
- else
- {
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else
- {
- o = _synchronousQueue.poll();
- }
- if (o == null)
- {
- _logger.debug("Message Didn't arrive in time, checking if one is inflight");
- // checking if one is inflight
- _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.get() > 0)
- {
- o = _synchronousQueue.take();
- }
- }
- }
- if (o instanceof NullTocken)
- {
- o = null;
- }
- return o;
- }
-
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
- {
- _messageCounter.decrementAndGet();
- super.preApplicationProcessing(jmsMsg);
- }
-
- private class NullTocken
- {
-
+ return _isStarted;
}
public void start()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 6cf6918634..1bc9e4d855 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -86,22 +86,5 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
}
-
- public Object getMessageFromQueue(long l) throws InterruptedException
- {
- Object o;
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else if (l < 0)
- {
- o = _synchronousQueue.poll();
- }
- else
- {
- o = _synchronousQueue.take();
- }
- return o;
- }
+
} \ No newline at end of file