diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-07-31 11:43:58 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-07-31 11:43:58 +0000 |
| commit | b1fa69955cd435b44e8b68c473ff393ce8dd39a5 (patch) | |
| tree | 79f952935cb867583deb9932b38846418aae4d5f | |
| parent | f952f94b504de0a14abd52944adf5d74f853be68 (diff) | |
| download | qpid-python-b1fa69955cd435b44e8b68c473ff393ce8dd39a5.tar.gz | |
Implemented methods
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561299 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 752 insertions, 57 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java index 8d7fab6216..9daed6201d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java @@ -20,7 +20,6 @@ package org.apache.qpid.nclient.jms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpidity.QpidException; -import org.apache.qpid.nclient.jms.QueueSessionImpl; import javax.jms.*; import javax.jms.IllegalStateException; @@ -30,7 +29,7 @@ import java.util.Vector; /** - * + * Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection */ public class ConnectionImpl implements Connection, QueueConnection, TopicConnection { @@ -104,9 +103,22 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect //---- Interface javax.jms.Connection ---// - public Session createSession(boolean b, int i) throws JMSException + /** + * Creates a Session + * + * @param transacted Indicates whether the session is transacted. + * @param acknowledgeMode ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, + * <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>. + * @return A newly created session + * @throws JMSException If the Connection object fails to create a session due to some internal error. + */ + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + SessionImpl session = new SessionImpl(this, transacted, acknowledgeMode); + // add this session with the list of session that are handled by this connection + _sessions.add(session); + return session; } /** @@ -334,23 +346,60 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect return null; //To change body of implemented methods use File | Settings | File Templates. } - public ConnectionConsumer createConnectionConsumer(Queue queue, String string, ServerSessionPool serverSessionPool, - int i) throws JMSException + /** + * Creates a connection consumer for this connection (optional operation). + * This is an expert facility for App server integration. + * + * @param queue The queue to access. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param sessionPool The session pool to associate with this connection consumer. + * @param maxMessages The maximum number of messages that can be assigned to a server session at one time. + * @return Null for the moment. + * @throws JMSException In case of a problem due to some internal error. + */ + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws + JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages); } //-------------- TopicConnection API - - public TopicSession createTopicSession(boolean b, int i) throws JMSException + /** + * Create a TopicSession. + * + * @param transacted Indicates whether the session is transacted + * @param acknowledgeMode Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>, and + * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. + * @return a newly created topic session + * @throws JMSException If creating the session fails due to some internal error. + */ + public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode); + // add the session with this Connection's sessions + // important for when the Connection is closed. + _sessions.add(session); + return session; } - public ConnectionConsumer createConnectionConsumer(Topic topic, String string, ServerSessionPool serverSessionPool, - int i) throws JMSException + /** + * Creates a connection consumer for this connection (optional operation). + * This is an expert facility for App server integration. + * + * @param topic The topic to access. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param sessionPool The session pool to associate with this connection consumer. + * @param maxMessages The maximum number of messages that can be assigned to a server session at one time. + * @return Null for the moment. + * @throws JMSException In case of a problem due to some internal error. + */ + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws + JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages); } //-------------- protected and private methods @@ -375,4 +424,13 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect } } + /** + * Provide access to the underlying qpid Connection. + * + * @return This JMS connection underlying Qpid Connection. + */ + protected org.apache.qpid.nclient.api.Connection getQpidConnection() + { + return _qpidConnection; + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java index 58d4151ba2..fc6d9df480 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java @@ -24,8 +24,8 @@ import javax.jms.JMSException; import java.util.Enumeration; /** - * A <CODE>ConnectionMetaDataImpl</CODE> object provides information describing the - * JMS <CODE>Connection</CODE>. + * Implements javax.jms.ConnectionMetaData + * A ConnectionMetaDataImpl provides information describing the JMS <code>Connection</code>. */ public class ConnectionMetaDataImpl implements ConnectionMetaData { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index 5e2032d37d..3c60aaab72 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -50,7 +50,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer /** * The subscription name */ - private String _subscriptionName; + protected String _subscriptionName; /** * A MessageListener set up for this consumer. @@ -58,14 +58,33 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer private MessageListener _messageListener = null; //----- Constructors + /** + * Create a new MessageProducerImpl. + * + * @param session The session from which the MessageProducerImpl is instantiated + * @param destination The default destination for this MessageProducerImpl + * @param messageSelector The message selector for this QueueReceiverImpl. + * @param noLocal If true inhibits the delivery of messages published by its own connection. + * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. + * If this value is null, a non-durable subscription is created. + * @throws JMSException If the MessageProducerImpl cannot be created due to some internal error. + */ protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector, - boolean noLocal, String subscriptionName) + boolean noLocal, String subscriptionName) throws JMSException { super(session, destination); _messageSelector = messageSelector; _noLocal = noLocal; _subscriptionName = subscriptionName; - + try + { + // TODO define the relevant options + _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), null); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } //----- Message consumer API @@ -112,22 +131,48 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer public void setMessageListener(MessageListener messageListener) throws JMSException { checkNotClosed(); - // create a message listener wrapper + // TODO: create a message listener wrapper } + /** + * Receive the next message produced for this message consumer. + * <P>This call blocks indefinitely until a message is produced or until this message consumer is closed. + * + * @return The next message produced for this message consumer, or + * null if this message consumer is concurrently closed + * @throws JMSException If receiving the next message fails due to some internal error. + */ public Message receive() throws JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + return receive(0); } - public Message receive(long l) throws JMSException + /** + * Receive the next message that arrives within the specified timeout interval. + * <p> This call blocks until a message arrives, the timeout expires, or this message consumer is closed. + * <p> A timeout of zero never expires, and the call blocks indefinitely. + * <p> A timeout less than 0 throws a JMSException. + * + * @param timeout The timeout value (in milliseconds) + * @return The next message that arrives within the specified timeout interval. + * @throws JMSException If receiving the next message fails due to some internal error. + */ + public Message receive(long timeout) throws JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + Message message = null; + // todo convert this message into a JMS one: _qpidReceiver.receive(-1); + return message; } + /** + * Receive the next message if one is immediately available. + * + * @return the next message or null if one is not available. + * @throws JMSException If receiving the next message fails due to some internal error. + */ public Message receiveNoWait() throws JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + return receive(-1); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java index 43e07a9dfa..454762b3e3 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java @@ -23,7 +23,7 @@ import javax.jms.Destination; import javax.jms.Message; /** - * + * Implements MessageProducer */ public class MessageProducerImpl extends MessageActor implements MessageProducer { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java new file mode 100644 index 0000000000..d45f2b54f1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java @@ -0,0 +1,55 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +import javax.jms.QueueReceiver; +import javax.jms.JMSException; +import javax.jms.Queue; + +/** + * Implements javax.jms.QueueReceiver + */ +public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver +{ + //--- Constructor + /** + * create a new QueueReceiverImpl. + * + * @param session The session from which the QueueReceiverImpl is instantiated. + * @param queue The default queue for this QueueReceiverImpl. + * @param messageSelector the message selector for this QueueReceiverImpl. + * @throws JMSException If the QueueReceiverImpl cannot be created due to some internal error. + */ + protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException + { + super(session, (DestinationImpl) queue, messageSelector, false, null); + } + + //--- Interface QueueReceiver + /** + * Get the Queue associated with this queue receiver. + * + * @return this receiver's Queue + * @throws JMSException If getting the queue for this queue receiver fails due to some internal error. + */ + public Queue getQueue() throws JMSException + { + checkNotClosed(); + return (QueueImpl) _destination; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java new file mode 100644 index 0000000000..f9ffb39bf9 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java @@ -0,0 +1,131 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +import javax.jms.QueueSender; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Message; + +/** + * Implements javax.jms.QueueSender + */ +public class QueueSenderImpl extends MessageProducerImpl implements QueueSender +{ + //--- Constructor + /** + * Create a new QueueSenderImpl. + * + * @param session the session from which the QueueSenderImpl is instantiated + * @param queue the default queue for this QueueSenderImpl + * @throws JMSException If the QueueSenderImpl cannot be created due to some internal error. + */ + protected QueueSenderImpl(SessionImpl session, QueueImpl queue) throws JMSException + { + super(session, queue); + } + + //--- Interface javax.jms.QueueSender + /** + * Get the queue associated with this QueueSender. + * + * @return This QueueSender's queue + * @throws JMSException If getting the queue for this QueueSender fails due to some internal error. + */ + public Queue getQueue() throws JMSException + { + return (Queue) getDestination(); + } + + /** + * Sends a message to the queue. Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority, + * and time to live. + * + * @param message The message to send. + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + * @throws java.lang.UnsupportedOperationException + * If invoked on QueueSender that did not specify a queue at creation time. + */ + public void send(Message message) throws JMSException + { + super.send(message); + } + + /** + * Send a message to the queue, specifying delivery mode, priority, and time to live. + * + * @param message The message to send + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + * @throws java.lang.UnsupportedOperationException + * If invoked on QueueSender that did not specify a queue at creation time. + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + super.send(message, deliveryMode, priority, timeToLive); + } + + /** + * Send a message to a queue for an unidentified message producer. + * Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority, + * and time to live. + * + * @param queue The queue to send this message to + * @param message The message to send + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + */ + public void send(Queue queue, Message message) throws JMSException + { + super.send(queue, message); + } + + /** + * Sends a message to a queue for an unidentified message producer, + * specifying delivery mode, priority and time to live. + * + * @param queue The queue to send this message to + * @param message The message to send + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + */ + public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + super.send(queue, message, deliveryMode, priority, timeToLive); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java index 0b37c3af95..ed97145c04 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java @@ -17,11 +17,127 @@ */ package org.apache.qpid.nclient.jms; -import org.apache.qpid.nclient.jms.SessionImpl; +import javax.jms.*; +import javax.jms.IllegalStateException; /** - * + * Implementation of javax.jms.QueueSession */ -public class QueueSessionImpl extends SessionImpl +public class QueueSessionImpl extends SessionImpl implements QueueSession { + //--- constructor + /** + * Create a JMS Session + * + * @param connection The ConnectionImpl object from which the Session is created. + * @param transacted Indicates if the session transacted. + * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to + * {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code> + * parameter is true. + * @throws javax.jms.JMSSecurityException If the user could not be authenticated. + * @throws javax.jms.JMSException In case of internal error. + */ + protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + { + super(connection, transacted, acknowledgeMode); + } + + //-- Overwritten methods + /** + * Creates a durable subscriber to the specified topic, + * + * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to. + * @param name The name used to identify this subscription. + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + throw new IllegalStateException("Cannot invoke createDurableSubscriber from QueueSession"); + } + + /** + * Create a TemporaryTopic. + * + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException + { + throw new IllegalStateException("Cannot invoke createTemporaryTopic from QueueSession"); + } + + /** + * Creates a topic identity given a Topicname. + * + * @param topicName The name of this <CODE>Topic</CODE> + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public Topic createTopic(String topicName) throws JMSException + { + throw new IllegalStateException("Cannot invoke createTopic from QueueSession"); + } + + /** + * Unsubscribes a durable subscription that has been created by a client. + * + * @param name the name used to identify this subscription + * @throws IllegalStateException Always + */ + @Override + public void unsubscribe(String name) throws JMSException + { + throw new IllegalStateException("Cannot invoke unsubscribe from QueueSession"); + } + + //--- Interface javax.jms.QueueSession + /** + * Create a QueueReceiver to receive messages from the specified queue. + * + * @param queue the <CODE>Queue</CODE> to access + * @return A QueueReceiver + * @throws JMSException If creating a receiver fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + */ + public QueueReceiver createReceiver(Queue queue) throws JMSException + { + return createReceiver(queue, null); + } + + /** + * Create a QueueReceiver to receive messages from the specified queue for a given message selector. + * + * @param queue the Queue to access + * @param messageSelector A value of null or an empty string indicates that + * there is no message selector for the message consumer. + * @return A QueueReceiver + * @throws JMSException If creating a receiver fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException + { + checkNotClosed(); + checkDestination(queue); + return new QueueReceiverImpl(this, queue, messageSelector); + } + + /** + * Create a QueueSender object to send messages to the specified queue. + * + * @param queue the Queue to access, or null if this is an unidentified producer + * @return A QueueSender + * @throws JMSException If creating the sender fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + */ + public QueueSender createSender(Queue queue) throws JMSException + { + checkNotClosed(); + // we do not check the destination since unidentified producers are allowed (no default destination). + return new QueueSenderImpl(this, (QueueImpl) queue); + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index df286cf273..08bc3cc3b5 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -87,8 +87,45 @@ public class SessionImpl implements Session * This session connection */ private ConnectionImpl _connection; - //--- javax.jms.Session API + //--- Constructor + /** + * Create a JMS Session + * + * @param connection The ConnectionImpl object from which the Session is created. + * @param transacted Indicates if the session transacted. + * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to + * {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true. + * @throws JMSSecurityException If the user could not be authenticated. + * @throws JMSException In case of internal error. + */ + protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + { + _connection = connection; + _transacted = transacted; + // for transacted sessions we ignore the acknowledgeMode and use GenericAckMode.SESSION_TRANSACTED + if (_transacted) + { + acknowledgeMode = Session.SESSION_TRANSACTED; + } + _acknowledgeMode = acknowledgeMode; + try + { + // create the qpid session + _qpidSession = _connection.getQpidConnection().createSession(0); + // set transacted if required + if (_transacted) + { + _qpidSession.setTransacted(); + } + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + //--- javax.jms.Session API /** * Creates a <CODE>BytesMessage</CODE> object used to send a message * containing a stream of uninterpreted bytes. @@ -245,7 +282,7 @@ public class SessionImpl implements Session try { // Note: this operation makes sure that asynch message processing has returned - _qpidSession.commit(); + _qpidSession.txCommit(); } catch (QpidException e) { @@ -272,7 +309,7 @@ public class SessionImpl implements Session try { // Note: this operation makes sure that asynch message processing has returned - _qpidSession.rollback(); + _qpidSession.txRollback(); } catch (org.apache.qpidity.QpidException e) { @@ -482,7 +519,7 @@ public class SessionImpl implements Session } /** - * reates a topic identity given a Topicname. + * Creates a topic identity given a Topicname. * <P>This facility is provided for the rare cases where clients need to * dynamically manipulate queue identity. It allows the creation of a * queue identity with a provider-specific name. Clients that depend @@ -575,25 +612,23 @@ public class SessionImpl implements Session return new QueueBrowserImpl(this, queue, messageSelector); } - /** - * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier. - * - * @return A temporary queue. - * - * @exception JMSException If creating the temporary queue fails due to some internal error. - */ + /** + * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier. + * + * @return A temporary queue. + * @throws JMSException If creating the temporary queue fails due to some internal error. + */ public TemporaryQueue createTemporaryQueue() throws JMSException { return new TemporaryQueueImpl(); } - /** - * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier. - * - * @return A temporary topic. - * - * @exception JMSException If creating the temporary topic fails due to some internal error. - */ + /** + * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier. + * + * @return A temporary topic. + * @throws JMSException If creating the temporary topic fails due to some internal error. + */ public TemporaryTopic createTemporaryTopic() throws JMSException { return new TemporaryTopicImpl(); @@ -665,6 +700,7 @@ public class SessionImpl implements Session /** * Validate that the destination is valid i.e. it is not null * + * @param dest The destination to be checked * @throws InvalidDestinationException If the destination not valid. */ protected void checkDestination(Destination dest) throws InvalidDestinationException @@ -727,8 +763,17 @@ public class SessionImpl implements Session //else there is no effect } - //------ Private Methods + /** + * Access to the underlying Qpid Session + * + * @return The associated Qpid Session. + */ + protected org.apache.qpid.nclient.api.Session getQpidSession() + { + return _qpidSession; + } + //------ Private Methods /** * Close the producer and the consumers of this session * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java new file mode 100644 index 0000000000..3e49528c6f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java @@ -0,0 +1,128 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +import javax.jms.*; + +/** + * Implements TopicPublisher + */ +public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher +{ + //--- Constructor + /** + * Create a TopicPublisherImpl. + * + * @param session The session from which the TopicPublisherImpl is instantiated + * @param topic The default topic for this TopicPublisherImpl + * @throws JMSException If the TopicPublisherImpl cannot be created due to some internal error. + */ + protected TopicPublisherImpl(SessionImpl session, Topic topic) throws JMSException + { + super(session, (DestinationImpl) topic); + } + + //--- Interface javax.jms.TopicPublisher + /** + * Get the topic associated with this TopicPublisher. + * + * @return This publisher's topic + * @throws JMSException If getting the topic fails due to some internal error. + */ + public Topic getTopic() throws JMSException + { + return (Topic) getDestination(); + } + + + /** + * Publish a message to the topic using the default delivery mode, priority and time to live. + * + * @param message The message to publish + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + * @throws java.lang.UnsupportedOperationException + * If that publisher topic was not specified at creation time. + */ + public void publish(Message message) throws JMSException + { + super.send(message); + } + + /** + * Publish a message to the topic, specifying delivery mode, priority and time to live. + * + * @param message The message to publish + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + * @throws java.lang.UnsupportedOperationException + * If that publisher topic was not specified at creation time. + */ + public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + super.send(message, deliveryMode, priority, timeToLive); + } + + + /** + * Publish a message to a topic for an unidentified message producer. + * Uses this TopicPublisher's default delivery mode, priority and time to live. + * + * @param topic The topic to publish this message to + * @param message The message to publish + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + */ + public void publish(Topic topic, Message message) throws JMSException + { + super.send(topic, message); + } + + /** + * Publishes a message to a topic for an unidentified message + * producer, specifying delivery mode, priority and time to live. + * + * @param topic The topic to publish this message to + * @param message The message to publish + * @param deliveryMode The delivery mode + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + */ + public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws + JMSException + { + super.send(topic, message, deliveryMode, priority, timeToLive); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java index db2515d562..8537fd1268 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java @@ -17,11 +17,128 @@ */ package org.apache.qpid.nclient.jms; -import org.apache.qpid.nclient.jms.SessionImpl; +import javax.jms.*; +import javax.jms.IllegalStateException; /** - * + * Implements TopicSession */ -public class TopicSessionImpl extends SessionImpl +public class TopicSessionImpl extends SessionImpl implements TopicSession { + //-- constructor + /** + * Create a new TopicSessionImpl. + * + * @param connection The ConnectionImpl object from which the Session is created. + * @param transacted Specifiy whether this session is transacted? + * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to + * {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter + * is true. + * @throws javax.jms.JMSSecurityException If the user could not be authenticated. + * @throws javax.jms.JMSException In case of internal error. + */ + protected TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + { + super(connection, transacted, acknowledgeMode); + } + + //-- Overwritten methods + /** + * Create a QueueBrowser. + * + * @param queue The <CODE>Queue</CODE> to browse. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException + { + throw new IllegalStateException("Cannot invoke createBrowser from TopicSession"); + } + + /** + * Create a QueueBrowser. + * + * @param queue The <CODE>Queue</CODE> to browse. + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public QueueBrowser createBrowser(Queue queue) throws JMSException + { + throw new IllegalStateException("Cannot invoke createBrowser from TopicSession"); + } + + /** + * Creates a temporary queue. + * + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException + { + throw new IllegalStateException("Cannot invoke createTemporaryQueue from TopicSession"); + } + + /** + * Creates a queue identity by a given name. + * + * @param queueName the name of this <CODE>Queue</CODE> + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public Queue createQueue(String queueName) throws JMSException + { + throw new IllegalStateException("Cannot invoke createQueue from TopicSession"); + } + + //--- Interface TopicSession + /** + * Create a publisher for the specified topic. + * + * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified publisher. + * @throws JMSException If the creating a publisher fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + */ + public TopicPublisher createPublisher(Topic topic) throws JMSException + { + + checkNotClosed(); + // we do not check the destination topic here, since unidentified publishers are allowed. + return new TopicPublisherImpl(this, topic); + } + + /** + * Creates a nondurable subscriber to the specified topic. + * + * @param topic The Topic to subscribe to + * @throws JMSException If creating a subscriber fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + */ + public TopicSubscriber createSubscriber(Topic topic) throws JMSException + { + return createSubscriber(topic, null, false); + } + + /** + * Creates a nondurable subscriber to the specified topic, using a + * message selector or specifying whether messages published by its + * own connection should be delivered to it. + * + * @param topic The Topic to subscribe to + * @param messageSelector A value of null or an empty string indicates that there is no message selector. + * @param noLocal If true then inhibits the delivery of messages published by this subscriber's connection. + * @throws JMSException If creating a subscriber fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException + { + checkNotClosed(); + checkDestination(topic); + return new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null); + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java index 9ab45772a7..23007a5839 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java @@ -30,18 +30,18 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub /** * Create a new TopicSubscriberImpl. * - * @param session The session of this topic subscriber. - * @param topic The default topic for this TopicSubscriberImpl - * @param messageSelector The MessageSelector - * @param noLocal If true inhibits the delivery of messages published by its own connection. - * @param name Name of the subscription if this is to be created as a durable subscriber. If this value is null, - * a non-durable subscription is created. + * @param session The session of this topic subscriber. + * @param topic The default topic for this TopicSubscriberImpl + * @param messageSelector The MessageSelector + * @param noLocal If true inhibits the delivery of messages published by its own connection. + * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. + * If this value is null, a non-durable subscription is created. * @throws javax.jms.JMSException If the TopicSubscriberImpl cannot be created due to internal error. */ protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal, - String name) throws JMSException + String subscriptionName) throws JMSException { - super(session, (DestinationImpl) topic, messageSelector, noLocal, name); + super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName); } //--- javax.jms.TopicSubscriber interface |
