diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-07-31 13:19:35 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-07-31 13:19:35 +0000 |
| commit | 8c69f1fe80520b301a3707b9e7079575f195995e (patch) | |
| tree | faf2183be4dd8c786266f95f146e712f45043365 /java/client | |
| parent | 75a7d3aa30f4341e55b2033164c3e1af8f854abb (diff) | |
| download | qpid-python-8c69f1fe80520b301a3707b9e7079575f195995e.tar.gz | |
improved implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
7 files changed, 143 insertions, 58 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java index 9daed6201d..da06b09f6d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java @@ -134,7 +134,6 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public String getClientID() throws JMSException { checkNotClosed(); - return _clientID; } @@ -225,8 +224,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect // start all the sessions for (SessionImpl session : _sessions) { - //TODO session.start(); - //TODO Exception handling + session.start(); } _started = true; } @@ -247,11 +245,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect checkNotClosed(); if (_started) { - // start all the sessions + // stop all the sessions for (SessionImpl session : _sessions) { - //TODO session.stop(); - //TODO Exception handling + session.stop(); } _started = false; } @@ -282,8 +279,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect // close all the sessions for (SessionImpl session : _sessions) { - //TODO session.close(); - //TODO Exception handling + session.close(); } // close the underlaying Qpid connection try @@ -337,13 +333,24 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect //-------------- QueueConnection API - public QueueSession createQueueSession(boolean b, int i) throws JMSException + /** + * Create a QueueSession. + * + * @param transacted Indicates whether the session is transacted. + * @param acknowledgeMode Indicates whether the consumer or the + * client will acknowledge any messages it receives; ignored if the session + * is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, + * <code>Session.CLIENT_ACKNOWLEDGE</code> and <code>Session.DUPS_OK_ACKNOWLEDGE</code>. + * @return A queueSession object/ + * @throws JMSException If creating a QueueSession fails due to some internal error. + */ + public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { checkNotClosed(); - //TODO: create a queue session - QueueSessionImpl queueSession = null; + QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode); + // add this session to the list of handled sessions. _sessions.add(queueSession); - return null; //To change body of implemented methods use File | Settings | File Templates. + return queueSession; } /** diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java index f9486b027e..be3c9de194 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java @@ -18,6 +18,7 @@ package org.apache.qpid.nclient.jms; import javax.jms.Destination; +import javax.jms.JMSException; /** * Implementation of the JMS Destination interface @@ -29,13 +30,24 @@ public class DestinationImpl implements Destination */ protected String _name = null; + /** + * The session used to create this destination + */ + protected SessionImpl _session; + //--- Constructor /** * Create a new DestinationImpl with a given name. - * @param name The name of this destination + * + * @param name The name of this destination. + * @param session The session used to create this destination. + * @throws JMSException If the destiantion name is not valid */ - protected DestinationImpl(String name) + protected DestinationImpl(SessionImpl session, String name) throws JMSException { + // TODO validate that this destination name exists + //_session.getQpidSession() + _session = session; _name = name; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java index cf16df280d..9120173fd9 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java @@ -29,10 +29,14 @@ public class QueueImpl extends DestinationImpl implements Queue //--- Constructor /** * Create a new QueueImpl with a given name. + * + * @param name The name of this queue. + * @param session The session used to create this queue. + * @throws JMSException If the queue name is not valid */ - public QueueImpl(String name) + protected QueueImpl(SessionImpl session, String name) throws JMSException { - super(name); + super(session, name); } //---- Interface javax.jms.Queue diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index 08bc3cc3b5..bb1181e814 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.nclient.jms.message.*; import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; import javax.jms.*; import javax.jms.IllegalStateException; @@ -111,7 +112,7 @@ public class SessionImpl implements Session _acknowledgeMode = acknowledgeMode; try { - // create the qpid session + // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = _connection.getQpidConnection().createSession(0); // set transacted if required if (_transacted) @@ -153,9 +154,8 @@ public class SessionImpl implements Session } /** - * Creates a <CODE>Message</CODE> object that holds all the - * standard message header information. It can be sent when a message - * containing only header information is sufficient. + * Creates a <code>Message</code> object that holds all the standard message header information. + * It can be sent when a message containing only header information is sufficient. * We simply return a ByteMessage * * @return A Message. @@ -167,7 +167,7 @@ public class SessionImpl implements Session } /** - * Creates an <CODE>ObjectMessage</CODE> used to send a message + * Creates an <code>ObjectMessage</code> used to send a message * that contains a serializable Java object. * * @return An ObjectMessage. @@ -180,7 +180,7 @@ public class SessionImpl implements Session } /** - * Creates an initialized <CODE>ObjectMessage</CODE> used to send a message that contains + * Creates an initialized <code>ObjectMessage</code> used to send a message that contains * a serializable Java object. * * @param serializable The object to use to initialize this message. @@ -195,7 +195,7 @@ public class SessionImpl implements Session } /** - * Creates a <CODE>StreamMessage</CODE> used to send a + * Creates a <code>StreamMessage</code> used to send a * self-defining stream of primitive values in the Java programming * language. * @@ -209,7 +209,7 @@ public class SessionImpl implements Session } /** - * Creates a <CODE>TextMessage</CODE> object used to send a message containing a String. + * Creates a <code>TextMessage</code> object used to send a message containing a String. * * @return A TextMessage object * @throws JMSException If Creating an TextMessage object fails due to some internal error. @@ -221,7 +221,7 @@ public class SessionImpl implements Session } /** - * Creates an initialized <CODE>TextMessage</CODE> used to send + * Creates an initialized <code>TextMessage</code> used to send * a message containing a String. * * @param text The string used to initialize this message. @@ -320,13 +320,13 @@ public class SessionImpl implements Session /** * Closes this session. * <p> The JMS specification says - * <P> This call will block until a <CODE>receive</CODE> call or message + * <P> This call will block until a <code>receive</code> call or message * listener in progress has completed. A blocked message consumer - * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed. + * <code>receive</code> call returns <code>null</code> when this session is closed. * <P>Closing a transacted session must roll back the transaction in progress. - * <P>This method is the only <CODE>Session</CODE> method that can be called concurrently. - * <P>Invoking any other <CODE>Session</CODE> method on a closed session - * must throw a <CODE>javax.jms.IllegalStateException</CODE>. + * <P>This method is the only <code>Session</code> method that can be called concurrently. + * <P>Invoking any other <code>Session</code> method on a closed session + * must throw a <code>javax.jms.IllegalStateException</code>. * <p> Closing a closed session must <I>not</I> throw an exception. * * @throws JMSException If closing the session fails due to some internal error. @@ -443,7 +443,7 @@ public class SessionImpl implements Session /** * Creates a MessageConsumer for the specified destination. * - * @param destination The <CODE>Destination</CODE> to access + * @param destination The <code>Destination</code> to access * @return A new MessageConsumer for the specified destination. * @throws JMSException If the session fails to create a MessageConsumer due to some internal error. * @throws InvalidDestinationException If an invalid destination is specified. @@ -456,7 +456,7 @@ public class SessionImpl implements Session /** * Creates a MessageConsumer for the specified destination, using a message selector. * - * @param destination The <CODE>Destination</CODE> to access + * @param destination The <code>Destination</code> to access * @param messageSelector Only messages with properties matching the message selector expression are delivered. * @return A new MessageConsumer for the specified destination. * @throws JMSException If the session fails to create a MessageConsumer due to some internal error. @@ -478,7 +478,7 @@ public class SessionImpl implements Session * NoLocal attribute allows a consumer to inhibit the delivery of messages published by its * own connection. The default value for this attribute is False. * - * @param destination The <CODE>Destination</CODE> to access + * @param destination The <code>Destination</code> to access * @param messageSelector Only messages with properties matching the message selector expression are delivered. * @param noLocal If true, and the destination is a topic, inhibits the delivery of messages published * by its own connection. @@ -505,17 +505,16 @@ public class SessionImpl implements Session * The physical creation of queues is an administrative task and is not * to be initiated by the JMS API. The one exception is the * creation of temporary queues, which is accomplished with the - * <CODE>createTemporaryQueue</CODE> method. + * <code>createTemporaryQueue</code> method. * - * @param queueName the name of this <CODE>Queue</CODE> - * @return a <CODE>Queue</CODE> with the given name + * @param queueName the name of this <code>Queue</code> + * @return a <code>Queue</code> with the given name * @throws JMSException If the session fails to create a queue due to some internal error. */ public Queue createQueue(String queueName) throws JMSException { checkNotClosed(); - // todo: check that this destiantion name does exist - return new QueueImpl(queueName); + return new QueueImpl(this, queueName); } /** @@ -528,23 +527,22 @@ public class SessionImpl implements Session * The physical creation of queues is an administrative task and is not * to be initiated by the JMS API. The one exception is the * creation of temporary queues, which is accomplished with the - * <CODE>createTemporaryTopic</CODE> method. + * <code>createTemporaryTopic</code> method. * - * @param topicName The name of this <CODE>Topic</CODE> - * @return a <CODE>Topic</CODE> with the given name + * @param topicName The name of this <code>Topic</code> + * @return a <code>Topic</code> with the given name * @throws JMSException If the session fails to create a topic due to some internal error. */ public Topic createTopic(String topicName) throws JMSException { checkNotClosed(); - // todo: check that this destiantion name does exist - return new TopicImpl(topicName); + return new TopicImpl(this, topicName); } /** * Creates a durable subscriber to the specified topic, * - * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to. + * @param topic The non-temporary <code>Topic</code> to subscribe to. * @param name The name used to identify this subscription. * @return A durable subscriber to the specified topic, * @throws JMSException If creating a subscriber fails due to some internal error. @@ -561,11 +559,11 @@ public class SessionImpl implements Session * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages * published by its * own connection should be delivered to it. - * <p> A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with + * <p> A client can change an existing durable subscription by creating a durable <code>TopicSubscriber</code> with * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to * unsubscribing (deleting) the old one and creating a new one. * - * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to. + * @param topic The non-temporary <code>Topic</code> to subscribe to. * @param name The name used to identify this subscription. * @param messageSelector Only messages with properties matching the message selector expression are delivered. * @param noLocal If set, inhibits the delivery of messages published by its own connection @@ -585,7 +583,7 @@ public class SessionImpl implements Session /** * Create a QueueBrowser to peek at the messages on the specified queue. * - * @param queue The <CODE>Queue</CODE> to browse. + * @param queue The <code>Queue</code> to browse. * @return A QueueBrowser. * @throws JMSException If creating a browser fails due to some internal error. * @throws InvalidDestinationException If an invalid queue is specified. @@ -598,7 +596,7 @@ public class SessionImpl implements Session /** * Create a QueueBrowser to peek at the messages on the specified queue using a message selector. * - * @param queue The <CODE>Queue</CODE> to browse. + * @param queue The <code>Queue</code> to browse. * @param messageSelector Only messages with properties matching the message selector expression are delivered. * @return A QueueBrowser. * @throws JMSException If creating a browser fails due to some internal error. @@ -620,7 +618,7 @@ public class SessionImpl implements Session */ public TemporaryQueue createTemporaryQueue() throws JMSException { - return new TemporaryQueueImpl(); + return new TemporaryQueueImpl(this); } /** @@ -631,7 +629,7 @@ public class SessionImpl implements Session */ public TemporaryTopic createTemporaryTopic() throws JMSException { - return new TemporaryTopicImpl(); + return new TemporaryTopicImpl(this); } /** @@ -641,7 +639,7 @@ public class SessionImpl implements Session * subscriber by its provider. * <p/> * <P>It is erroneous for a client to delete a durable subscription - * while there is an active <CODE>TopicSubscriber</CODE> for the + * while there is an active <code>TopicSubscriber</code> for the * subscription, or while a consumed message is part of a pending * transaction or has not been acknowledged in the session. * @@ -657,6 +655,43 @@ public class SessionImpl implements Session } //----- Protected methods + + /** + * Start the flow of message to this session. + * + * @throws JMSException If starting the session fails due to some communication error. + */ + protected void start() throws JMSException + { + try + { + // TODO: make sure that the correct options are used + _qpidSession.sessionFlow(Option.SUSPEND); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Stop the flow of message to this session. + * + * @throws JMSException If stopping the session fails due to some communication error. + */ + protected void stop() throws JMSException + { + try + { + // TODO: make sure that the correct options are used + _qpidSession.sessionFlow(Option.RESUME); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + /** * Notify this session that a message is processed * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java index ef5d9858df..b33ab0d990 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java @@ -32,19 +32,35 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem //--- constructor - public TemporaryQueueImpl() + /** + * Create a new TemporaryQueueImpl with a given name. + * + * @param session The session used to create this TemporaryQueueImpl. + * @throws JMSException If creating the TemporaryQueueImpl fails due to some error. + */ + public TemporaryQueueImpl(SessionImpl session) throws JMSException { // temporary destinations do not have names and are not registered in the JNDI namespace. - super("NAME_NOT_SET"); + super(session, "NAME_NOT_SET"); } //-- TemporaryDestination Interface + /** + * Specify whether this temporary destination is deleted. + * + * @return true is this temporary destination is deleted. + */ public boolean isdeleted() { return _isDeleted; } //-- TemporaryTopic Interface + /** + * Delete this temporary destinaiton + * + * @throws JMSException If deleting this temporary queue fails due to some error. + */ public void delete() throws JMSException { // todo delete this temporary queue diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java index 7daad4480b..11deba8361 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java @@ -32,10 +32,16 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem private boolean _isDeleted = false; //--- constructor - public TemporaryTopicImpl() + /** + * Create a new TemporaryTopicImpl with a given name. + * + * @param session The session used to create this TemporaryTopicImpl. + * @throws JMSException If creating the TemporaryTopicImpl fails due to some error. + */ + public TemporaryTopicImpl(SessionImpl session) throws JMSException { - // temporary destinations do not have names and are not registered in the JNDI namespace. - super("NAME_NOT_SET"); + // temporary destinations do not have names. + super(session, "NAME_NOT_SET"); } //-- TemporaryDestination Interface diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java index 43fa34b0fe..25c2afa4e7 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java @@ -18,6 +18,7 @@ package org.apache.qpid.nclient.jms; import javax.jms.Topic; +import javax.jms.JMSException; /** * Implementation of the javax.jms.Topic interface. @@ -27,10 +28,14 @@ public class TopicImpl extends DestinationImpl implements Topic //--- Constructor /** * Create a new TopicImpl with a given name. + * + * @param name The name of this topic + * @param session The session used to create this queue. + * @throws JMSException If the topic name is not valid */ - public TopicImpl(String name) + public TopicImpl(SessionImpl session, String name) throws JMSException { - super(name); + super(session, name); } //--- javax.jsm.Topic Interface |
