summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-07-31 13:19:35 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-07-31 13:19:35 +0000
commit8c69f1fe80520b301a3707b9e7079575f195995e (patch)
treefaf2183be4dd8c786266f95f146e712f45043365 /java/client
parent75a7d3aa30f4341e55b2033164c3e1af8f854abb (diff)
downloadqpid-python-8c69f1fe80520b301a3707b9e7079575f195995e.tar.gz
improved implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561322 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java105
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java9
7 files changed, 143 insertions, 58 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
index 9daed6201d..da06b09f6d 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
@@ -134,7 +134,6 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public String getClientID() throws JMSException
{
checkNotClosed();
-
return _clientID;
}
@@ -225,8 +224,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
// start all the sessions
for (SessionImpl session : _sessions)
{
- //TODO session.start();
- //TODO Exception handling
+ session.start();
}
_started = true;
}
@@ -247,11 +245,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
checkNotClosed();
if (_started)
{
- // start all the sessions
+ // stop all the sessions
for (SessionImpl session : _sessions)
{
- //TODO session.stop();
- //TODO Exception handling
+ session.stop();
}
_started = false;
}
@@ -282,8 +279,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
// close all the sessions
for (SessionImpl session : _sessions)
{
- //TODO session.close();
- //TODO Exception handling
+ session.close();
}
// close the underlaying Qpid connection
try
@@ -337,13 +333,24 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
//-------------- QueueConnection API
- public QueueSession createQueueSession(boolean b, int i) throws JMSException
+ /**
+ * Create a QueueSession.
+ *
+ * @param transacted Indicates whether the session is transacted.
+ * @param acknowledgeMode Indicates whether the consumer or the
+ * client will acknowledge any messages it receives; ignored if the session
+ * is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
+ * <code>Session.CLIENT_ACKNOWLEDGE</code> and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+ * @return A queueSession object/
+ * @throws JMSException If creating a QueueSession fails due to some internal error.
+ */
+ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
{
checkNotClosed();
- //TODO: create a queue session
- QueueSessionImpl queueSession = null;
+ QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode);
+ // add this session to the list of handled sessions.
_sessions.add(queueSession);
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return queueSession;
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
index f9486b027e..be3c9de194 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java
@@ -18,6 +18,7 @@
package org.apache.qpid.nclient.jms;
import javax.jms.Destination;
+import javax.jms.JMSException;
/**
* Implementation of the JMS Destination interface
@@ -29,13 +30,24 @@ public class DestinationImpl implements Destination
*/
protected String _name = null;
+ /**
+ * The session used to create this destination
+ */
+ protected SessionImpl _session;
+
//--- Constructor
/**
* Create a new DestinationImpl with a given name.
- * @param name The name of this destination
+ *
+ * @param name The name of this destination.
+ * @param session The session used to create this destination.
+ * @throws JMSException If the destiantion name is not valid
*/
- protected DestinationImpl(String name)
+ protected DestinationImpl(SessionImpl session, String name) throws JMSException
{
+ // TODO validate that this destination name exists
+ //_session.getQpidSession()
+ _session = session;
_name = name;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
index cf16df280d..9120173fd9 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java
@@ -29,10 +29,14 @@ public class QueueImpl extends DestinationImpl implements Queue
//--- Constructor
/**
* Create a new QueueImpl with a given name.
+ *
+ * @param name The name of this queue.
+ * @param session The session used to create this queue.
+ * @throws JMSException If the queue name is not valid
*/
- public QueueImpl(String name)
+ protected QueueImpl(SessionImpl session, String name) throws JMSException
{
- super(name);
+ super(session, name);
}
//---- Interface javax.jms.Queue
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
index 08bc3cc3b5..bb1181e814 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.nclient.jms.message.*;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -111,7 +112,7 @@ public class SessionImpl implements Session
_acknowledgeMode = acknowledgeMode;
try
{
- // create the qpid session
+ // create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = _connection.getQpidConnection().createSession(0);
// set transacted if required
if (_transacted)
@@ -153,9 +154,8 @@ public class SessionImpl implements Session
}
/**
- * Creates a <CODE>Message</CODE> object that holds all the
- * standard message header information. It can be sent when a message
- * containing only header information is sufficient.
+ * Creates a <code>Message</code> object that holds all the standard message header information.
+ * It can be sent when a message containing only header information is sufficient.
* We simply return a ByteMessage
*
* @return A Message.
@@ -167,7 +167,7 @@ public class SessionImpl implements Session
}
/**
- * Creates an <CODE>ObjectMessage</CODE> used to send a message
+ * Creates an <code>ObjectMessage</code> used to send a message
* that contains a serializable Java object.
*
* @return An ObjectMessage.
@@ -180,7 +180,7 @@ public class SessionImpl implements Session
}
/**
- * Creates an initialized <CODE>ObjectMessage</CODE> used to send a message that contains
+ * Creates an initialized <code>ObjectMessage</code> used to send a message that contains
* a serializable Java object.
*
* @param serializable The object to use to initialize this message.
@@ -195,7 +195,7 @@ public class SessionImpl implements Session
}
/**
- * Creates a <CODE>StreamMessage</CODE> used to send a
+ * Creates a <code>StreamMessage</code> used to send a
* self-defining stream of primitive values in the Java programming
* language.
*
@@ -209,7 +209,7 @@ public class SessionImpl implements Session
}
/**
- * Creates a <CODE>TextMessage</CODE> object used to send a message containing a String.
+ * Creates a <code>TextMessage</code> object used to send a message containing a String.
*
* @return A TextMessage object
* @throws JMSException If Creating an TextMessage object fails due to some internal error.
@@ -221,7 +221,7 @@ public class SessionImpl implements Session
}
/**
- * Creates an initialized <CODE>TextMessage</CODE> used to send
+ * Creates an initialized <code>TextMessage</code> used to send
* a message containing a String.
*
* @param text The string used to initialize this message.
@@ -320,13 +320,13 @@ public class SessionImpl implements Session
/**
* Closes this session.
* <p> The JMS specification says
- * <P> This call will block until a <CODE>receive</CODE> call or message
+ * <P> This call will block until a <code>receive</code> call or message
* listener in progress has completed. A blocked message consumer
- * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
+ * <code>receive</code> call returns <code>null</code> when this session is closed.
* <P>Closing a transacted session must roll back the transaction in progress.
- * <P>This method is the only <CODE>Session</CODE> method that can be called concurrently.
- * <P>Invoking any other <CODE>Session</CODE> method on a closed session
- * must throw a <CODE>javax.jms.IllegalStateException</CODE>.
+ * <P>This method is the only <code>Session</code> method that can be called concurrently.
+ * <P>Invoking any other <code>Session</code> method on a closed session
+ * must throw a <code>javax.jms.IllegalStateException</code>.
* <p> Closing a closed session must <I>not</I> throw an exception.
*
* @throws JMSException If closing the session fails due to some internal error.
@@ -443,7 +443,7 @@ public class SessionImpl implements Session
/**
* Creates a MessageConsumer for the specified destination.
*
- * @param destination The <CODE>Destination</CODE> to access
+ * @param destination The <code>Destination</code> to access
* @return A new MessageConsumer for the specified destination.
* @throws JMSException If the session fails to create a MessageConsumer due to some internal error.
* @throws InvalidDestinationException If an invalid destination is specified.
@@ -456,7 +456,7 @@ public class SessionImpl implements Session
/**
* Creates a MessageConsumer for the specified destination, using a message selector.
*
- * @param destination The <CODE>Destination</CODE> to access
+ * @param destination The <code>Destination</code> to access
* @param messageSelector Only messages with properties matching the message selector expression are delivered.
* @return A new MessageConsumer for the specified destination.
* @throws JMSException If the session fails to create a MessageConsumer due to some internal error.
@@ -478,7 +478,7 @@ public class SessionImpl implements Session
* NoLocal attribute allows a consumer to inhibit the delivery of messages published by its
* own connection. The default value for this attribute is False.
*
- * @param destination The <CODE>Destination</CODE> to access
+ * @param destination The <code>Destination</code> to access
* @param messageSelector Only messages with properties matching the message selector expression are delivered.
* @param noLocal If true, and the destination is a topic, inhibits the delivery of messages published
* by its own connection.
@@ -505,17 +505,16 @@ public class SessionImpl implements Session
* The physical creation of queues is an administrative task and is not
* to be initiated by the JMS API. The one exception is the
* creation of temporary queues, which is accomplished with the
- * <CODE>createTemporaryQueue</CODE> method.
+ * <code>createTemporaryQueue</code> method.
*
- * @param queueName the name of this <CODE>Queue</CODE>
- * @return a <CODE>Queue</CODE> with the given name
+ * @param queueName the name of this <code>Queue</code>
+ * @return a <code>Queue</code> with the given name
* @throws JMSException If the session fails to create a queue due to some internal error.
*/
public Queue createQueue(String queueName) throws JMSException
{
checkNotClosed();
- // todo: check that this destiantion name does exist
- return new QueueImpl(queueName);
+ return new QueueImpl(this, queueName);
}
/**
@@ -528,23 +527,22 @@ public class SessionImpl implements Session
* The physical creation of queues is an administrative task and is not
* to be initiated by the JMS API. The one exception is the
* creation of temporary queues, which is accomplished with the
- * <CODE>createTemporaryTopic</CODE> method.
+ * <code>createTemporaryTopic</code> method.
*
- * @param topicName The name of this <CODE>Topic</CODE>
- * @return a <CODE>Topic</CODE> with the given name
+ * @param topicName The name of this <code>Topic</code>
+ * @return a <code>Topic</code> with the given name
* @throws JMSException If the session fails to create a topic due to some internal error.
*/
public Topic createTopic(String topicName) throws JMSException
{
checkNotClosed();
- // todo: check that this destiantion name does exist
- return new TopicImpl(topicName);
+ return new TopicImpl(this, topicName);
}
/**
* Creates a durable subscriber to the specified topic,
*
- * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to.
+ * @param topic The non-temporary <code>Topic</code> to subscribe to.
* @param name The name used to identify this subscription.
* @return A durable subscriber to the specified topic,
* @throws JMSException If creating a subscriber fails due to some internal error.
@@ -561,11 +559,11 @@ public class SessionImpl implements Session
* Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
* published by its
* own connection should be delivered to it.
- * <p> A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
+ * <p> A client can change an existing durable subscription by creating a durable <code>TopicSubscriber</code> with
* the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
* unsubscribing (deleting) the old one and creating a new one.
*
- * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to.
+ * @param topic The non-temporary <code>Topic</code> to subscribe to.
* @param name The name used to identify this subscription.
* @param messageSelector Only messages with properties matching the message selector expression are delivered.
* @param noLocal If set, inhibits the delivery of messages published by its own connection
@@ -585,7 +583,7 @@ public class SessionImpl implements Session
/**
* Create a QueueBrowser to peek at the messages on the specified queue.
*
- * @param queue The <CODE>Queue</CODE> to browse.
+ * @param queue The <code>Queue</code> to browse.
* @return A QueueBrowser.
* @throws JMSException If creating a browser fails due to some internal error.
* @throws InvalidDestinationException If an invalid queue is specified.
@@ -598,7 +596,7 @@ public class SessionImpl implements Session
/**
* Create a QueueBrowser to peek at the messages on the specified queue using a message selector.
*
- * @param queue The <CODE>Queue</CODE> to browse.
+ * @param queue The <code>Queue</code> to browse.
* @param messageSelector Only messages with properties matching the message selector expression are delivered.
* @return A QueueBrowser.
* @throws JMSException If creating a browser fails due to some internal error.
@@ -620,7 +618,7 @@ public class SessionImpl implements Session
*/
public TemporaryQueue createTemporaryQueue() throws JMSException
{
- return new TemporaryQueueImpl();
+ return new TemporaryQueueImpl(this);
}
/**
@@ -631,7 +629,7 @@ public class SessionImpl implements Session
*/
public TemporaryTopic createTemporaryTopic() throws JMSException
{
- return new TemporaryTopicImpl();
+ return new TemporaryTopicImpl(this);
}
/**
@@ -641,7 +639,7 @@ public class SessionImpl implements Session
* subscriber by its provider.
* <p/>
* <P>It is erroneous for a client to delete a durable subscription
- * while there is an active <CODE>TopicSubscriber</CODE> for the
+ * while there is an active <code>TopicSubscriber</code> for the
* subscription, or while a consumed message is part of a pending
* transaction or has not been acknowledged in the session.
*
@@ -657,6 +655,43 @@ public class SessionImpl implements Session
}
//----- Protected methods
+
+ /**
+ * Start the flow of message to this session.
+ *
+ * @throws JMSException If starting the session fails due to some communication error.
+ */
+ protected void start() throws JMSException
+ {
+ try
+ {
+ // TODO: make sure that the correct options are used
+ _qpidSession.sessionFlow(Option.SUSPEND);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Stop the flow of message to this session.
+ *
+ * @throws JMSException If stopping the session fails due to some communication error.
+ */
+ protected void stop() throws JMSException
+ {
+ try
+ {
+ // TODO: make sure that the correct options are used
+ _qpidSession.sessionFlow(Option.RESUME);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
/**
* Notify this session that a message is processed
*
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
index ef5d9858df..b33ab0d990 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java
@@ -32,19 +32,35 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem
//--- constructor
- public TemporaryQueueImpl()
+ /**
+ * Create a new TemporaryQueueImpl with a given name.
+ *
+ * @param session The session used to create this TemporaryQueueImpl.
+ * @throws JMSException If creating the TemporaryQueueImpl fails due to some error.
+ */
+ public TemporaryQueueImpl(SessionImpl session) throws JMSException
{
// temporary destinations do not have names and are not registered in the JNDI namespace.
- super("NAME_NOT_SET");
+ super(session, "NAME_NOT_SET");
}
//-- TemporaryDestination Interface
+ /**
+ * Specify whether this temporary destination is deleted.
+ *
+ * @return true is this temporary destination is deleted.
+ */
public boolean isdeleted()
{
return _isDeleted;
}
//-- TemporaryTopic Interface
+ /**
+ * Delete this temporary destinaiton
+ *
+ * @throws JMSException If deleting this temporary queue fails due to some error.
+ */
public void delete() throws JMSException
{
// todo delete this temporary queue
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
index 7daad4480b..11deba8361 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java
@@ -32,10 +32,16 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem
private boolean _isDeleted = false;
//--- constructor
- public TemporaryTopicImpl()
+ /**
+ * Create a new TemporaryTopicImpl with a given name.
+ *
+ * @param session The session used to create this TemporaryTopicImpl.
+ * @throws JMSException If creating the TemporaryTopicImpl fails due to some error.
+ */
+ public TemporaryTopicImpl(SessionImpl session) throws JMSException
{
- // temporary destinations do not have names and are not registered in the JNDI namespace.
- super("NAME_NOT_SET");
+ // temporary destinations do not have names.
+ super(session, "NAME_NOT_SET");
}
//-- TemporaryDestination Interface
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
index 43fa34b0fe..25c2afa4e7 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java
@@ -18,6 +18,7 @@
package org.apache.qpid.nclient.jms;
import javax.jms.Topic;
+import javax.jms.JMSException;
/**
* Implementation of the javax.jms.Topic interface.
@@ -27,10 +28,14 @@ public class TopicImpl extends DestinationImpl implements Topic
//--- Constructor
/**
* Create a new TopicImpl with a given name.
+ *
+ * @param name The name of this topic
+ * @param session The session used to create this queue.
+ * @throws JMSException If the topic name is not valid
*/
- public TopicImpl(String name)
+ public TopicImpl(SessionImpl session, String name) throws JMSException
{
- super(name);
+ super(session, name);
}
//--- javax.jsm.Topic Interface