From 258b72de02ef8940262b82eb4419463c4a69fc74 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 8 Aug 2008 18:31:18 +0000 Subject: QPID-1213: simplified unprocessed message and moved version specific code into the _0_8 and _0_10 variants git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@684036 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 225 ++++++++------------- .../org/apache/qpid/client/AMQSession_0_10.java | 39 ++-- .../org/apache/qpid/client/AMQSession_0_8.java | 84 +++++++- .../apache/qpid/client/BasicMessageConsumer.java | 32 ++- .../qpid/client/BasicMessageConsumer_0_10.java | 129 ++++-------- .../qpid/client/BasicMessageConsumer_0_8.java | 20 +- .../qpid/client/BasicMessageProducer_0_10.java | 87 ++------ .../apache/qpid/client/TopicSubscriberAdaptor.java | 12 +- .../client/handler/BasicDeliverMethodHandler.java | 5 +- .../client/handler/BasicReturnMethodHandler.java | 4 +- .../client/message/AMQMessageDelegate_0_10.java | 113 ++++++++--- .../qpid/client/message/AbstractJMSMessage.java | 48 ----- .../client/message/AbstractJMSMessageFactory.java | 14 +- .../qpid/client/message/CloseConsumerMessage.java | 43 ++++ .../apache/qpid/client/message/JMSTextMessage.java | 11 - .../apache/qpid/client/message/MessageFactory.java | 2 +- .../client/message/MessageFactoryRegistry.java | 19 +- .../apache/qpid/client/message/ReturnMessage.java | 4 +- .../qpid/client/message/UnprocessedMessage.java | 221 +------------------- .../client/message/UnprocessedMessage_0_10.java | 61 +----- .../client/message/UnprocessedMessage_0_8.java | 41 +++- .../qpid/client/protocol/AMQProtocolSession.java | 19 +- .../client/util/FlowControllingBlockingQueue.java | 23 ++- .../apache/qpid/nclient/MessagePartListener.java | 27 +-- .../qpid/nclient/impl/ClientSessionDelegate.java | 10 +- .../qpid/nclient/util/ByteBufferMessage.java | 43 ++-- .../nclient/util/MessagePartListenerAdapter.java | 44 ++-- .../qpid/test/unit/message/TestAMQSession.java | 8 +- 28 files changed, 546 insertions(+), 842 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 723e502ff0..5203a27f42 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -63,7 +63,6 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -78,7 +77,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,20 +97,20 @@ import org.slf4j.LoggerFactory; * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, * after looking at worse bottlenecks first. */ -public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession +public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap + public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentHashMap _slowAccessConsumers = new ConcurrentHashMap(); + private final ConcurrentHashMap _slowAccessConsumers = new ConcurrentHashMap(); - public BasicMessageConsumer get(int id) + public C get(int id) { if ((id & 0xFFFFFFF0) == 0) { - return _fastAccessConsumers[id]; + return (C) _fastAccessConsumers[id]; } else { @@ -120,12 +118,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) + public C put(int id, C consumer) { - BasicMessageConsumer oldVal; + C oldVal; if ((id & 0xFFFFFFF0) == 0) { - oldVal = _fastAccessConsumers[id]; + oldVal = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = consumer; } else @@ -137,12 +135,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public BasicMessageConsumer remove(int id) + public C remove(int id) { - BasicMessageConsumer consumer; + C consumer; if ((id & 0xFFFFFFF0) == 0) { - consumer = _fastAccessConsumers[id]; + consumer = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = null; } else @@ -154,15 +152,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public Collection values() + public Collection values() { - ArrayList values = new ArrayList(); + ArrayList values = new ArrayList(); for (int i = 0; i < 16; i++) { if (_fastAccessConsumers[i] != null) { - values.add(_fastAccessConsumers[i]); + values.add((C) _fastAccessConsumers[i]); } } values.addAll(_slowAccessConsumers.values()); @@ -183,8 +181,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - /** Used for debugging in the dispatcher. */ - private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class); /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; @@ -234,7 +230,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Holds this session unique identifier, used to distinguish it from other sessions. */ protected int _channelId; - /** @todo This does not appear to be set? */ private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ @@ -261,8 +256,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - protected final ConcurrentHashMap _reverseSubscriptionMap = - new ConcurrentHashMap(); + protected final ConcurrentHashMap _reverseSubscriptionMap = + new ConcurrentHashMap(); /** * Used to hold incoming messages. @@ -299,7 +294,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - protected final IdToConsumerMap _consumers = new IdToConsumerMap(); + protected final IdToConsumerMap _consumers = new IdToConsumerMap(); //Map _consumers = //new ConcurrentHashMap(); @@ -308,7 +303,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes */ - private CopyOnWriteArrayList _removedConsumers = new CopyOnWriteArrayList(); + private CopyOnWriteArrayList _removedConsumers = new CopyOnWriteArrayList(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap _destinationConsumerCount = @@ -584,7 +579,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException + public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException { if (consumer.getQueuename() != null) { @@ -755,11 +750,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void sendCommit() throws AMQException, FailoverException; - public void confirmConsumerCancelled(AMQShortString consumerTag) + + public void confirmConsumerCancelled(int consumerTag) { // Remove the consumer from the map - BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); + C consumer = _consumers.get(consumerTag); if (consumer != null) { if (!consumer.isNoConsume()) // Normal Consumer @@ -801,7 +797,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } else { - _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer)); + _queue.add(new CloseConsumerMessage(consumer)); } } } @@ -848,7 +844,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } - public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + public C createExclusiveConsumer(Destination destination) throws JMSException { checkValidDestination(destination); @@ -927,8 +923,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _subscriptions.get(name).close(); } AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + C consumer = (C) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -960,23 +956,23 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return msg; } - public BasicMessageProducer createProducer(Destination destination) throws JMSException + public P createProducer(Destination destination) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException + public P createProducer(Destination destination, boolean immediate) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + public P createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, + public P createProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); @@ -986,7 +982,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic); + return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1074,7 +1070,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + C consumer = (C) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); } @@ -1093,7 +1089,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); + C consumer = (C) createConsumer(destination, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1111,7 +1107,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); } @@ -1130,7 +1126,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); + C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1175,7 +1171,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); } /** @@ -1195,7 +1191,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); } public abstract TemporaryQueue createTemporaryQueue() throws JMSException; @@ -1369,17 +1365,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { _logger.debug("Message[" + message.toString() + "] received in session"); } - - if (message instanceof ReturnMessage) - { - // Return of the bounced message. - returnBouncedMessage((ReturnMessage) message); - } - else - { - _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); - } + _highestDeliveryTag.set(message.getDeliveryTag()); + _queue.add(message); } public void declareAndBind(AMQDestination amqd) @@ -1618,7 +1605,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, + protected C createConsumerImpl(final Destination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { @@ -1642,10 +1629,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector = selector; } - return new FailoverRetrySupport( - new FailoverProtectedOperation() + return new FailoverRetrySupport( + new FailoverProtectedOperation() { - public MessageConsumer execute() throws JMSException, FailoverException + public C execute() throws JMSException, FailoverException { checkNotClosed(); @@ -1668,7 +1655,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); } - BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, noLocal, exclusive, messageSelector, ft, noConsume, autoClose); if (_messageListener != null) @@ -1712,7 +1699,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException; @@ -1722,9 +1709,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param consumer the consum */ - void deregisterConsumer(BasicMessageConsumer consumer) + void deregisterConsumer(C consumer) { - if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null) + if (_consumers.remove(consumer.getConsumerTag()) != null) { String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) @@ -2012,12 +1999,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); + final ArrayList clonedConsumers = new ArrayList(_consumers.values()); - final Iterator it = clonedConsumers.iterator(); + final Iterator it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); if (error != null) { con.notifyError(error); @@ -2048,7 +2035,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess final Iterator it = clonedProducers.iterator(); while (it.hasNext()) { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); + final P prod = (P) it.next(); prod.close(); } // at this point the _producers map is empty @@ -2096,20 +2083,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param queueName */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, + private void consumeFromQueue(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { int tagId = _nextTag++; - // need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(tagId)); - consumer.setConsumerTag(tag); + consumer.setConsumerTag(tagId); // we must register the consumer in the map before we actually start listening _consumers.put(tagId, consumer); try { - sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag); + sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); } catch (AMQException e) { @@ -2119,26 +2104,26 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException; + public abstract void sendConsume(C consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; - private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } - private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent) throws JMSException { - return new FailoverRetrySupport( - new FailoverProtectedOperation() + return new FailoverRetrySupport( + new FailoverProtectedOperation() { - public BasicMessageProducer execute() throws JMSException, FailoverException + public P execute() throws JMSException, FailoverException { checkNotClosed(); long producerId = getNextProducerId(); - BasicMessageProducer producer = createMessageProducer(destination, mandatory, + P producer = createMessageProducer(destination, mandatory, immediate, waitUntilSent, producerId); registerProducer(producerId, producer); @@ -2147,7 +2132,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public abstract P createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId); private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException @@ -2320,12 +2305,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); + final ArrayList clonedConsumers = new ArrayList(_consumers.values()); - final Iterator it = clonedConsumers.iterator(); + final Iterator it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); con.markClosed(); } // at this point the _consumers map will be empty @@ -2360,7 +2345,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @throws AMQException */ - private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException + private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException { AMQDestination amqd = consumer.getDestination(); @@ -2424,15 +2409,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void rejectAllMessages(boolean requeue) { - rejectMessagesForConsumerTag(null, requeue); + rejectMessagesForConsumerTag(0, requeue, true); } /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + * @param rejectAllConsumers */ - private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) + private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { Iterator messages = _queue.iterator(); if (_logger.isInfoEnabled()) @@ -2453,7 +2439,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString())) + if (rejectAllConsumers || (message.getConsumerTag() == consumerTag)) { if (_logger.isDebugEnabled()) { @@ -2475,12 +2461,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void resubscribeConsumers() throws AMQException { - ArrayList consumers = new ArrayList(_consumers.values()); + ArrayList consumers = new ArrayList(_consumers.values()); _consumers.clear(); - for (Iterator it = consumers.iterator(); it.hasNext();) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + for (C consumer : consumers) + { consumer.failedOver(); registerConsumer(consumer, true); } @@ -2492,53 +2477,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey for (Iterator it = producers.iterator(); it.hasNext();) { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); + P producer = (P) it.next(); producer.resubscribe(); } } - private void returnBouncedMessage(final ReturnMessage msg) - { - _connection.performConnectionTask(new Runnable() - { - public void run() - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = - _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } - else - { - _connection.exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); - } - - } - catch (Exception e) - { - _logger.error( - "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", - e); - } - } - }); - } - /** * Suspends or unsuspends this session. * @@ -2641,6 +2584,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } + /** Used for debugging in the dispatcher. */ + private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread { @@ -2652,6 +2599,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private final AtomicLong _rollbackMark = new AtomicLong(-1); private String dispatcherID = "" + System.identityHashCode(this); + + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -2670,7 +2619,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public void rejectPending(BasicMessageConsumer consumer) + public void rejectPending(C consumer) { synchronized (_lock) { @@ -2685,7 +2634,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); //Let the dispatcher deal with this when it gets to them. // closeConsumer @@ -2712,7 +2661,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); - for (BasicMessageConsumer consumer : _consumers.values()) + for (C consumer : _consumers.values()) { if (!consumer.isNoConsume()) { @@ -2778,7 +2727,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (!(message instanceof UnprocessedMessage.CloseConsumerMessage) + if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get())) { rejectMessage(message, true); @@ -2840,8 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess //This if block is not needed anymore as bounce messages are handled separately //if (message.getDeliverBody() != null) //{ - final BasicMessageConsumer consumer = - _consumers.get(message.getConsumerTag().toIntValue()); + final C consumer = + _consumers.get(message.getConsumerTag()); if ((consumer == null) || consumer.isClosed()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 32a0fdd5f5..aa0ff66545 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -52,7 +52,7 @@ import java.util.Map; /** * This is a 0.10 Session */ -public class AMQSession_0_10 extends AMQSession +public class AMQSession_0_10 extends AMQSession { /** @@ -345,7 +345,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message consumer */ - public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, final boolean noConsume, @@ -406,8 +406,8 @@ public class AMQSession_0_10 extends AMQSession * This method is invoked when a consumer is creted * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector, AMQShortString tag) + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException { boolean preAcquire; @@ -416,10 +416,10 @@ public class AMQSession_0_10 extends AMQSession preAcquire = ( ! consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) || !(consumer.getDestination() instanceof AMQQueue); - getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), + getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, - new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, + (BasicMessageConsumer_0_10) consumer, null, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) @@ -427,21 +427,23 @@ public class AMQSession_0_10 extends AMQSession throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); } + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + if (! prefetch()) { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); } else { - getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW); + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) { // set the flow - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } @@ -452,7 +454,7 @@ public class AMQSession_0_10 extends AMQSession /** * Create an 0_10 message producer */ - public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId) { @@ -542,13 +544,14 @@ public class AMQSession_0_10 extends AMQSession { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(consumer.getConsumerTag().toString()); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); } } else { - for (BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer_0_10 consumer : _consumers.values()) { + String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null try { @@ -556,18 +559,18 @@ public class AMQSession_0_10 extends AMQSession { if (consumer.getMessageListener() != null) { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, 1); } } else { getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, + .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, getAMQConnection().getMaxPrefetch()); } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); } catch (Exception e) { @@ -715,7 +718,7 @@ public class AMQSession_0_10 extends AMQSession AMQTopic origTopic=checkValidTopic(topic); AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber=_subscriptions.get(name); + TopicSubscriberAdaptor subscriber=_subscriptions.get(name); if (subscriber != null) { if (subscriber.getTopic().equals(topic)) @@ -766,7 +769,7 @@ public class AMQSession_0_10 extends AMQSession } } - subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 2c88d6f557..2442b157f1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -25,10 +25,11 @@ import javax.jms.*; import javax.jms.IllegalStateException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; @@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory; import java.util.Map; -public final class AMQSession_0_8 extends AMQSession +public final class AMQSession_0_8 extends AMQSession { /** Used for debugging. */ @@ -218,6 +219,7 @@ public final class AMQSession_0_8 extends AMQSession return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName()); } + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException { @@ -245,10 +247,14 @@ public final class AMQSession_0_8 extends AMQSession { throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } - } - - public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, - String messageSelector, AMQShortString tag) throws AMQException, FailoverException + } + + @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, + AMQShortString queueName, + AMQProtocolHandler protocolHandler, + boolean nowait, + String messageSelector, + int tag) throws AMQException, FailoverException { FieldTable arguments = FieldTableFactory.newFieldTable(); if ((messageSelector != null) && !messageSelector.equals("")) @@ -268,7 +274,7 @@ public final class AMQSession_0_8 extends AMQSession BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, - tag, + new AMQShortString(String.valueOf(tag)), consumer.isNoLocal(), consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), @@ -337,7 +343,7 @@ public final class AMQSession_0_8 extends AMQSession } - public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId) { @@ -345,6 +351,66 @@ public final class AMQSession_0_8 extends AMQSession this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); } + + @Override public void messageReceived(UnprocessedMessage message) + { + + if (message instanceof ReturnMessage) + { + // Return of the bounced message. + returnBouncedMessage((ReturnMessage) message); + } + else + { + super.messageReceived(message); + } + } + + private void returnBouncedMessage(final ReturnMessage msg) + { + _connection.performConnectionTask(new Runnable() + { + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); + } + + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } + } + }); + } + + + + public void sendRollback() throws AMQException, FailoverException { TxRollbackBody body = getMethodRegistry().createTxRollbackBody(); @@ -365,7 +431,7 @@ public final class AMQSession_0_8 extends AMQSession checkNotClosed(); AMQTopic origTopic = checkValidTopic(topic); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); if (subscriber != null) { if (subscriber.getTopic().equals(topic)) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0176f66552..01bb68c23e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -22,10 +22,7 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; @@ -49,7 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer +public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -72,7 +69,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Me private final AtomicReference _messageListener = new AtomicReference(); /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ - protected AMQShortString _consumerTag; + protected int _consumerTag; /** We need to know the channel id when constructing frames */ protected final int _channelId; @@ -517,7 +514,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Me throw e; } - else if (o instanceof UnprocessedMessage.CloseConsumerMessage) + else if (o instanceof CloseConsumerMessage) { _closed.set(true); deregisterConsumer(); @@ -635,7 +632,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Me * @param closeMessage * this message signals that we should close the browser */ - public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage) + public void notifyCloseMessage(CloseConsumerMessage closeMessage) { if (isMessageListenerSet()) { @@ -667,26 +664,21 @@ public abstract class BasicMessageConsumer extends Closeable implements Me * * @param messageFrame the raw unprocessed mesage */ - void notifyMessage(UnprocessedMessage messageFrame) + void notifyMessage(U messageFrame) { - if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage) + if (messageFrame instanceof CloseConsumerMessage) { - notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame); + notifyCloseMessage((CloseConsumerMessage) messageFrame); return; } - final boolean debug = _logger.isDebugEnabled(); - if (debug) - { - _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag()); - } try { AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame); - if (debug) + if (_logger.isDebugEnabled()) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } @@ -721,7 +713,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Me } } - public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage messageFrame) + public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame) throws Exception; /** @param jmsMessage this message has already been processed so can't redo preDeliver */ @@ -936,12 +928,12 @@ public abstract class BasicMessageConsumer extends Closeable implements Me _session.deregisterConsumer(this); } - public AMQShortString getConsumerTag() + public int getConsumerTag() { return _consumerTag; } - public void setConsumerTag(AMQShortString consumerTag) + public void setConsumerTag(int consumerTag) { _consumerTag = consumerTag; } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 388adfb434..2a37298a43 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -22,11 +22,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.api.Message; import org.apache.qpid.transport.*; import org.apache.qpid.QpidException; import org.apache.qpid.filter.MessageFilter; @@ -35,16 +32,14 @@ import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.MessageListener; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; /** * This is a 0.10 message consumer. */ -public class BasicMessageConsumer_0_10 extends BasicMessageConsumer - implements org.apache.qpid.nclient.util.MessageListener +public class BasicMessageConsumer_0_10 extends BasicMessageConsumer + implements org.apache.qpid.nclient.MessagePartListener { /** @@ -76,6 +71,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer:///[]/[]?