diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-02-06 12:56:20 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-02-06 12:56:20 +0000 |
| commit | 94bde36b1306f1ebbc2cc23f136ac3d4e7b1f164 (patch) | |
| tree | b9199d059d67528206d2a9d626e24e067dedab3c /java | |
| parent | 9e6f569828967edb9430f6b8cbdda9323a9cf8c5 (diff) | |
| download | qpid-python-94bde36b1306f1ebbc2cc23f136ac3d4e7b1f164.tar.gz | |
QPID-777 and QPID-778
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@618986 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
7 files changed, 266 insertions, 234 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a5c2688511..3a59163f9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -24,7 +24,6 @@ package org.apache.qpid.client; import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -78,8 +77,6 @@ import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; @@ -183,14 +180,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * keeps a record of subscriptions which have been created in the current instance. It does not remember * subscriptions between executions of the client. */ - private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = + protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); /** * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = + protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = new ConcurrentHashMap<BasicMessageConsumer, String>(); /** @@ -271,10 +268,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess protected final boolean _immediatePrefetch; /** Indicates that warnings should be generated on violations of the strict AMQP. */ - private final boolean _strictAMQP; + protected final boolean _strictAMQP; /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ - private final boolean _strictAMQPFATAL; + protected final boolean _strictAMQPFATAL; private final Object _messageDeliveryLock = new Object(); /** @@ -459,8 +456,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { if (_logger.isInfoEnabled()) { - _logger.info("Closing session: " + this + ":" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.info("Closing session: " + this );//+ ":" + // + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } synchronized (_messageDeliveryLock) @@ -673,6 +670,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } + public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + false, false); + } + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -723,70 +728,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - - checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName = ((AMQTopic) topic).getRoutingKey(); - } - else - { - topicName = new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } + public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; /** Note, currently this does not handle reuse of the same name with different topics correctly. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) @@ -1800,7 +1742,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /* * I could have combined the last 3 methods, but this way it improves readability */ - private AMQTopic checkValidTopic(Topic topic) throws JMSException + protected AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) { @@ -2060,7 +2002,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Be aware of possible changes to parameter order as versions change. */ - private void deleteQueue(final AMQShortString queueName) throws JMSException + protected void deleteQueue(final AMQShortString queueName) throws JMSException { try { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a63d94b4ca..3ffe92d139 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -38,14 +38,11 @@ import org.apache.qpidity.transport.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.JMSException; -import javax.jms.Destination; -import javax.jms.TemporaryQueue; +import javax.jms.*; +import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; -import java.util.HashMap; - /** * This is a 0.10 Session */ @@ -146,6 +143,25 @@ public class AMQSession_0_10 extends AMQSession //------- overwritten methods of class AMQSession + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException + { + checkNotClosed(); + checkValidTopic(topic); + if( _subscriptions.containsKey(name)) + { + _subscriptions.get(name).close(); + } + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; + } + /** * Acknowledge one or many messages. * @@ -362,6 +378,14 @@ public class AMQSession_0_10 extends AMQSession getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. + if(consumer.isStrated()) + { + // set the flow + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + AMQSession_0_10.MAX_PREFETCH); + + } getQpidSession().sync(); getCurrentException(); } @@ -462,11 +486,11 @@ public class AMQSession_0_10 extends AMQSession //only set if msg list is null try { - if (consumer.getMessageListener() != null) - { + // if (consumer.getMessageListener() != null) + // { getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, MAX_PREFETCH); - } + // } getQpidSession() .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); } @@ -579,8 +603,7 @@ public class AMQSession_0_10 extends AMQSession void start() throws AMQException { - - super.suspendChannel(false); + suspendChannel(false); for(BasicMessageConsumer c: _consumers.values()) { c.start(); @@ -601,7 +624,7 @@ public class AMQSession_0_10 extends AMQSession } } - synchronized void startDistpatcherIfNecessary() + synchronized void startDistpatcherIfNecessary() { // If IMMEDIATE_PREFETCH is not set then we need to start fetching if (!_immediatePrefetch) @@ -622,4 +645,71 @@ public class AMQSession_0_10 extends AMQSession startDistpatcherIfNecessary(false); } + + + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + + checkNotClosed(); + AMQTopic origTopic=checkValidTopic(topic); + AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); + + TopicSubscriberAdaptor subscriber=_subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName=((AMQTopic) topic).getRoutingKey(); + } + else + { + topicName=new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + + subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 8740410bea..0e5786da1e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,9 +21,8 @@ package org.apache.qpid.client; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.TemporaryQueue; +import javax.jms.*; +import javax.jms.IllegalStateException; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; @@ -333,4 +332,70 @@ public class AMQSession_0_8 extends AMQSession return new AMQTemporaryQueue(this); } + + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + + checkNotClosed(); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName = ((AMQTopic) topic).getRoutingKey(); + } + else + { + topicName = new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 423a323d43..e2ae18a21f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -71,6 +71,13 @@ public class AMQTopic extends AMQDestination implements Topic queueName, isDurable); } + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + { + super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable ); + } + + public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { @@ -79,6 +86,13 @@ public class AMQTopic extends AMQDestination implements Topic true); } + public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection) + throws JMSException + { + return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false, + getDurableTopicQueueName(subscriptionName, connection), false); + } + public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { return new AMQShortString(connection.getClientID() + ":" + subscriptionName); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ba31a6102f..d9d91f1ebe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -385,7 +385,23 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } - public abstract Object getMessageFromQueue(long l) throws InterruptedException; + public Object getMessageFromQueue(long l) throws InterruptedException + { + Object o; + if (l > 0) + { + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + } + else if (l < 0) + { + o = _synchronousQueue.poll(); + } + else + { + o = _synchronousQueue.take(); + } + return o; + } private boolean closeOnAutoClose() throws JMSException { @@ -979,6 +995,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me // do nothing as this is a 0_10 feature } + public boolean isStrated() + { + // do nothing as this is a 0_10 feature + return false; + } + public AMQShortString getQueuename() { return _queuename; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 80b63c75c8..8828f3553f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,10 +19,7 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_10; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; @@ -41,7 +38,6 @@ import javax.jms.JMSException; import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -50,12 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer> implements org.apache.qpidity.nclient.util.MessageListener { - /** - * A counter for keeping the number of available messages for this consumer - */ - private final AtomicLong _messageCounter = new AtomicLong(0); - - /** + /** * Number of received message so far */ private final AtomicLong _messagesReceived = new AtomicLong(0); @@ -117,11 +108,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // ----- Interface org.apache.qpidity.client.util.MessageListener /** + * + * This is invoked by the session thread when emptying the session message queue. + * We first check if the message is valid (match the selector) and then deliver it to the + * message listener or to the sync consumer queue. + * * @param jmsMessage this message has already been processed so can't redo preDeliver * @param channelId */ public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) { + _messagesReceived.incrementAndGet(); boolean messageOk = false; try { @@ -136,12 +133,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } catch (Exception e1) { - // the receiver may be waiting for a message - if (_messageCounter.get() >= 0) - { - _messageCounter.decrementAndGet(); - _synchronousQueue.add(new NullTocken()); - } // we should silently log thie exception as it only hanppens when the connection is closed _logger.error("Exception when receiving message", e1); } @@ -152,20 +143,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } } - public void onMessage(Message message) + /** + * Require more credit for this consumer + */ + private void requireMoreCreditIfNecessary() { - if (isMessageListenerSet()) + if (_isStarted && _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH) { - _messagesReceived.incrementAndGet(); - if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH) - { - // require more credit - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - _messagesReceived.set(0); - } + // require more credit + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + Session.MESSAGE_FLOW_UNIT_MESSAGE, + AMQSession_0_10.MAX_PREFETCH); + _messagesReceived.set(0); } + } + + /** + * This method is invoked by the transport layer when a message is delivered for this + * consumer. The message is transformed and pass to the session. + * @param message an 0.10 message + */ + public void onMessage(Message message) + { int channelId = getSession().getChannelId(); long deliveryId = message.getMessageTransferId(); String consumerTag = getConsumerTag().toString(); @@ -207,8 +206,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By newMessage.setReplyToURL(replyToUrl); } newMessage.setContentHeader(headers); - // increase the counter of messages - _messageCounter.incrementAndGet(); getSession().messageReceived(newMessage); // else ignore this message } @@ -242,10 +239,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { // notify the session ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag()); + if (isMessageListenerSet()) + { + requireMoreCreditIfNecessary(); + } + else if (_synchronousQueue.isEmpty()) + { + requireMoreCreditIfNecessary(); + } //if (!Boolean.getBoolean("noAck")) //{ super.postDeliver(msg); //} + + } void notifyMessage(UnprocessedMessage messageFrame, int channelId) @@ -351,50 +358,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } messageOk = acquireMessage(message); } - if (!messageOk) - { - requestCreditIfCreditMode(); - } return messageOk; } - private void requestCreditIfCreditMode() - { - try - { - // the current message received is not good, so we need to get a message. - if (getMessageListener() == null) - { - int oldval = _messageCounter.intValue(); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - 1); - _0_10session.getQpidSession() - .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); - _0_10session.getQpidSession().sync(); - _0_10session.getQpidSession() - .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - if (_messageCounter.intValue() <= oldval) - { - // we haven't received a message so tell the receiver to return null - _synchronousQueue.add(new NullTocken()); - } - else - { - _messageCounter.decrementAndGet(); - } - } - // we now need to check if we have received a message - - } - catch (Exception e) - { - _logger.error( - "Error getting message listener, couldn't request credit after releasing a message that failed the selector test", - e); - } - } /** * Acknowledge a message @@ -469,16 +435,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.setMessageListener(messageListener); if (messageListener == null) { - _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); + /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); _0_10session.getQpidSession() .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); _0_10session.getQpidSession().sync(); + */ } else { + //TODO: empty the list of sync messages. if (_connection.started()) { _0_10session.getQpidSession() @@ -491,65 +459,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By 0xFFFFFFFF); _0_10session.getQpidSession().sync(); _messagesReceived.set(0); - ; } } } - public Object getMessageFromQueue(long l) throws InterruptedException + public boolean isStrated() { - if (!_isStarted) - { - return null; - } - Object o; - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - - if (l == 0) - { - o = _synchronousQueue.take(); - } - else - { - if (l > 0) - { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); - } - else - { - o = _synchronousQueue.poll(); - } - if (o == null) - { - _logger.debug("Message Didn't arrive in time, checking if one is inflight"); - // checking if one is inflight - _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); - _0_10session.getQpidSession().sync(); - _0_10session.getQpidSession() - .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - if (_messageCounter.get() > 0) - { - o = _synchronousQueue.take(); - } - } - } - if (o instanceof NullTocken) - { - o = null; - } - return o; - } - - protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException - { - _messageCounter.decrementAndGet(); - super.preApplicationProcessing(jmsMsg); - } - - private class NullTocken - { - + return _isStarted; } public void start() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 6cf6918634..1bc9e4d855 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -86,22 +86,5 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); } - - public Object getMessageFromQueue(long l) throws InterruptedException - { - Object o; - if (l > 0) - { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); - } - else if (l < 0) - { - o = _synchronousQueue.poll(); - } - else - { - o = _synchronousQueue.take(); - } - return o; - } + }
\ No newline at end of file |
