From f7de2fde2134635dcf97eb966781746b669bdad0 Mon Sep 17 00:00:00 2001 From: Marnie McCormack Date: Fri, 3 Nov 2006 10:02:56 +0000 Subject: Changes to fix QueueReceiver ClassCastException being thrown by methods in AMQSession which should return this interface. As per QPID-58. Added QueueReceiverAdaptor class now used in AMQSession. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@470742 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/org/apache/qpid/client/AMQSession.java | 76 ++++++++++++++++++-- .../apache/qpid/client/BasicMessageConsumer.java | 2 +- .../apache/qpid/client/QueueReceiverAdaptor.java | 83 ++++++++++++++++++++++ 3 files changed, 153 insertions(+), 8 deletions(-) create mode 100644 java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java (limited to 'java/client/src') diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 8b2c2ec04d..dd868f9dba 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -169,7 +169,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String reason = message.bounceBody.replyText; _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - //Todo should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) { _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); @@ -380,7 +380,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw new JMSException("Unable to create message: " + e); + throw new JMSException("Unable to create text message: " + e); } } } @@ -398,7 +398,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw new JMSException("Unable to create message: " + e); + throw new JMSException("Unable to create text message: " + e); } } } @@ -718,6 +718,34 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi }.execute(_connection); } + /** + * Creates a QueueReceiver + * @param destination + * @return QueueReceiver - a wrapper around our MessageConsumer + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination) throws JMSException + { + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + return new QueueReceiverAdaptor(dest, consumer); + } + + /** + * Creates a QueueReceiver using a message selector + * @param destination + * @param messageSelector + * @return QueueReceiver - a wrapper around our MessageConsumer + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException + { + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) + createConsumer(destination, messageSelector); + return new QueueReceiverAdaptor(dest, consumer); + } + public MessageConsumer createConsumer(Destination destination) throws JMSException { return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); @@ -924,14 +952,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + /** + * Creates a QueueReceiver wrapping a MessageConsumer + * @param queue + * @return QueueReceiver + * @throws JMSException + */ public QueueReceiver createReceiver(Queue queue) throws JMSException { - return (QueueReceiver) createConsumer(queue); + AMQQueue dest = (AMQQueue) queue; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + return new QueueReceiverAdaptor(dest, consumer); } + /** + * Creates a QueueReceiver wrapping a MessageConsumer using a message selector + * @param queue + * @param messageSelector + * @return QueueReceiver + * @throws JMSException + */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { - return (QueueReceiver) createConsumer(queue, messageSelector); + AMQQueue dest = (AMQQueue) queue; + BasicMessageConsumer consumer = (BasicMessageConsumer) + createConsumer(dest, messageSelector); + return new QueueReceiverAdaptor(dest, consumer); } public QueueSender createSender(Queue queue) throws JMSException @@ -961,14 +1007,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + /** + * Creates a non-durable subscriber + * @param topic + * @return TopicSubscriber - a wrapper round our MessageConsumer + * @throws JMSException + */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return (TopicSubscriber) createConsumer(topic); + AMQTopic dest = new AMQTopic(topic.getTopicName()); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } + /** + * Creates a non-durable subscriber with a message selector + * @param topic + * @param messageSelector + * @param noLocal + * @return TopicSubscriber - a wrapper round our MessageConsumer + * @throws JMSException + */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { - return (TopicSubscriber) createConsumer(topic, messageSelector, noLocal); + AMQTopic dest = new AMQTopic(topic.getTopicName()); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } /** diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java index a6f89fd221..51d7337457 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -129,7 +129,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private boolean _dups_ok_acknowledge_send; - BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) diff --git a/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java new file mode 100644 index 0000000000..fc04181e19 --- /dev/null +++ b/java/client/src/org/apache/qpid/client/QueueReceiverAdaptor.java @@ -0,0 +1,83 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client; + +import javax.jms.*; + +/** + * Class that wraps a MessageConsumer for backwards JMS compatibility + * Returned by methods in AMQSession etc + */ +public class QueueReceiverAdaptor implements QueueReceiver { + + protected MessageConsumer _consumer; + protected Queue _queue; + + protected QueueReceiverAdaptor(Queue queue, MessageConsumer consumer) + { + _consumer = consumer; + _queue = queue; + } + + public String getMessageSelector() throws JMSException + { + return _consumer.getMessageSelector(); + } + + public MessageListener getMessageListener() throws JMSException + { + return _consumer.getMessageListener(); + } + + public void setMessageListener(MessageListener messageListener) throws JMSException + { + _consumer.setMessageListener(messageListener); + } + + public Message receive() throws JMSException + { + return _consumer.receive(); + } + + public Message receive(long l) throws JMSException + { + return _consumer.receive(l); + } + + public Message receiveNoWait() throws JMSException + { + return _consumer.receiveNoWait(); + } + + public void close() throws JMSException + { + _consumer.close(); + } + + /** + * Return the queue associated with this receiver + * @return + * @throws JMSException + */ + public Queue getQueue() throws JMSException + { + return _queue; + } + + +} -- cgit v1.2.1