summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-07-27 16:10:27 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-07-27 16:10:27 +0000
commit6b682293955924ea72bd83b0c5f601c7ded405bf (patch)
tree83f06ffb2bb9d9f37b14c2600fadc7733fc80fb8 /java/client
parent80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (diff)
downloadqpid-python-6b682293955924ea72bd83b0c5f601c7ded405bf.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560296 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Message.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/api/Session.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java76
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java39
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java140
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java148
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java115
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java561
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesMessage.java151
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesTypedMessage.java801
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java685
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessageFactory.java103
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessage.java388
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessageFactory.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSHeaderAdapter.java552
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java507
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java197
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java204
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java201
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java46
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java202
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java127
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java135
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java131
47 files changed, 5671 insertions, 164 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java b/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java
index ad6da34d68..37caaf5df9 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java
@@ -19,9 +19,7 @@
package org.apache.qpid.nclient;
/**
- * Created by Arnaud Simon
- * Date: 23-Jul-2007
- * Time: 09:47:32
+ *
*/
public interface FieldTable
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java
index 6ba4787b56..a02e378b43 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java
@@ -25,10 +25,6 @@ import org.apache.qpidity.QpidException;
/**
* This represents a physical connection to a broker.
- * <p/>
- * Created by Arnaud Simon
- * Date: 20-Jul-2007
- * Time: 09:34:15
*/
public interface Connection
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java
index e3cd813170..34d0a0f250 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java
@@ -2,10 +2,6 @@ package org.apache.qpid.nclient.api;
/**
* Enumeration of the options available when creating a receiver
- *
- * Created by Arnaud Simon
- * Date: 20-Jul-2007
- * Time: 09:43:31
*/
public enum CreateReceiverOption
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java
index 241dd1d02f..e354539364 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java
@@ -2,10 +2,6 @@ package org.apache.qpid.nclient.api;
/**
* Enumeration of the options available when declaring an exchange
- *
- * Created by Arnaud Simon
- * Date: 20-Jul-2007
- * Time: 09:44:52
*/
public enum DeclareExchangeOption
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java
index 89e3573024..689523aeda 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java
@@ -3,9 +3,6 @@ package org.apache.qpid.nclient.api;
/**
* Enumeration of the options available when declaring a queue
*
- * Created by Arnaud Simon
- * Date: 23-Jul-2007
- * Time: 09:44:36
*/
public enum DeclareQueueOption
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java
index 17ee5beb1f..415c045de8 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java
@@ -1,9 +1,7 @@
package org.apache.qpid.nclient.api;
/**
- * Created by Arnaud Simon
- * Date: 23-Jul-2007
- * Time: 12:55:55
+ * Available options for deleting an exchange.
*/
public enum DeleteExchangeOption
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java
index c0a30f8b68..d110bd2a50 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java
@@ -1,9 +1,7 @@
package org.apache.qpid.nclient.api;
/**
- * Created by Arnaud Simon
- * Date: 23-Jul-2007
- * Time: 12:44:43
+ * Available options for deleting a queue.
*/
public enum DeleteQueueOption
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java
index aafe0a2054..ac396b7c79 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java
@@ -22,10 +22,6 @@ import org.apache.qpidity.QpidException;
/**
* This session�s resources are control under the scope of a distributed transaction.
- *
- * Created by Arnaud Simon
- * Date: 20-Jul-2007
- * Time: 09:39:11
*/
public interface DtxSession extends Session
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java
index b624e10cbd..5f7bbe7cf2 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java
@@ -23,10 +23,6 @@ import org.apache.qpidity.QpidException;
/**
* If the communication layer detects a serious problem with a <CODE>connection</CODE>, it
* informs the connection's ExceptionListener
- *
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 12:00:27
*/
public interface ExceptionListener
{
@@ -35,7 +31,7 @@ public interface ExceptionListener
* informs the connection's ExceptionListener
*
* @param exception The exception comming from the communication layer.
- * @see org.apache.qpid.nclient.qpidapi.Connection
+ * @see org.apache.qpid.nclient.api.Connection
*/
public void onException(QpidException exception);
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java
index 64b5876339..4aca6ea203 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java
@@ -26,10 +26,6 @@ import java.nio.ByteBuffer;
/**
* A message is sent and received by resources. It is composed of a set of header and a payload.
- * <p/>
- * Created by Arnaud Simon
- * Date: 20-Jul-2007
- * Time: 09:40:49
*/
public interface Message
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java
index b0dfa15bbf..5b844bb6c2 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java
@@ -20,10 +20,6 @@ package org.apache.qpid.nclient.api;
/**
* MessageListeners are used to asynchronously receive messages.
- *
- * Created by Arnaud Simon
- * Date: 2o-Jul-2007
- * Time: 09:42:52
*/
public interface MessageListener
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java
index d489e34364..e34238abc0 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java
@@ -24,11 +24,6 @@ import java.util.Set;
/**
* Used to receive messages from a queue
- *
- * <p/>
- * Created by Arnaud Simon
- * Date: 20-Jul-2007
- * Time: 09:42:37
*/
public interface MessageReceiver extends Resource
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java
index 9b0a5af0d0..54badefcc6 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java
@@ -22,10 +22,6 @@ import org.apache.qpidity.QpidException;
/**
* A sender is used to send message to its queue.
- * <p/>
- * Created by Arnaud Simon
- * Date: 22-Jul-2007
- * Time: 09:41:58
*/
public interface MessageSender extends Resource
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java
index 1585cc3071..212b0dca80 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java
@@ -22,10 +22,6 @@ import org.apache.qpidity.QpidException;
/**
* A Resource is associated with a session and can be independently closed.
- *
- * Created by Arnaud Simon
- * Date: 21-Jul-2007
- * Time: 09:41:30
*/
public interface Resource
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java
index c103e5bedd..5c960023dd 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java
@@ -25,10 +25,6 @@ import org.apache.qpidity.Option;
* A session is associated with a connection.
* <p> When created a Session is not attached with an underlying channel. Unsuspended a Session is
* equivalent to attaching a communication channel that can be used to communicate with the broker.
- * <p/>
- * Created by Arnaud Simon
- * Date: 20-Jul-2007
- * Time: 09:36:24
*/
public interface Session
{
@@ -198,7 +194,6 @@ public interface Session
* @param queueName The queue to be bound.
* @param exchangeName The exchange name.
* @param routingKey The routing key.
- * @param nowait nowait
* @throws QpidException If the session fails to bind the queue due to some error.
*/
public void queueBind(String queueName, String exchangeName, String routingKey)
@@ -224,7 +219,6 @@ public interface Session
* TODO: Define the exact semantic i.e. are message sent to a dead letter queue?
*
* @param queueName The queue to be purged
- * @param nowait nowait
* @throws QpidException If the session fails to purge the queue due to some error.
*/
public void queuePurge(String queueName)
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
index 416c5b71e5..8d7fab6216 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
@@ -30,9 +30,7 @@ import java.util.Vector;
/**
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 09:47:17
+ *
*/
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
@@ -106,9 +104,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
//---- Interface javax.jms.Connection ---//
- public Session createSession(boolean b, int i)
- throws
- JMSException
+ public Session createSession(boolean b, int i) throws JMSException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -123,9 +119,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @return The unique client identifier.
* @throws JMSException If this connection is closed.
*/
- public String getClientID()
- throws
- JMSException
+ public String getClientID() throws JMSException
{
checkNotClosed();
@@ -147,9 +141,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @param clientID the unique client identifier
* @throws JMSException Always as clientID is always set at construction time.
*/
- public void setClientID(String clientID)
- throws
- JMSException
+ public void setClientID(String clientID) throws JMSException
{
checkNotClosed();
throw new IllegalStateException("Client name cannot be changed after being set");
@@ -162,9 +154,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @throws JMSException If there ie a problem getting the connection metadata for this connection.
* @see javax.jms.ConnectionMetaData
*/
- public ConnectionMetaData getMetaData()
- throws
- JMSException
+ public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
return ConnectionMetaDataImpl.getInstance();
@@ -176,9 +166,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @return the <CODE>ExceptionListener</CODE> for this connection
* @throws JMSException In case of unforeseen problem
*/
- public ExceptionListener getExceptionListener()
- throws
- JMSException
+ public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
return _exceptionListener;
@@ -203,9 +191,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @param exceptionListener The connection listener.
* @throws JMSException If the connection is closed.
*/
- public void setExceptionListener(ExceptionListener exceptionListener)
- throws
- JMSException
+ public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException
{
checkNotClosed();
_exceptionListener = exceptionListener;
@@ -219,9 +205,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void start()
- throws
- JMSException
+ public void start() throws JMSException
{
checkNotClosed();
if (!_started)
@@ -246,9 +230,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void stop()
- throws
- JMSException
+ public void stop() throws JMSException
{
checkNotClosed();
if (_started)
@@ -278,9 +260,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void close()
- throws
- JMSException
+ public void close() throws JMSException
{
checkNotClosed();
if (!_isClosed)
@@ -317,9 +297,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @throws JMSException In case of a problem due to some internal error.
*/
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages)
- throws
- JMSException
+ ServerSessionPool sessionPool, int maxMessages) throws
+ JMSException
{
checkNotClosed();
return null;
@@ -338,9 +317,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
*/
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool,
- int maxMessages)
- throws
- JMSException
+ int maxMessages) throws JMSException
{
checkNotClosed();
return null;
@@ -348,9 +325,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
//-------------- QueueConnection API
- public QueueSession createQueueSession(boolean b, int i)
- throws
- JMSException
+ public QueueSession createQueueSession(boolean b, int i) throws JMSException
{
checkNotClosed();
//TODO: create a queue session
@@ -359,25 +334,21 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public ConnectionConsumer createConnectionConsumer(Queue queue, String string, ServerSessionPool serverSessionPool, int i)
- throws
- JMSException
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String string, ServerSessionPool serverSessionPool,
+ int i) throws JMSException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
//-------------- TopicConnection API
- public TopicSession createTopicSession(boolean b, int i)
- throws
- JMSException
+ public TopicSession createTopicSession(boolean b, int i) throws JMSException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public ConnectionConsumer createConnectionConsumer(Topic topic, String string, ServerSessionPool serverSessionPool, int i)
- throws
- JMSException
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String string, ServerSessionPool serverSessionPool,
+ int i) throws JMSException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -387,15 +358,12 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
/**
* Validate that the Connection is not closed.
* <p/>
- * If the Connection has been closed, throw a javax.jms.IllegalStateException. This behaviour is
+ * If the Connection has been closed, throw a IllegalStateException. This behaviour is
* required by the JMS specification.
*
- * @throws javax.jms.IllegalStateException
- * If the session is closed.
+ * @throws IllegalStateException If the session is closed.
*/
- protected synchronized void checkNotClosed()
- throws
- IllegalStateException
+ protected synchronized void checkNotClosed() throws IllegalStateException
{
if (_isClosed)
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java
index 05c759fc2d..58d4151ba2 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java
@@ -26,10 +26,6 @@ import java.util.Enumeration;
/**
* A <CODE>ConnectionMetaDataImpl</CODE> object provides information describing the
* JMS <CODE>Connection</CODE>.
- * <p/>
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 10:47:20
*/
public class ConnectionMetaDataImpl implements ConnectionMetaData
{
@@ -53,8 +49,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
// Provider minor version
private static final int PROVIDER_MINOR_VERSION = 10;
// Provider version
- private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: ["
- + QpidProperties.getBuildVersion() + "] ; Protocol: [ 0.10 ] )";
+ private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: [" + QpidProperties.getBuildVersion() + "] ; Protocol: [ 0.10 ] )";
/**
* Prevent instantiation.
@@ -81,9 +76,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return the JMS API version
* @throws JMSException Never
*/
- public String getJMSVersion()
- throws
- JMSException
+ public String getJMSVersion() throws JMSException
{
return JMS_VERSION;
}
@@ -95,9 +88,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return the JMS API major version number
* @throws JMSException Never
*/
- public int getJMSMajorVersion()
- throws
- JMSException
+ public int getJMSMajorVersion() throws JMSException
{
return JMS_MAJOR_VERSION;
}
@@ -109,9 +100,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return the JMS API minor version number
* @throws JMSException Never
*/
- public int getJMSMinorVersion()
- throws
- JMSException
+ public int getJMSMinorVersion() throws JMSException
{
return JMS_MINOR_VERSION;
}
@@ -123,9 +112,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return Qpid name
* @throws JMSException Never
*/
- public String getJMSProviderName()
- throws
- JMSException
+ public String getJMSProviderName() throws JMSException
{
return PROVIDER_NAME;
}
@@ -136,9 +123,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return Qpid version
* @throws JMSException Never
*/
- public String getProviderVersion()
- throws
- JMSException
+ public String getProviderVersion() throws JMSException
{
return PROVIDER_VERSION;
// TODO: We certainly can dynamically get the server version.
@@ -150,9 +135,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return Qpid major version number
* @throws JMSException Never
*/
- public int getProviderMajorVersion()
- throws
- JMSException
+ public int getProviderMajorVersion() throws JMSException
{
return PROVIDER_MAJOR_VERSION;
}
@@ -163,9 +146,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return Qpid minor version number
* @throws JMSException Never
*/
- public int getProviderMinorVersion()
- throws
- JMSException
+ public int getProviderMinorVersion() throws JMSException
{
return PROVIDER_MINOR_VERSION;
}
@@ -176,9 +157,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
* @return an Enumeration of JMSX property names
* @throws JMSException if cannot retrieve metadata due to some internal error.
*/
- public Enumeration getJMSXPropertyNames()
- throws
- JMSException
+ public Enumeration getJMSXPropertyNames() throws JMSException
{
return CustomJMSXProperty.asEnumeration();
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java
index 19dfc3a0b5..49da4a996d 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java
@@ -32,11 +32,11 @@ public enum CustomJMSXProperty
public static synchronized Enumeration asEnumeration()
{
- if(_names == null)
+ if (_names == null)
{
CustomJMSXProperty[] properties = values();
ArrayList<String> nameList = new ArrayList<String>(properties.length);
- for(CustomJMSXProperty property : properties)
+ for (CustomJMSXProperty property : properties)
{
nameList.add(property.toString());
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
index 02bad291fb..0a4c465815 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java
@@ -22,9 +22,7 @@ import org.apache.qpidity.QpidException;
import javax.jms.JMSException;
/**
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 12:54:00
+ *Helper class for handling exceptions
*/
public class ExceptionHelper
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
new file mode 100644
index 0000000000..9ed96b1e22
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
@@ -0,0 +1,140 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.nclient.api.Resource;
+import org.apache.qpid.nclient.exception.QpidException;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+
+/**
+ * MessageActor is the superclass for MessageProducerImpl and MessageProducerImpl.
+ */
+public abstract class MessageActor
+{
+ /**
+ * Used for debugging.
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
+
+ /**
+ * Indicates whether this MessageActor is closed.
+ */
+ boolean _isClosed = false;
+
+ /**
+ * This messageActor's session
+ */
+ SessionImpl _session;
+
+ /**
+ * The underlying Qpid Resource
+ */
+ Resource _qpidResource;
+
+ //-- Constructor
+
+ //TODO define the parameters
+
+ protected MessageActor()
+ {
+
+ }
+
+ protected MessageActor(SessionImpl session)
+ {
+ // TODO create the qpidResource _qpidResource =
+ _session = session;
+ }
+
+ //--- public methods (part of the jms public API)
+ /**
+ * Closes the MessageActor and deregister it from its session.
+ *
+ * @throws JMSException if the MessaeActor cannot be closed due to some internal error.
+ */
+ public void close() throws JMSException
+ {
+ if (!_isClosed)
+ {
+ closeMessageActor();
+ // notify the session that this message actor is closing
+
+ //TODO _session.removeActor(_actorID);
+ }
+ }
+
+ //-- protected methods
+ /**
+ * Check if this MessageActor is not closed.
+ * <p> If the MessageActor is closed, throw a javax.jms.IllegalStateException.
+ * <p> The method is not synchronized, since MessageProducers can only be used by a single thread.
+ *
+ * @throws IllegalStateException if the MessageActor is closed
+ */
+ protected void checkNotClosed() throws IllegalStateException
+ {
+ if (_isClosed || _session == null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Actor " + this + " is already closed");
+ }
+ throw new IllegalStateException("Actor " + this + " is already closed");
+ }
+ _session.checkNotClosed();
+ }
+
+ /**
+ * Closes a MessageActor.
+ * <p> This method is invoked when the session is closing or when this
+ * messageActor is closing.
+ *
+ * @throws JMSException If the MessaeActor cannot be closed due to some internal error.
+ */
+ protected void closeMessageActor() throws JMSException
+ {
+ if (!_isClosed)
+ {
+ // close the underlying qpid resource
+ try
+ {
+ _qpidResource.close();
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ _isClosed = true;
+ }
+ }
+
+ /**
+ * Get the associated session object.
+ *
+ * @return This Actor's Session.
+ */
+ protected SessionImpl getSession()
+ {
+ return _session;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
new file mode 100644
index 0000000000..80fba4843d
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
@@ -0,0 +1,148 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import org.apache.qpid.nclient.api.MessageReceiver;
+import org.apache.qpid.nclient.exception.QpidException;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+
+/**
+ * Implementation of JMS message consumer
+ */
+public class MessageConsumerImpl extends MessageActor implements MessageConsumer
+{
+ /**
+ * The underlying qpid receiver
+ */
+ MessageReceiver _qpidReceiver;
+
+ /**
+ * This MessageConsumer's messageselector.
+ */
+ protected String _messageSelector = null;
+
+ /**
+ * A MessageListener set up for this consumer.
+ */
+ private MessageListener _messageListener = null;
+
+ //----- Message consumer API
+
+ /**
+ * Gets this MessageConsumer's message selector.
+ *
+ * @return This MessageConsumer's message selector, or null if no
+ * message selector exists for the message consumer (that is, if
+ * the message selector was not set or was set to null or the
+ * empty string)
+ * @throws JMSException if getting the message selector fails due to some internal error.
+ */
+ public String getMessageSelector() throws JMSException
+ {
+ checkNotClosed();
+ return _messageSelector;
+ }
+
+ /**
+ * Gets this MessageConsumer's <CODE>MessageListener</CODE>.
+ *
+ * @return The listener for the MessageConsumer, or null if no listener is set
+ * @throws JMSException if getting the message listener fails due to some internal error.
+ */
+ public MessageListener getMessageListener() throws JMSException
+ {
+ checkNotClosed();
+ return _messageListener;
+ }
+
+ /**
+ * Sets the MessageConsumer's <CODE>MessageListener</CODE>.
+ * <p> The JMS specification says:
+ * <P>Setting the message listener to null is the equivalent of
+ * unsetting the message listener for the message consumer.
+ * <P>The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
+ * while messages are being consumed by an existing listener
+ * or the consumer is being used to consume messages synchronously
+ * is undefined.
+ *
+ * @param messageListener The listener to which the messages are to be delivered
+ * @throws JMSException If setting the message listener fails due to some internal error.
+ */
+ public void setMessageListener(MessageListener messageListener) throws JMSException
+ {
+ checkNotClosed();
+ // create a message listener wrapper
+ }
+
+ public Message receive() throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Message receive(long l) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Message receiveNoWait() throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ // not public methods
+ /**
+ * Stop the delivery of messages to this receiver.
+ * <p>For asynchronous receiver, this operation blocks until the message listener
+ * finishes processing the current message,
+ *
+ * @throws JMSException If the consumer cannot be stopped due to some internal error.
+ */
+ void stop() throws JMSException
+ {
+ try
+ {
+ _qpidReceiver.stop();
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Start the delivery of messages to this consumer.
+ *
+ * @throws JMSException If the consumer cannot be started due to some internal error.
+ */
+ void start() throws JMSException
+ {
+ try
+ {
+ _qpidReceiver.start();
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
new file mode 100644
index 0000000000..3e58247597
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
@@ -0,0 +1,115 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import org.apache.qpid.nclient.api.MessageListener;
+import org.apache.qpid.nclient.api.Message;
+import org.apache.qpid.nclient.jms.message.AbstractJMSMessage;
+import org.apache.qpid.nclient.jms.message.QpidMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+
+/**
+ * This is a wrapper for the JMS message listener
+ */
+public class MessageListenerWrapper implements MessageListener
+{
+ /**
+ * Used for debugging.
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+ /**
+ * This message listener consumer
+ */
+ MessageConsumerImpl _consumer = null;
+
+ /**
+ * The jms listener of this consumer.
+ */
+ javax.jms.MessageListener _jmsMessageLstener = null;
+
+ //---- constructor
+ /**
+ * Create a message listener wrapper for a given consumer
+ *
+ * @param consumer The consumer of this listener
+ */
+ public MessageListenerWrapper(MessageConsumerImpl consumer)
+ {
+ _consumer = consumer;
+ }
+
+ //---- org.apache.qpid.nclient.api.MessageListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ // tell the session that a message is in process
+ _consumer.getSession().preProcessMessage((QpidMessage) message);
+ //TODO build the JMS message form a qpid message
+ AbstractJMSMessage jmsMessage = null;
+ // If the session is transacted we need to ack the message first
+ // This is because a message is associated with its tx only when acked
+ if (_consumer.getSession().getTransacted())
+ {
+ _consumer.getSession().acknowledgeMessage(jmsMessage);
+ }
+ // The JMS specs says:
+ /* The result of a listener throwing a RuntimeException depends on the session’s
+ * acknowledgment mode.
+ • --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+ * will be immediately redelivered. The number of times a JMS provider will
+ * redeliver the same message before giving up is provider-dependent.
+ • --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+ * --- Transacted Session - the next message for the listener is delivered.
+ *
+ * The number of time we try redelivering the message is 0
+ **/
+ try
+ {
+ _jmsMessageLstener.onMessage(jmsMessage);
+ }
+ catch (RuntimeException re)
+ {
+ // do nothing as this message will not be redelivered
+ }
+ // If the session has been recovered we then need to redelivered this message
+ if (_consumer.getSession().isInRecovery())
+ {
+ message.release();
+ }
+ // Tell the jms Session to ack this message if required
+ else if (!_consumer.getSession().getTransacted())
+ {
+ _consumer.getSession().acknowledgeMessage(jmsMessage);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
new file mode 100644
index 0000000000..4bae376492
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
@@ -0,0 +1,25 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+/**
+ *
+ */
+public class MessageProducerImpl extends MessageActor
+{
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java
index dd63240021..6a849a5b9f 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java
@@ -23,9 +23,7 @@ import org.apache.qpidity.QpidException;
import javax.jms.JMSException;
/**
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 12:08:47
+ * An exception listner
*/
public class QpidExceptionListenerImpl implements ExceptionListener
{
@@ -37,7 +35,7 @@ public class QpidExceptionListenerImpl implements ExceptionListener
void setJMSExceptionListner(javax.jms.ExceptionListener jmsExceptionListener)
{
- _jmsExceptionListener = jmsExceptionListener;
+ _jmsExceptionListener = jmsExceptionListener;
}
//----- ExceptionListener API
@@ -46,7 +44,7 @@ public class QpidExceptionListenerImpl implements ExceptionListener
// convert this exception in a JMS exception
JMSException jmsException = ExceptionHelper.convertQpidExceptionToJMSException(exception);
// propagate to the jms exception listener
- if( _jmsExceptionListener != null )
+ if (_jmsExceptionListener != null)
{
_jmsExceptionListener.onException(jmsException);
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
index f5a3ca3f84..0b37c3af95 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
@@ -20,9 +20,7 @@ package org.apache.qpid.nclient.jms;
import org.apache.qpid.nclient.jms.SessionImpl;
/**
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 13:38:58
+ *
*/
public class QueueSessionImpl extends SessionImpl
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
index c68f558542..483257db88 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
@@ -17,11 +17,564 @@
*/
package org.apache.qpid.nclient.jms;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.nclient.exception.QpidException;
+import org.apache.qpid.nclient.jms.message.*;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.jms.Session;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Vector;
+
/**
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 12:35:36
+ * Implementation of the JMS Session interface
*/
-public class SessionImpl
+public class SessionImpl implements Session
{
+ /**
+ * this session's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
+
+ /**
+ * The messageConsumers of this session.
+ */
+ private ArrayList<MessageConsumerImpl> _messageConsumers = new ArrayList<MessageConsumerImpl>();
+
+ /**
+ * The messageProducers of this session.
+ */
+ private ArrayList<MessageProducerImpl> _messageProducers = new ArrayList<MessageProducerImpl>();
+
+ /**
+ * All the not yet acknoledged messages
+ * We use a vector as access to this list has to be synchronised
+ * This is because messages are acked from messagelistner threads
+ */
+ private Vector<QpidMessage> _unacknowledgedMessages = new Vector<QpidMessage>();
+
+ /**
+ * Indicates whether this session is closed.
+ */
+ private boolean _isClosed = false;
+
+ /**
+ * Used to indicate whether or not this is a transactional session.
+ */
+ private boolean _transacted;
+
+ /**
+ * Holds the sessions acknowledgement mode.
+ */
+ private int _acknowledgeMode;
+
+ /**
+ * The underlying QpidSession
+ */
+ private org.apache.qpid.nclient.api.Session _qpidSession;
+
+ /**
+ * Indicates whether this session is recovering
+ */
+ private boolean _inRecovery = false;
+
+ //--- javax.jms.Session API
+
+ /**
+ * Creates a <CODE>BytesMessage</CODE> object used to send a message
+ * containing a stream of uninterpreted bytes.
+ *
+ * @return A BytesMessage.
+ * @throws JMSException If Creating a BytesMessage object fails due to some internal error.
+ */
+ public BytesMessage createBytesMessage() throws JMSException
+ {
+ checkNotClosed();
+ return new JMSBytesMessage();
+ }
+
+ /**
+ * Creates a <CODE>MapMessage</CODE> object used to send a self-defining set
+ * of name-value pairs, where names are Strings and values are primitive values.
+ *
+ * @return A MapMessage.
+ * @throws JMSException If Creating a MapMessage object fails due to some internal error.
+ */
+ public MapMessage createMapMessage() throws JMSException
+ {
+ checkNotClosed();
+ return new JMSMapMessage();
+ }
+
+ /**
+ * Creates a <CODE>Message</CODE> object that holds all the
+ * standard message header information. It can be sent when a message
+ * containing only header information is sufficient.
+ * We simply return a ByteMessage
+ *
+ * @return A Message.
+ * @throws JMSException If Creating a Message object fails due to some internal error.
+ */
+ public Message createMessage() throws JMSException
+ {
+ return createBytesMessage();
+ }
+
+ /**
+ * Creates an <CODE>ObjectMessage</CODE> used to send a message
+ * that contains a serializable Java object.
+ *
+ * @return An ObjectMessage.
+ * @throws JMSException If Creating an ObjectMessage object fails due to some internal error.
+ */
+ public ObjectMessage createObjectMessage() throws JMSException
+ {
+ checkNotClosed();
+ return new JMSObjectMessage();
+ }
+
+ /**
+ * Creates an initialized <CODE>ObjectMessage</CODE> used to send a message that contains
+ * a serializable Java object.
+ *
+ * @param serializable The object to use to initialize this message.
+ * @return An initialised ObjectMessage.
+ * @throws JMSException If Creating an initialised ObjectMessage object fails due to some internal error.
+ */
+ public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException
+ {
+ ObjectMessage msg = createObjectMessage();
+ msg.setObject(serializable);
+ return msg;
+ }
+
+ /**
+ * Creates a <CODE>StreamMessage</CODE> used to send a
+ * self-defining stream of primitive values in the Java programming
+ * language.
+ *
+ * @return A StreamMessage
+ * @throws JMSException If Creating an StreamMessage object fails due to some internal error.
+ */
+ public StreamMessage createStreamMessage() throws JMSException
+ {
+ checkNotClosed();
+ return new JMSStreamMessage();
+ }
+
+ /**
+ * Creates a <CODE>TextMessage</CODE> object used to send a message containing a String.
+ *
+ * @return A TextMessage object
+ * @throws JMSException If Creating an TextMessage object fails due to some internal error.
+ */
+ public TextMessage createTextMessage() throws JMSException
+ {
+ checkNotClosed();
+ return new JMSTextMessage();
+ }
+
+ /**
+ * Creates an initialized <CODE>TextMessage</CODE> used to send
+ * a message containing a String.
+ *
+ * @param text The string used to initialize this message.
+ * @return An initialized TextMessage
+ * @throws JMSException If Creating an initialised TextMessage object fails due to some internal error.
+ */
+ public TextMessage createTextMessage(String text) throws JMSException
+ {
+ TextMessage msg = createTextMessage();
+ msg.setText(text);
+ return msg;
+ }
+
+ /**
+ * Indicates whether the session is in transacted mode.
+ *
+ * @return true if the session is in transacted mode
+ * @throws JMSException If geting the transaction mode fails due to some internal error.
+ */
+ public boolean getTransacted() throws JMSException
+ {
+ checkNotClosed();
+ return _transacted;
+ }
+
+ /**
+ * Returns the acknowledgement mode of this session.
+ * <p> The acknowledgement mode is set at the time that the session is created.
+ * If the session is transacted, the acknowledgement mode is ignored.
+ *
+ * @return If the session is not transacted, returns the current acknowledgement mode for the session.
+ * else returns SESSION_TRANSACTED.
+ * @throws JMSException if geting the acknowledgement mode fails due to some internal error.
+ */
+ public int getAcknowledgeMode() throws JMSException
+ {
+ checkNotClosed();
+ return _acknowledgeMode;
+ }
+
+ /**
+ * Commits all messages done in this transaction.
+ *
+ * @throws JMSException If committing the transaction fails due to some internal error.
+ * @throws TransactionRolledBackException If the transaction is rolled back due to some internal error during commit.
+ * @throws javax.jms.IllegalStateException
+ * If the method is not called by a transacted session.
+ */
+ public void commit() throws JMSException
+ {
+ checkNotClosed();
+ //make sure the Session is a transacted one
+ if (!_transacted)
+ {
+ throw new IllegalStateException("Cannot commit non-transacted session", "Session is not transacted");
+ }
+ // commit the underlying Qpid Session
+ try
+ {
+ // Note: this operation makes sure that asynch message processing has returned
+ _qpidSession.commit();
+ }
+ catch (org.apache.qpidity.QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Rolls back any messages done in this transaction.
+ *
+ * @throws JMSException If rolling back the session fails due to some internal error.
+ * @throws javax.jms.IllegalStateException
+ * If the method is not called by a transacted session.
+ */
+ public void rollback() throws JMSException
+ {
+ checkNotClosed();
+ //make sure the Session is a transacted one
+ if (!_transacted)
+ {
+ throw new IllegalStateException("Cannot rollback non-transacted session", "Session is not transacted");
+ }
+ // rollback the underlying Qpid Session
+ try
+ {
+ // Note: this operation makes sure that asynch message processing has returned
+ _qpidSession.rollback();
+ }
+ catch (org.apache.qpidity.QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Closes this session.
+ * <p> The JMS specification says
+ * <P> This call will block until a <CODE>receive</CODE> call or message
+ * listener in progress has completed. A blocked message consumer
+ * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
+ * <P>Closing a transacted session must roll back the transaction in progress.
+ * <P>This method is the only <CODE>Session</CODE> method that can be called concurrently.
+ * <P>Invoking any other <CODE>Session</CODE> method on a closed session
+ * must throw a <CODE>javax.jms.IllegalStateException</CODE>.
+ * <p> Closing a closed session must <I>not</I> throw an exception.
+ *
+ * @throws JMSException If closing the session fails due to some internal error.
+ */
+ public synchronized void close() throws JMSException
+ {
+ if (!_isClosed)
+ {
+ // from now all the session methods will throw a IllegalStateException
+ _isClosed = true;
+ // close all the actors
+ closeAllActors();
+ // close the underlaying QpidSession
+ try
+ {
+ _qpidSession.sessionClose();
+ }
+ catch ( org.apache.qpidity.QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+ }
+
+ /**
+ * Stops message delivery in this session, and restarts message delivery with
+ * the oldest unacknowledged message.
+ * <p>Recovering a session causes it to take the following actions:
+ * <ul>
+ * <li>Stop message delivery.
+ * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
+ * <li>Restart the delivery sequence including all unacknowledged messages that had been
+ * previously delivered.
+ * Redelivered messages do not have to be delivered in exactly their original delivery order.
+ * </ul>
+ *
+ * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
+ * Not that this does not necessarily mean that the recovery has failed, but simply that it is
+ * not possible to tell if it has or not.
+ */
+ public void recover() throws JMSException
+ {
+ // Ensure that the session is open.
+ checkNotClosed();
+ // we are recovering
+ _inRecovery = true;
+ // Ensure that the session is not transacted.
+ if (getTransacted())
+ {
+ throw new IllegalStateException("Session is transacted");
+ }
+ // release all unack messages
+ for(QpidMessage message : _unacknowledgedMessages)
+ {
+ // release all those messages
+ //Todo: message.getQpidMEssage.release();
+ }
+ }
+
+ /**
+ * Returns the session's distinguished message listener (optional).
+ * <p>This is an expert facility used only by Application Servers.
+ * <p> This is an optional operation that is not yet supported
+ *
+ * @return The message listener associated with this session.
+ * @throws JMSException If getting the message listener fails due to an internal error.
+ */
+ public MessageListener getMessageListener() throws JMSException
+ {
+ checkNotClosed();
+ throw new java.lang.UnsupportedOperationException();
+ }
+
+ /**
+ * Sets the session's distinguished message listener.
+ * <p>This is an expert facility used only by Application Servers.
+ * <p> This is an optional operation that is not yet supported
+ *
+ * @param messageListener The message listener to associate with this session
+ * @throws JMSException If setting the message listener fails due to an internal error.
+ */
+ public void setMessageListener(MessageListener messageListener) throws JMSException
+ {
+ checkNotClosed();
+ throw new java.lang.UnsupportedOperationException();
+ }
+
+ /**
+ * Optional operation, intended to be used only by Application Servers,
+ * not by ordinary JMS clients.
+ * <p> This is an optional operation that is not yet supported
+ */
+ public void run()
+ {
+ throw new java.lang.UnsupportedOperationException();
+ }
+
+ public MessageProducer createProducer(Destination destination) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public MessageConsumer createConsumer(Destination destination) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public MessageConsumer createConsumer(Destination destination, String string) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Queue createQueue(String string) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Topic createTopic(String string) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws
+ JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public QueueBrowser createBrowser(Queue queue) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public QueueBrowser createBrowser(Queue queue, String string) throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * Unsubscribes a durable subscription that has been created by a client.
+ *
+ * <P>This method deletes the state being maintained on behalf of the
+ * subscriber by its provider.
+ *
+ * <P>It is erroneous for a client to delete a durable subscription
+ * while there is an active <CODE>TopicSubscriber</CODE> for the
+ * subscription, or while a consumed message is part of a pending
+ * transaction or has not been acknowledged in the session.
+ *
+ * @param name the name used to identify this subscription
+ *
+ * @exception JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
+ * @exception InvalidDestinationException if an invalid subscription name
+ * is specified.
+ */
+ public void unsubscribe(String name) throws JMSException
+ {
+ checkNotClosed();
+
+ }
+
+ //----- Protected methods
+ /**
+ * Notify this session that a message is processed
+ * @param message The processed message.
+ */
+ protected void preProcessMessage(QpidMessage message)
+ {
+ _inRecovery = false;
+ }
+
+ /**
+ * Indicate whether this session is recovering .
+ *
+ * @return true if this session is recovering.
+ */
+ protected boolean isInRecovery()
+ {
+ return _inRecovery;
+ }
+
+ /**
+ * Validate that the Session is not closed.
+ * <p/>
+ * If the Session has been closed, throw a IllegalStateException. This behaviour is
+ * required by the JMS specification.
+ *
+ * @throws IllegalStateException If the session is closed.
+ */
+ protected void checkNotClosed() throws IllegalStateException
+ {
+ if (_isClosed)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session has been closed. Cannot invoke any further operations.");
+ }
+ throw new javax.jms.IllegalStateException("Session has been closed. Cannot invoke any further operations.");
+ }
+ }
+
+ /**
+ * A session keeps the list of unack messages only when the ack mode is
+ * set to client ack mode. Otherwise messages are always ack.
+ * <p> We can use an ack heuristic for dups ok mode where bunch of messages are ack.
+ * This has to be done.
+ *
+ * @param message The message to be acknowledged.
+ * @throws JMSException If the message cannot be acknowledged due to an internal error.
+ */
+ protected void acknowledgeMessage(QpidMessage message) throws JMSException
+ {
+ if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ // messages will be acknowldeged by the client application.
+ // store this message for acknowledging it afterward
+ _unacknowledgedMessages.add(message);
+ }
+ else
+ {
+ // acknowledge this message
+ // TODO: message.acknowledgeQpidMessge();
+ }
+ //TODO: Implement DUPS OK heuristic
+ }
+
+ /**
+ * This method is called when a message is acked.
+ * <p/>
+ * <P>Acknowledgment of a message automatically acknowledges all
+ * messages previously received by the session. Clients may
+ * individually acknowledge messages or they may choose to acknowledge
+ * messages in application defined groups (which is done by acknowledging
+ * the last received message in the group).
+ *
+ * @throws JMSException If this method is called on a closed session.
+ */
+ protected void acknowledge() throws JMSException
+ {
+ checkNotClosed();
+ if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ for (QpidMessage message : _unacknowledgedMessages)
+ {
+ // acknowledge this message
+ // TODO: message.acknowledgeQpidMessge();
+ }
+ //empty the list of unack messages
+ _unacknowledgedMessages.clear();
+ }
+ //else there is no effect
+ }
+
+ //------ Private Methods
+
+ /**
+ * Close the producer and the consumers of this session
+ *
+ * @throws JMSException If one of the MessaeActor cannot be closed due to some internal error.
+ */
+ private void closeAllActors() throws JMSException
+ {
+ for (MessageActor messageActor : _messageProducers)
+ {
+ messageActor.closeMessageActor();
+ }
+ for (MessageActor messageActor : _messageConsumers)
+ {
+ messageActor.closeMessageActor();
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
index fe511fa398..db2515d562 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
@@ -20,9 +20,7 @@ package org.apache.qpid.nclient.jms;
import org.apache.qpid.nclient.jms.SessionImpl;
/**
- * Created by Arnaud Simon
- * Date: 25-Jul-2007
- * Time: 13:39:35
+ *
*/
public class TopicSessionImpl extends SessionImpl
{
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesMessage.java
new file mode 100644
index 0000000000..5963a3780f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesMessage.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * @author Apache Software Foundation
+ */
+public abstract class AbstractBytesMessage extends AbstractJMSMessage
+{
+
+ /**
+ * The default initial size of the buffer. The buffer expands automatically.
+ */
+ private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
+
+ AbstractBytesMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a bytes message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ AbstractBytesMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
+
+ if (_data == null)
+ {
+ allocateInitialBuffer();
+ }
+ }
+
+ protected void allocateInitialBuffer()
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
+ _data.setAutoExpand(true);
+ }
+
+ AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
+ {
+ // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+ getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
+ }
+
+ public void clearBodyImpl() throws JMSException
+ {
+ allocateInitialBuffer();
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return getText();
+ }
+ catch (IOException e)
+ {
+ JMSException jmse = new JMSException(e.toString());
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ }
+
+ /**
+ * We reset the stream before and after reading the data. This means that toString() will always output
+ * the entire message and also that the caller can then immediately start reading as if toString() had
+ * never been called.
+ *
+ * @return
+ * @throws IOException
+ */
+ private String getText() throws IOException
+ {
+ // this will use the default platform encoding
+ if (_data == null)
+ {
+ return null;
+ }
+
+ int pos = _data.position();
+ _data.rewind();
+ // one byte left is for the end of frame marker
+ if (_data.remaining() == 0)
+ {
+ // this is really redundant since pos must be zero
+ _data.position(pos);
+
+ return null;
+ }
+ else
+ {
+ String data = _data.getString(Charset.forName("UTF8").newDecoder());
+ _data.position(pos);
+
+ return data;
+ }
+ }
+
+ /**
+ * Check that there is at least a certain number of bytes available to read
+ *
+ * @param len the number of bytes
+ * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
+ */
+ protected void checkAvailable(int len) throws MessageEOFException
+ {
+ if (_data.remaining() < len)
+ {
+ throw new MessageEOFException("Unable to read " + len + " bytes");
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesTypedMessage.java
new file mode 100644
index 0000000000..e7d51a6797
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesTypedMessage.java
@@ -0,0 +1,801 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.nclient.jms.message;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * @author Apache Software Foundation
+ */
+public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
+{
+
+ protected static final byte BOOLEAN_TYPE = (byte) 1;
+
+ protected static final byte BYTE_TYPE = (byte) 2;
+
+ protected static final byte BYTEARRAY_TYPE = (byte) 3;
+
+ protected static final byte SHORT_TYPE = (byte) 4;
+
+ protected static final byte CHAR_TYPE = (byte) 5;
+
+ protected static final byte INT_TYPE = (byte) 6;
+
+ protected static final byte LONG_TYPE = (byte) 7;
+
+ protected static final byte FLOAT_TYPE = (byte) 8;
+
+ protected static final byte DOUBLE_TYPE = (byte) 9;
+
+ protected static final byte STRING_TYPE = (byte) 10;
+
+ protected static final byte NULL_STRING_TYPE = (byte) 11;
+
+ /**
+ * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
+ * a byte array in multiple chunks, hence this is used to track how much is left to be read
+ */
+ private int _byteArrayRemaining = -1;
+
+ AbstractBytesTypedMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a stream message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ AbstractBytesTypedMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ }
+
+
+ AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
+ {
+ super(messageNbr, contentHeader, exchange, routingKey, data);
+ }
+
+
+ protected byte readWireType() throws MessageFormatException, MessageEOFException,
+ MessageNotReadableException
+ {
+ checkReadable();
+ checkAvailable(1);
+ return _data.get();
+ }
+
+ protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
+ {
+ checkWritable();
+ _data.put(type);
+ _changedData = true;
+ }
+
+ protected boolean readBoolean() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ boolean result;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private boolean readBooleanImpl()
+ {
+ return _data.get() != 0;
+ }
+
+ protected byte readByte() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ byte result;
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ private byte readByteImpl()
+ {
+ return _data.get();
+ }
+
+ protected short readShort() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ short result;
+ try
+ {
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ private short readShortImpl()
+ {
+ return _data.getShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws javax.jms.JMSException
+ */
+ protected char readChar() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
+ {
+ if(wireType == NULL_STRING_TYPE){
+ throw new NullPointerException();
+ }
+
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private char readCharImpl()
+ {
+ return _data.getChar();
+ }
+
+ protected int readInt() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ int result;
+ try
+ {
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected int readIntImpl()
+ {
+ return _data.getInt();
+ }
+
+ protected long readLong() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ long result;
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private long readLongImpl()
+ {
+ return _data.getLong();
+ }
+
+ protected float readFloat() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ float result;
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private float readFloatImpl()
+ {
+ return _data.getFloat();
+ }
+
+ protected double readDouble() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ double result;
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private double readDoubleImpl()
+ {
+ return _data.getDouble();
+ }
+
+ protected String readString() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ String result;
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ throw new NullPointerException("data is null");
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected String readStringImpl() throws JMSException
+ {
+ try
+ {
+ return _data.getString(Charset.forName("UTF-8").newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+ protected int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ checkReadable();
+ // first call
+ if (_byteArrayRemaining == -1)
+ {
+ // type discriminator checked separately so you get a MessageFormatException rather than
+ // an EOF even in the case where both would be applicable
+ checkAvailable(1);
+ byte wireType = readWireType();
+ if (wireType != BYTEARRAY_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ }
+ checkAvailable(4);
+ int size = _data.getInt();
+ // length of -1 indicates null
+ if (size == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ if (size > _data.remaining())
+ {
+ throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
+ _data.remaining() + " bytes");
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ }
+ }
+ }
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
+
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
+ }
+
+ private int readBytesImpl(byte[] bytes)
+ {
+ int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
+ _byteArrayRemaining -= count;
+
+ if (count == 0)
+ {
+ return 0;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ protected Object readObject() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ Object result = null;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ byte[] bytesResult = new byte[size];
+ readBytesImpl(bytesResult);
+ result = bytesResult;
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected void writeBoolean(boolean b) throws JMSException
+ {
+ writeTypeDiscriminator(BOOLEAN_TYPE);
+ _data.put(b ? (byte) 1 : (byte) 0);
+ }
+
+ protected void writeByte(byte b) throws JMSException
+ {
+ writeTypeDiscriminator(BYTE_TYPE);
+ _data.put(b);
+ }
+
+ protected void writeShort(short i) throws JMSException
+ {
+ writeTypeDiscriminator(SHORT_TYPE);
+ _data.putShort(i);
+ }
+
+ protected void writeChar(char c) throws JMSException
+ {
+ writeTypeDiscriminator(CHAR_TYPE);
+ _data.putChar(c);
+ }
+
+ protected void writeInt(int i) throws JMSException
+ {
+ writeTypeDiscriminator(INT_TYPE);
+ writeIntImpl(i);
+ }
+
+ protected void writeIntImpl(int i)
+ {
+ _data.putInt(i);
+ }
+
+ protected void writeLong(long l) throws JMSException
+ {
+ writeTypeDiscriminator(LONG_TYPE);
+ _data.putLong(l);
+ }
+
+ protected void writeFloat(float v) throws JMSException
+ {
+ writeTypeDiscriminator(FLOAT_TYPE);
+ _data.putFloat(v);
+ }
+
+ protected void writeDouble(double v) throws JMSException
+ {
+ writeTypeDiscriminator(DOUBLE_TYPE);
+ _data.putDouble(v);
+ }
+
+ protected void writeString(String string) throws JMSException
+ {
+ if (string == null)
+ {
+ writeTypeDiscriminator(NULL_STRING_TYPE);
+ }
+ else
+ {
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ writeStringImpl(string);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
+ }
+
+ protected void writeStringImpl(String string)
+ throws CharacterCodingException
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte) 0);
+ }
+
+ protected void writeBytes(byte[] bytes) throws JMSException
+ {
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
+ }
+
+ protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ writeTypeDiscriminator(BYTEARRAY_TYPE);
+ if (bytes == null)
+ {
+ _data.putInt(-1);
+ }
+ else
+ {
+ _data.putInt(length);
+ _data.put(bytes, offset, length);
+ }
+ }
+
+ protected void writeObject(Object object) throws JMSException
+ {
+ checkWritable();
+ Class clazz;
+
+ if (object == null)
+ {
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
+ }
+
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeString((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java
new file mode 100644
index 0000000000..5ea830b2cc
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java
@@ -0,0 +1,685 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import org.apache.commons.collections.map.ReferenceMap;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
+
+public abstract class AbstractJMSMessage extends QpidMessage implements org.apache.qpid.jms.Message
+{
+ private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
+ protected boolean _redelivered;
+
+ protected ByteBuffer _data;
+ private boolean _readableProperties = false;
+ protected boolean _readableMessage = false;
+ protected boolean _changedData;
+ private Destination _destination;
+ private JMSHeaderAdapter _headerAdapter;
+ private BasicMessageConsumer _consumer;
+ private boolean _strictAMQP;
+
+ protected AbstractJMSMessage(ByteBuffer data)
+ {
+ super(new BasicContentHeaderProperties());
+ _data = data;
+ if (_data != null)
+ {
+ _data.acquire();
+ }
+
+ _readableProperties = false;
+ _readableMessage = (data != null);
+ _changedData = (data == null);
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+
+ _strictAMQP =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+ }
+
+ protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
+ {
+ this(contentHeader, deliveryTag);
+
+ Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+
+ AMQDestination dest;
+
+ if (AMQDestination.QUEUE_TYPE.equals(type))
+ {
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ }
+ else if (AMQDestination.TOPIC_TYPE.equals(type))
+ {
+ dest = new AMQTopic(exchange, routingKey, null);
+ }
+ else
+ {
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ }
+ // Destination dest = AMQDestination.createDestination(url);
+ setJMSDestination(dest);
+
+ _data = data;
+ if (_data != null)
+ {
+ _data.acquire();
+ }
+
+ _readableMessage = data != null;
+
+ }
+
+ protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
+ {
+ super(contentHeader, deliveryTag);
+ _readableProperties = (_contentHeaderProperties != null);
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+ }
+
+ public String getJMSMessageID() throws JMSException
+ {
+ if (getContentHeaderProperties().getMessageIdAsString() == null)
+ {
+ getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID());
+ }
+
+ return getContentHeaderProperties().getMessageIdAsString();
+ }
+
+ public void setJMSMessageID(String messageId) throws JMSException
+ {
+ getContentHeaderProperties().setMessageId(messageId);
+ }
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ return getContentHeaderProperties().getTimestamp();
+ }
+
+ public void setJMSTimestamp(long timestamp) throws JMSException
+ {
+ getContentHeaderProperties().setTimestamp(timestamp);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ getContentHeaderProperties().setCorrelationId(new String(bytes));
+ }
+
+ public void setJMSCorrelationID(String correlationId) throws JMSException
+ {
+ getContentHeaderProperties().setCorrelationId(correlationId);
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+ return getContentHeaderProperties().getCorrelationIdAsString();
+ }
+
+ public Destination getJMSReplyTo() throws JMSException
+ {
+ String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
+ if (replyToEncoding == null)
+ {
+ return null;
+ }
+ else
+ {
+ Destination dest = (Destination) _destinationCache.get(replyToEncoding);
+ if (dest == null)
+ {
+ try
+ {
+ BindingURL binding = new AMQBindingURL(replyToEncoding);
+ dest = AMQDestination.createDestination(binding);
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
+ }
+
+ _destinationCache.put(replyToEncoding, dest);
+ }
+
+ return dest;
+ }
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException
+ {
+ if (destination == null)
+ {
+ throw new IllegalArgumentException("Null destination not allowed");
+ }
+
+ if (!(destination instanceof AMQDestination))
+ {
+ throw new IllegalArgumentException(
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ }
+
+ final AMQDestination amqd = (AMQDestination) destination;
+
+ final AMQShortString encodedDestination = amqd.getEncodedName();
+ _destinationCache.put(encodedDestination, destination);
+ getContentHeaderProperties().setReplyTo(encodedDestination);
+ }
+
+ public Destination getJMSDestination() throws JMSException
+ {
+ return _destination;
+ }
+
+ public void setJMSDestination(Destination destination)
+ {
+ _destination = destination;
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ return getContentHeaderProperties().getDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException
+ {
+ getContentHeaderProperties().setDeliveryMode((byte) i);
+ }
+
+ public BasicContentHeaderProperties getContentHeaderProperties()
+ {
+ return (BasicContentHeaderProperties) _contentHeaderProperties;
+ }
+
+ public boolean getJMSRedelivered() throws JMSException
+ {
+ return _redelivered;
+ }
+
+ public void setJMSRedelivered(boolean b) throws JMSException
+ {
+ _redelivered = b;
+ }
+
+ public String getJMSType() throws JMSException
+ {
+ return getContentHeaderProperties().getTypeAsString();
+ }
+
+ public void setJMSType(String string) throws JMSException
+ {
+ getContentHeaderProperties().setType(string);
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ return getContentHeaderProperties().getExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ getContentHeaderProperties().setExpiration(l);
+ }
+
+ public int getJMSPriority() throws JMSException
+ {
+ return getContentHeaderProperties().getPriority();
+ }
+
+ public void setJMSPriority(int i) throws JMSException
+ {
+ getContentHeaderProperties().setPriority((byte) i);
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ getJmsHeaders().clear();
+
+ _readableProperties = false;
+ }
+
+ public void clearBody() throws JMSException
+ {
+ clearBodyImpl();
+ _readableMessage = false;
+ }
+
+ public boolean propertyExists(AMQShortString propertyName) throws JMSException
+ {
+ return getJmsHeaders().propertyExists(propertyName);
+ }
+
+ public boolean propertyExists(String propertyName) throws JMSException
+ {
+ return getJmsHeaders().propertyExists(propertyName);
+ }
+
+ public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getBoolean(propertyName);
+ }
+
+ public boolean getBooleanProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getBoolean(propertyName);
+ }
+
+ public byte getByteProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getByte(propertyName);
+ }
+
+ public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getBytes(propertyName);
+ }
+
+ public short getShortProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getShort(propertyName);
+ }
+
+ public int getIntProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getInteger(propertyName);
+ }
+
+ public long getLongProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getLong(propertyName);
+ }
+
+ public float getFloatProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getFloat(propertyName);
+ }
+
+ public double getDoubleProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getDouble(propertyName);
+ }
+
+ public String getStringProperty(String propertyName) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getString(propertyName);
+ }
+
+ public Object getObjectProperty(String propertyName) throws JMSException
+ {
+ return getJmsHeaders().getObject(propertyName);
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return getJmsHeaders().getPropertyNames();
+ }
+
+ public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setBoolean(propertyName, b);
+ }
+
+ public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setBoolean(propertyName, b);
+ }
+
+ public void setByteProperty(String propertyName, byte b) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setByte(propertyName, new Byte(b));
+ }
+
+ public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setBytes(propertyName, bytes);
+ }
+
+ public void setShortProperty(String propertyName, short i) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setShort(propertyName, new Short(i));
+ }
+
+ public void setIntProperty(String propertyName, int i) throws JMSException
+ {
+ checkWritableProperties();
+ JMSHeaderAdapter.checkPropertyName(propertyName);
+ super.setIntProperty(new AMQShortString(propertyName), new Integer(i));
+ }
+
+ public void setLongProperty(String propertyName, long l) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setLong(propertyName, new Long(l));
+ }
+
+ public void setFloatProperty(String propertyName, float f) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setFloat(propertyName, new Float(f));
+ }
+
+ public void setDoubleProperty(String propertyName, double v) throws JMSException
+ {
+ if (_strictAMQP)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setDouble(propertyName, new Double(v));
+ }
+
+ public void setStringProperty(String propertyName, String value) throws JMSException
+ {
+ checkWritableProperties();
+ JMSHeaderAdapter.checkPropertyName(propertyName);
+ super.setLongStringProperty(new AMQShortString(propertyName), value);
+ }
+
+ public void setObjectProperty(String propertyName, Object object) throws JMSException
+ {
+ checkWritableProperties();
+ getJmsHeaders().setObject(propertyName, object);
+ }
+
+ protected void removeProperty(AMQShortString propertyName) throws JMSException
+ {
+ getJmsHeaders().remove(propertyName);
+ }
+
+ protected void removeProperty(String propertyName) throws JMSException
+ {
+ getJmsHeaders().remove(propertyName);
+ }
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null)
+ {
+ _session.acknowledge();
+ }
+ }
+
+ /**
+ * This forces concrete classes to implement clearBody()
+ *
+ * @throws JMSException
+ */
+ public abstract void clearBodyImpl() throws JMSException;
+
+ /**
+ * Get a String representation of the body of the message. Used in the toString() method which outputs this before
+ * message properties.
+ */
+ public abstract String toBodyString() throws JMSException;
+
+ public String getMimeType()
+ {
+ return getMimeTypeAsShortString().toString();
+ }
+
+ public abstract AMQShortString getMimeTypeAsShortString();
+
+ public String toString()
+ {
+ try
+ {
+ StringBuffer buf = new StringBuffer("Body:\n");
+ buf.append(toBodyString());
+ buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID());
+ buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
+ buf.append("\nJMS expiration: ").append(getJMSExpiration());
+ buf.append("\nJMS priority: ").append(getJMSPriority());
+ buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
+ buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+ buf.append("\nJMS Redelivered: ").append(_redelivered);
+ buf.append("\nJMS Destination: ").append(getJMSDestination());
+ buf.append("\nJMS Type: ").append(getJMSType());
+ buf.append("\nJMS MessageID: ").append(getJMSMessageID());
+ buf.append("\nAMQ message number: ").append(_deliveryTag);
+
+ buf.append("\nProperties:");
+ if (getJmsHeaders().isEmpty())
+ {
+ buf.append("<NONE>");
+ }
+ else
+ {
+ buf.append('\n').append(getJmsHeaders().getHeaders());
+ }
+
+ return buf.toString();
+ }
+ catch (JMSException e)
+ {
+ return e.toString();
+ }
+ }
+
+ public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
+ {
+ getContentHeaderProperties().setHeaders(messageProperties);
+ }
+
+ public JMSHeaderAdapter getJmsHeaders()
+ {
+ return _headerAdapter;
+ }
+
+ public ByteBuffer getData()
+ {
+ // make sure we rewind the data just in case any method has moved the
+ // position beyond the start
+ if (_data != null)
+ {
+ reset();
+ }
+
+ return _data;
+ }
+
+ protected void checkReadable() throws MessageNotReadableException
+ {
+ if (!_readableMessage)
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ protected void checkWritable() throws MessageNotWriteableException
+ {
+ if (_readableMessage)
+ {
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ }
+ }
+
+ protected void checkWritableProperties() throws MessageNotWriteableException
+ {
+ if (_readableProperties)
+ {
+ throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
+ }
+ }
+
+ public boolean isReadable()
+ {
+ return _readableMessage;
+ }
+
+ public boolean isWritable()
+ {
+ return !_readableMessage;
+ }
+
+ public void reset()
+ {
+ if (!_changedData)
+ {
+ _data.rewind();
+ }
+ else
+ {
+ _data.flip();
+ _changedData = false;
+ }
+ }
+
+ public void setConsumer(BasicMessageConsumer basicMessageConsumer)
+ {
+ _consumer = basicMessageConsumer;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessageFactory.java
new file mode 100644
index 0000000000..39924f52f9
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessageFactory.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class AbstractJMSMessageFactory implements MessageFactory
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
+
+ protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
+ AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException;
+
+ protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException
+ {
+ ByteBuffer data;
+ final boolean debug = _logger.isDebugEnabled();
+
+ // we optimise the non-fragmented case to avoid copying
+ if ((bodies != null) && (bodies.size() == 1))
+ {
+ if (debug)
+ {
+ _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
+ }
+
+ data = ((ContentBody) bodies.get(0)).payload;
+ }
+ else if (bodies != null)
+ {
+ if (debug)
+ {
+ _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize
+ + ")");
+ }
+
+ data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem?
+ final Iterator it = bodies.iterator();
+ while (it.hasNext())
+ {
+ ContentBody cb = (ContentBody) it.next();
+ data.put(cb.payload);
+ cb.payload.release();
+ }
+
+ data.flip();
+ }
+ else // bodies == null
+ {
+ data = ByteBuffer.allocate(0);
+ }
+
+ if (debug)
+ {
+ _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining="
+ + data.remaining());
+ }
+
+ return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
+ }
+
+ public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException
+ {
+ final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ msg.setJMSRedelivered(redelivered);
+
+ return msg;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessage.java
new file mode 100644
index 0000000000..adea0f96ba
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessage.java
@@ -0,0 +1,388 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
+{
+ public static final String MIME_TYPE = "application/octet-stream";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+
+ public JMSBytesMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a bytes message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ JMSBytesMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ }
+
+ JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
+ {
+ super(messageNbr, contentHeader, exchange, routingKey, data);
+ }
+
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
+ public AMQShortString getMimeTypeAsShortString()
+ {
+ return MIME_TYPE_SHORT_STRING;
+ }
+
+ public long getBodyLength() throws JMSException
+ {
+ checkReadable();
+ return _data.limit();
+ }
+
+ public boolean readBoolean() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(1);
+ return _data.get() != 0;
+ }
+
+ public byte readByte() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(1);
+ return _data.get();
+ }
+
+ public int readUnsignedByte() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(1);
+ return _data.getUnsigned();
+ }
+
+ public short readShort() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(2);
+ return _data.getShort();
+ }
+
+ public int readUnsignedShort() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(2);
+ return _data.getUnsignedShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws JMSException
+ */
+ public char readChar() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(2);
+ return _data.getChar();
+ }
+
+ public int readInt() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(4);
+ return _data.getInt();
+ }
+
+ public long readLong() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(8);
+ return _data.getLong();
+ }
+
+ public float readFloat() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(4);
+ return _data.getFloat();
+ }
+
+ public double readDouble() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(8);
+ return _data.getDouble();
+ }
+
+ public String readUTF() throws JMSException
+ {
+ checkReadable();
+ // we check only for one byte since theoretically the string could be only a
+ // single byte when using UTF-8 encoding
+
+ try
+ {
+ short length = readShort();
+ if(length == 0)
+ {
+ return "";
+ }
+ else
+ {
+ CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+ ByteBuffer encodedString = _data.slice();
+ encodedString.limit(length);
+ _data.position(_data.position()+length);
+ CharBuffer string = decoder.decode(encodedString.buf());
+
+ return string.toString();
+ }
+
+
+
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+ public int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ checkReadable();
+ int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining());
+ if (count == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ public int readBytes(byte[] bytes, int maxLength) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ if (maxLength > bytes.length)
+ {
+ throw new IllegalArgumentException("maxLength must be <= bytes.length");
+ }
+ checkReadable();
+ int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining());
+ if (count == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ public void writeBoolean(boolean b) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.put(b ? (byte) 1 : (byte) 0);
+ }
+
+ public void writeByte(byte b) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.put(b);
+ }
+
+ public void writeShort(short i) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.putShort(i);
+ }
+
+ public void writeChar(char c) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.putChar(c);
+ }
+
+ public void writeInt(int i) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.putInt(i);
+ }
+
+ public void writeLong(long l) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.putLong(l);
+ }
+
+ public void writeFloat(float v) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.putFloat(v);
+ }
+
+ public void writeDouble(double v) throws JMSException
+ {
+ checkWritable();
+ _changedData = true;
+ _data.putDouble(v);
+ }
+
+ public void writeUTF(String string) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
+ java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+
+ _data.putShort((short)encodedString.limit());
+ _data.put(encodedString);
+ _changedData = true;
+ //_data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must add the null terminator manually
+ //_data.put((byte)0);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
+
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ checkWritable();
+ _data.put(bytes);
+ _changedData = true;
+ }
+
+ public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ checkWritable();
+ _data.put(bytes, offset, length);
+ _changedData = true;
+ }
+
+ public void writeObject(Object object) throws JMSException
+ {
+ checkWritable();
+ if (object == null)
+ {
+ throw new NullPointerException("Argument must not be null");
+ }
+ Class clazz = object.getClass();
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeUTF((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
+ }
+
+ public String toString()
+ {
+ return String.valueOf(System.identityHashCode(this));
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessageFactory.java
new file mode 100644
index 0000000000..2c59aeebe1
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessageFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
+{
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ }
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSBytesMessage();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSHeaderAdapter.java
new file mode 100644
index 0000000000..c19c0b3290
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSHeaderAdapter.java
@@ -0,0 +1,552 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.util.Enumeration;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+
+public final class JMSHeaderAdapter
+{
+ private final FieldTable _headers;
+
+ public JMSHeaderAdapter(FieldTable headers)
+ {
+ _headers = headers;
+ }
+
+
+ public FieldTable getHeaders()
+ {
+ return _headers;
+ }
+
+ public boolean getBoolean(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Boolean b = getHeaders().getBoolean(string);
+
+ if (b == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getBoolean can't use " + string + " item.");
+ }
+ else
+ {
+ return Boolean.valueOf((String) str);
+ }
+ }
+ else
+ {
+ b = Boolean.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public boolean getBoolean(AMQShortString string) throws JMSException
+ {
+ checkPropertyName(string);
+ Boolean b = getHeaders().getBoolean(string);
+
+ if (b == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getBoolean can't use " + string + " item.");
+ }
+ else
+ {
+ return Boolean.valueOf((String) str);
+ }
+ }
+ else
+ {
+ b = Boolean.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public char getCharacter(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Character c = getHeaders().getCharacter(string);
+
+ if (c == null)
+ {
+ if (getHeaders().isNullStringValue(string))
+ {
+ throw new NullPointerException("Cannot convert null char");
+ }
+ else
+ {
+ throw new MessageFormatException("getChar can't use " + string + " item.");
+ }
+ }
+ else
+ {
+ return (char) c;
+ }
+ }
+
+ public byte[] getBytes(String string) throws JMSException
+ {
+ return getBytes(new AMQShortString(string));
+ }
+
+ public byte[] getBytes(AMQShortString string) throws JMSException
+ {
+ checkPropertyName(string);
+
+ byte[] bs = getHeaders().getBytes(string);
+
+ if (bs == null)
+ {
+ throw new MessageFormatException("getBytes can't use " + string + " item.");
+ }
+ else
+ {
+ return bs;
+ }
+ }
+
+ public byte getByte(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Byte b = getHeaders().getByte(string);
+ if (b == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getByte can't use " + string + " item.");
+ }
+ else
+ {
+ return Byte.valueOf((String) str);
+ }
+ }
+ else
+ {
+ b = Byte.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public short getShort(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Short s = getHeaders().getShort(string);
+
+ if (s == null)
+ {
+ s = Short.valueOf(getByte(string));
+ }
+
+ return s;
+ }
+
+ public int getInteger(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Integer i = getHeaders().getInteger(string);
+
+ if (i == null)
+ {
+ i = Integer.valueOf(getShort(string));
+ }
+
+ return i;
+ }
+
+ public long getLong(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Long l = getHeaders().getLong(string);
+
+ if (l == null)
+ {
+ l = Long.valueOf(getInteger(string));
+ }
+
+ return l;
+ }
+
+ public float getFloat(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Float f = getHeaders().getFloat(string);
+
+ if (f == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object str = getHeaders().getObject(string);
+
+ if (str == null || !(str instanceof String))
+ {
+ throw new MessageFormatException("getFloat can't use " + string + " item.");
+ }
+ else
+ {
+ return Float.valueOf((String) str);
+ }
+ }
+ else
+ {
+ f = Float.valueOf(null);
+ }
+
+ }
+
+ return f;
+ }
+
+ public double getDouble(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ Double d = getHeaders().getDouble(string);
+
+ if (d == null)
+ {
+ d = Double.valueOf(getFloat(string));
+ }
+
+ return d;
+ }
+
+ public String getString(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ String s = getHeaders().getString(string);
+
+ if (s == null)
+ {
+ if (getHeaders().containsKey(string))
+ {
+ Object o = getHeaders().getObject(string);
+ if (o instanceof byte[])
+ {
+ throw new MessageFormatException("getObject couldn't find " + string + " item.");
+ }
+ else
+ {
+ if (o == null)
+ {
+ return null;
+ }
+ else
+ {
+ s = String.valueOf(o);
+ }
+ }
+ }//else return s // null;
+ }
+
+ return s;
+ }
+
+ public Object getObject(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ return getHeaders().getObject(string);
+ }
+
+ public void setBoolean(AMQShortString string, boolean b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setBoolean(string, b);
+ }
+
+ public void setBoolean(String string, boolean b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setBoolean(string, b);
+ }
+
+ public void setChar(String string, char c) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setChar(string, c);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes)
+ {
+ checkPropertyName(string);
+ return getHeaders().setBytes(string, bytes);
+ }
+
+ public Object setBytes(String string, byte[] bytes)
+ {
+ checkPropertyName(string);
+ return getHeaders().setBytes(string, bytes);
+ }
+
+ public Object setBytes(String string, byte[] bytes, int start, int length)
+ {
+ checkPropertyName(string);
+ return getHeaders().setBytes(string, bytes, start, length);
+ }
+
+ public void setByte(String string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setByte(string, b);
+ }
+
+ public void setByte(AMQShortString string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setByte(string, b);
+ }
+
+
+ public void setShort(String string, short i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setShort(string, i);
+ }
+
+ public void setInteger(String string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setInteger(string, i);
+ }
+
+ public void setInteger(AMQShortString string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setInteger(string, i);
+ }
+
+ public void setLong(String string, long l) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setLong(string, l);
+ }
+
+ public void setFloat(String string, float v) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setFloat(string, v);
+ }
+
+ public void setDouble(String string, double v) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setDouble(string, v);
+ }
+
+ public void setString(String string, String string1) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setString(string, string1);
+ }
+
+ public void setString(AMQShortString string, String string1) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setString(string, string1);
+ }
+
+ public void setObject(String string, Object object) throws JMSException
+ {
+ checkPropertyName(string);
+ try
+ {
+ getHeaders().setObject(string, object);
+ }
+ catch (AMQPInvalidClassException aice)
+ {
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
+ }
+ }
+
+ public boolean itemExists(String string) throws JMSException
+ {
+ checkPropertyName(string);
+ return getHeaders().containsKey(string);
+ }
+
+ public Enumeration getPropertyNames()
+ {
+ return getHeaders().getPropertyNames();
+ }
+
+ public void clear()
+ {
+ getHeaders().clear();
+ }
+
+ public boolean propertyExists(AMQShortString propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().propertyExists(propertyName);
+ }
+
+ public boolean propertyExists(String propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().propertyExists(propertyName);
+ }
+
+ public Object put(Object key, Object value)
+ {
+ checkPropertyName(key.toString());
+ return getHeaders().setObject(key.toString(), value);
+ }
+
+ public Object remove(AMQShortString propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().remove(propertyName);
+ }
+
+ public Object remove(String propertyName)
+ {
+ checkPropertyName(propertyName);
+ return getHeaders().remove(propertyName);
+ }
+
+ public boolean isEmpty()
+ {
+ return getHeaders().isEmpty();
+ }
+
+ public void writeToBuffer(ByteBuffer data)
+ {
+ getHeaders().writeToBuffer(data);
+ }
+
+ public Enumeration getMapNames()
+ {
+ return getPropertyNames();
+ }
+
+ protected static void checkPropertyName(CharSequence propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new IllegalArgumentException("Property name must not be null");
+ }
+ else if (propertyName.length() == 0)
+ {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ checkIdentiferFormat(propertyName);
+ }
+
+ protected static void checkIdentiferFormat(CharSequence propertyName)
+ {
+// JMS requirements 3.5.1 Property Names
+// Identifiers:
+// - An identifier is an unlimited-length character sequence that must begin
+// with a Java identifier start character; all following characters must be Java
+// identifier part characters. An identifier start character is any character for
+// which the method Character.isJavaIdentifierStart returns true. This includes
+// '_' and '$'. An identifier part character is any character for which the
+// method Character.isJavaIdentifierPart returns true.
+// - Identifiers cannot be the names NULL, TRUE, or FALSE.
+// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// ESCAPE.
+// � Identifiers are either header field references or property references. The
+// type of a property value in a message selector corresponds to the type
+// used to set the property. If a property that does not exist in a message is
+// referenced, its value is NULL. The semantics of evaluating NULL values
+// in a selector are described in Section 3.8.1.2, �Null Values.�
+// � The conversions that apply to the get methods for properties do not
+// apply when a property is used in a message selector expression. For
+// example, suppose you set a property as a string value, as in the
+// following:
+// myMessage.setStringProperty("NumberOfOrders", "2");
+// The following expression in a message selector would evaluate to false,
+// because a string cannot be used in an arithmetic expression:
+// "NumberOfOrders > 1"
+// � Identifiers are case sensitive.
+// � Message header field references are restricted to JMSDeliveryMode,
+// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+// null and if so are treated as a NULL value.
+
+ if (Boolean.getBoolean("strict-jms"))
+ {
+ // JMS start character
+ if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+ }
+
+ // JMS part character
+ int length = propertyName.length();
+ for (int c = 1; c < length; c++)
+ {
+ if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+ }
+ }
+
+ // JMS invalid names
+ if ((propertyName.equals("NULL")
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+ }
+ }
+
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java
new file mode 100644
index 0000000000..2d90157410
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import java.nio.charset.CharacterCodingException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
+{
+ private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
+
+ public static final String MIME_TYPE = "jms/map-message";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+ private Map<String, Object> _map = new HashMap<String, Object>();
+
+ public JMSMapMessage() throws JMSException
+ {
+ this(null);
+ }
+
+ JMSMapMessage(ByteBuffer data) throws JMSException
+ {
+ super(data); // this instantiates a content header
+ populateMapFromData();
+ }
+
+ JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ ByteBuffer data) throws AMQException
+ {
+ super(messageNbr, contentHeader, exchange, routingKey, data);
+ try
+ {
+ populateMapFromData();
+ }
+ catch (JMSException je)
+ {
+ throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je);
+
+ }
+
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ return _map.toString();
+ }
+
+ public AMQShortString getMimeTypeAsShortString()
+ {
+ return MIME_TYPE_SHORT_STRING;
+ }
+
+ public ByteBuffer getData()
+ {
+ // What if _data is null?
+ writeMapToData();
+
+ return super.getData();
+ }
+
+ @Override
+ public void clearBodyImpl() throws JMSException
+ {
+ super.clearBodyImpl();
+ _map.clear();
+ }
+
+ public boolean getBoolean(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (value instanceof Boolean)
+ {
+ return ((Boolean) value).booleanValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Boolean.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to boolean.");
+ }
+
+ }
+
+ public byte getByte(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).byteValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Byte.valueOf((String) value).byteValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to byte.");
+ }
+ }
+
+ public short getShort(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).shortValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).shortValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Short.valueOf((String) value).shortValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to short.");
+ }
+
+ }
+
+ public int getInt(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (value instanceof Integer)
+ {
+ return ((Integer) value).intValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).intValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).intValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Integer.valueOf((String) value).intValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to int.");
+ }
+
+ }
+
+ public long getLong(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (value instanceof Long)
+ {
+ return ((Long) value).longValue();
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String) value).longValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to long.");
+ }
+
+ }
+
+ public char getChar(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (!_map.containsKey(propName))
+ {
+ throw new MessageFormatException("Property " + propName + " not present");
+ }
+ else if (value instanceof Character)
+ {
+ return ((Character) value).charValue();
+ }
+ else if (value == null)
+ {
+ throw new NullPointerException("Property " + propName + " has null value and therefore cannot "
+ + "be converted to char.");
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to boolan.");
+ }
+
+ }
+
+ public float getFloat(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (value instanceof Float)
+ {
+ return ((Float) value).floatValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Float.valueOf((String) value).floatValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to float.");
+ }
+ }
+
+ public double getDouble(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (value instanceof Double)
+ {
+ return ((Double) value).doubleValue();
+ }
+ else if (value instanceof Float)
+ {
+ return ((Float) value).doubleValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Double.valueOf((String) value).doubleValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to double.");
+ }
+ }
+
+ public String getString(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if ((value instanceof String) || (value == null))
+ {
+ return (String) value;
+ }
+ else if (value instanceof byte[])
+ {
+ throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String.");
+ }
+ else
+ {
+ return value.toString();
+ }
+
+ }
+
+ public byte[] getBytes(String propName) throws JMSException
+ {
+ Object value = _map.get(propName);
+
+ if (!_map.containsKey(propName))
+ {
+ throw new MessageFormatException("Property " + propName + " not present");
+ }
+ else if ((value instanceof byte[]) || (value == null))
+ {
+ return (byte[]) value;
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+ + " cannot be converted to byte[].");
+ }
+ }
+
+ public Object getObject(String propName) throws JMSException
+ {
+ return _map.get(propName);
+ }
+
+ public Enumeration getMapNames() throws JMSException
+ {
+ return Collections.enumeration(_map.keySet());
+ }
+
+ public void setBoolean(String propName, boolean b) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, b);
+ }
+
+ public void setByte(String propName, byte b) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, b);
+ }
+
+ public void setShort(String propName, short i) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, i);
+ }
+
+ public void setChar(String propName, char c) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, c);
+ }
+
+ public void setInt(String propName, int i) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, i);
+ }
+
+ public void setLong(String propName, long l) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, l);
+ }
+
+ public void setFloat(String propName, float v) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, v);
+ }
+
+ public void setDouble(String propName, double v) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, v);
+ }
+
+ public void setString(String propName, String string1) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, string1);
+ }
+
+ public void setBytes(String propName, byte[] bytes) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, bytes);
+ }
+
+ public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException
+ {
+ if ((offset == 0) && (length == bytes.length))
+ {
+ setBytes(propName, bytes);
+ }
+ else
+ {
+ byte[] newBytes = new byte[length];
+ System.arraycopy(bytes, offset, newBytes, 0, length);
+ setBytes(propName, newBytes);
+ }
+ }
+
+ public void setObject(String propName, Object value) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(propName);
+ if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
+ || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
+ || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
+ {
+ _map.put(propName, value);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot set property " + propName + " to value " + value + "of type "
+ + value.getClass().getName() + ".");
+ }
+ }
+
+ private void checkPropertyName(String propName)
+ {
+ if ((propName == null) || propName.equals(""))
+ {
+ throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
+ }
+ }
+
+ public boolean itemExists(String propName) throws JMSException
+ {
+ return _map.containsKey(propName);
+ }
+
+ private void populateMapFromData() throws JMSException
+ {
+ if (_data != null)
+ {
+ _data.rewind();
+
+ final int entries = readIntImpl();
+ for (int i = 0; i < entries; i++)
+ {
+ String propName = readStringImpl();
+ Object value = readObject();
+ _map.put(propName, value);
+ }
+ }
+ else
+ {
+ _map.clear();
+ }
+ }
+
+ private void writeMapToData()
+ {
+ allocateInitialBuffer();
+ final int size = _map.size();
+ writeIntImpl(size);
+ for (Map.Entry<String, Object> entry : _map.entrySet())
+ {
+ try
+ {
+ writeStringImpl(entry.getKey());
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
+
+ }
+
+ try
+ {
+ writeObject(entry.getValue());
+ }
+ catch (JMSException e)
+ {
+ Object value = entry.getValue();
+ throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value
+ + " (type: " + value.getClass().getName() + ").", e);
+ }
+ }
+
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java
new file mode 100644
index 0000000000..c5594862e2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSMapMessageFactory extends AbstractJMSMessageFactory
+{
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSMapMessage();
+ }
+
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java
new file mode 100644
index 0000000000..4afb7f050f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
+{
+ public static final String MIME_TYPE = "application/java-object-stream";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+
+ /**
+ * Creates empty, writable message for use by producers
+ */
+ public JMSObjectMessage()
+ {
+ this(null);
+ }
+
+ private JMSObjectMessage(ByteBuffer data)
+ {
+ super(data);
+ if (data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ _data.setAutoExpand(true);
+ }
+
+ getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
+ }
+
+ /**
+ * Creates read only message for delivery to consumers
+ */
+ JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ ByteBuffer data) throws AMQException
+ {
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+ }
+
+ public void clearBodyImpl() throws JMSException
+ {
+ if (_data != null)
+ {
+ _data.release();
+ }
+
+ _data = null;
+
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ return toString(_data);
+ }
+
+ public AMQShortString getMimeTypeAsShortString()
+ {
+ return MIME_TYPE_SHORT_STRING;
+ }
+
+ public void setObject(Serializable serializable) throws JMSException
+ {
+ checkWritable();
+
+ if (_data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ _data.setAutoExpand(true);
+ }
+ else
+ {
+ _data.rewind();
+ }
+
+ try
+ {
+ ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
+ out.writeObject(serializable);
+ out.flush();
+ out.close();
+ }
+ catch (IOException e)
+ {
+ MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
+ }
+
+ }
+
+ public Serializable getObject() throws JMSException
+ {
+ ObjectInputStream in = null;
+ if (_data == null)
+ {
+ return null;
+ }
+
+ try
+ {
+ _data.rewind();
+ in = new ObjectInputStream(_data.asInputStream());
+
+ return (Serializable) in.readObject();
+ }
+ catch (IOException e)
+ {
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
+ }
+ catch (ClassNotFoundException e)
+ {
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
+ }
+ finally
+ {
+ _data.rewind();
+ close(in);
+ }
+ }
+
+ private static void close(InputStream in)
+ {
+ try
+ {
+ if (in != null)
+ {
+ in.close();
+ }
+ }
+ catch (IOException ignore)
+ { }
+ }
+
+ private static String toString(ByteBuffer data)
+ {
+ if (data == null)
+ {
+ return null;
+ }
+
+ int pos = data.position();
+ try
+ {
+ return data.getString(Charset.forName("UTF8").newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ return null;
+ }
+ finally
+ {
+ data.position(pos);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java
new file mode 100644
index 0000000000..764984eea5
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
+{
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ }
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSObjectMessage();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java
new file mode 100644
index 0000000000..e889936d20
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
+{
+ public static final String MIME_TYPE="jms/stream-message";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+
+ /**
+ * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
+ * a byte array in multiple chunks, hence this is used to track how much is left to be read
+ */
+ private int _byteArrayRemaining = -1;
+
+ public JMSStreamMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a stream message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ JMSStreamMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ }
+
+
+ JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
+ {
+ super(messageNbr, contentHeader, exchange, routingKey, data);
+ }
+
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
+ public AMQShortString getMimeTypeAsShortString()
+ {
+ return MIME_TYPE_SHORT_STRING;
+ }
+
+
+
+ public boolean readBoolean() throws JMSException
+ {
+ return super.readBoolean();
+ }
+
+
+ public byte readByte() throws JMSException
+ {
+ return super.readByte();
+ }
+
+ public short readShort() throws JMSException
+ {
+ return super.readShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws JMSException
+ */
+ public char readChar() throws JMSException
+ {
+ return super.readChar();
+ }
+
+ public int readInt() throws JMSException
+ {
+ return super.readInt();
+ }
+
+ public long readLong() throws JMSException
+ {
+ return super.readLong();
+ }
+
+ public float readFloat() throws JMSException
+ {
+ return super.readFloat();
+ }
+
+ public double readDouble() throws JMSException
+ {
+ return super.readDouble();
+ }
+
+ public String readString() throws JMSException
+ {
+ return super.readString();
+ }
+
+ public int readBytes(byte[] bytes) throws JMSException
+ {
+ return super.readBytes(bytes);
+ }
+
+
+ public Object readObject() throws JMSException
+ {
+ return super.readObject();
+ }
+
+ public void writeBoolean(boolean b) throws JMSException
+ {
+ super.writeBoolean(b);
+ }
+
+ public void writeByte(byte b) throws JMSException
+ {
+ super.writeByte(b);
+ }
+
+ public void writeShort(short i) throws JMSException
+ {
+ super.writeShort(i);
+ }
+
+ public void writeChar(char c) throws JMSException
+ {
+ super.writeChar(c);
+ }
+
+ public void writeInt(int i) throws JMSException
+ {
+ super.writeInt(i);
+ }
+
+ public void writeLong(long l) throws JMSException
+ {
+ super.writeLong(l);
+ }
+
+ public void writeFloat(float v) throws JMSException
+ {
+ super.writeFloat(v);
+ }
+
+ public void writeDouble(double v) throws JMSException
+ {
+ super.writeDouble(v);
+ }
+
+ public void writeString(String string) throws JMSException
+ {
+ super.writeString(string);
+ }
+
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ super.writeBytes(bytes);
+ }
+
+ public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ super.writeBytes(bytes,offset,length);
+ }
+
+ public void writeObject(Object object) throws JMSException
+ {
+ super.writeObject(object);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java
new file mode 100644
index 0000000000..40fe6da228
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
+{
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ }
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSStreamMessage();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java
new file mode 100644
index 0000000000..83edc3dea6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+
+public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
+{
+ private static final String MIME_TYPE = "text/plain";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+
+ private String _decodedValue;
+
+ /**
+ * This constant represents the name of a property that is set when the message payload is null.
+ */
+ private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
+ private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
+
+ public JMSTextMessage() throws JMSException
+ {
+ this(null, null);
+ }
+
+ JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+ {
+ super(data); // this instantiates a content header
+ getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
+ getContentHeaderProperties().setEncoding(encoding);
+ }
+
+ JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data)
+ throws AMQException
+ {
+ super(deliveryTag, contentHeader, exchange, routingKey, data);
+ contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
+ _data = data;
+ }
+
+ JMSTextMessage(ByteBuffer data) throws JMSException
+ {
+ this(data, null);
+ }
+
+ JMSTextMessage(String text) throws JMSException
+ {
+ super((ByteBuffer) null);
+ setText(text);
+ }
+
+ public void clearBodyImpl() throws JMSException
+ {
+ if (_data != null)
+ {
+ _data.release();
+ }
+ _data = null;
+ _decodedValue = null;
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ return getText();
+ }
+
+ public void setData(ByteBuffer data)
+ {
+ _data = data;
+ }
+
+ public AMQShortString getMimeTypeAsShortString()
+ {
+ return MIME_TYPE_SHORT_STRING;
+ }
+
+ public void setText(String text) throws JMSException
+ {
+ checkWritable();
+
+ clearBody();
+ try
+ {
+ if (text != null)
+ {
+ _data = ByteBuffer.allocate(text.length());
+ _data.limit(text.length()) ;
+ //_data.sweep();
+ _data.setAutoExpand(true);
+ final String encoding = getContentHeaderProperties().getEncodingAsString();
+ if (encoding == null)
+ {
+ _data.put(text.getBytes(DEFAULT_CHARSET.name()));
+ }
+ else
+ {
+ _data.put(text.getBytes(encoding));
+ }
+ _changedData=true;
+ }
+ _decodedValue = text;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // should never occur
+ JMSException jmse = new JMSException("Unable to decode text data");
+ jmse.setLinkedException(e);
+ }
+ }
+
+ public String getText() throws JMSException
+ {
+ if (_data == null && _decodedValue == null)
+ {
+ return null;
+ }
+ else if (_decodedValue != null)
+ {
+ return _decodedValue;
+ }
+ else
+ {
+ _data.rewind();
+
+ if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
+ {
+ return null;
+ }
+ if (getContentHeaderProperties().getEncodingAsString() != null)
+ {
+ try
+ {
+ _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Could not decode string data: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+ else
+ {
+ try
+ {
+ _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Could not decode string data: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+ return _decodedValue;
+ }
+ }
+
+ @Override
+ public void prepareForSending() throws JMSException
+ {
+ super.prepareForSending();
+ if (_data == null)
+ {
+ setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
+ }
+ else
+ {
+ removeProperty(PAYLOAD_NULL_PROPERTY);
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java
new file mode 100644
index 0000000000..7d2422d483
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSTextMessageFactory extends AbstractJMSMessageFactory
+{
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSTextMessage();
+ }
+
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties,
+ exchange, routingKey, data);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java
new file mode 100644
index 0000000000..734b7c51ec
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import java.util.Enumeration;
+
+public class MessageConverter
+{
+
+ /**
+ * Log4J logger
+ */
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+ /**
+ * AbstractJMSMessage which will hold the converted message
+ */
+ private AbstractJMSMessage _newMessage;
+
+ public MessageConverter(AbstractJMSMessage message) throws JMSException
+ {
+ _newMessage = message;
+ }
+
+ public MessageConverter(BytesMessage message) throws JMSException
+ {
+ BytesMessage bytesMessage = (BytesMessage) message;
+ bytesMessage.reset();
+
+ JMSBytesMessage nativeMsg = new JMSBytesMessage();
+
+ byte[] buf = new byte[1024];
+
+ int len;
+
+ while ((len = bytesMessage.readBytes(buf)) != -1)
+ {
+ nativeMsg.writeBytes(buf, 0, len);
+ }
+
+ _newMessage = nativeMsg;
+ setMessageProperties(message);
+ }
+
+ public MessageConverter(MapMessage message) throws JMSException
+ {
+ MapMessage nativeMessage = new JMSMapMessage();
+
+ Enumeration mapNames = message.getMapNames();
+ while (mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ nativeMessage.setObject(name, message.getObject(name));
+ }
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
+ public MessageConverter(ObjectMessage message) throws JMSException
+ {
+ ObjectMessage origMessage = (ObjectMessage) message;
+ ObjectMessage nativeMessage = new JMSObjectMessage();
+
+ nativeMessage.setObject(origMessage.getObject());
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+
+ }
+
+ public MessageConverter(TextMessage message) throws JMSException
+ {
+ TextMessage nativeMessage = new JMSTextMessage();
+
+ nativeMessage.setText(message.getText());
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
+ public MessageConverter(StreamMessage message) throws JMSException
+ {
+ StreamMessage nativeMessage = new JMSStreamMessage();
+
+ try
+ {
+ message.reset();
+ while (true)
+ {
+ nativeMessage.writeObject(message.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ // we're at the end so don't mind the exception
+ }
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
+ public MessageConverter(Message message) throws JMSException
+ {
+ // Send a message with just properties.
+ // Throwing away content
+ BytesMessage nativeMessage = new JMSBytesMessage();
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
+ public AbstractJMSMessage getConvertedMessage()
+ {
+ return _newMessage;
+ }
+
+ /**
+ * Sets all message properties
+ */
+ protected void setMessageProperties(Message message) throws JMSException
+ {
+ setNonJMSProperties(message);
+ setJMSProperties(message);
+ }
+
+ /**
+ * Sets all non-JMS defined properties on converted message
+ */
+ protected void setNonJMSProperties(Message message) throws JMSException
+ {
+ Enumeration propertyNames = message.getPropertyNames();
+ while (propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+ if (!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ _newMessage.setObjectProperty(propertyName, value);
+ }
+ }
+ }
+
+ /**
+ * Exposed JMS defined properties on converted message:
+ * JMSDestination - we don't set here
+ * JMSDeliveryMode - set
+ * JMSExpiration - we don't set here
+ * JMSPriority - we don't set here
+ * JMSMessageID - we don't set here
+ * JMSTimestamp - we don't set here
+ * JMSCorrelationID - set
+ * JMSReplyTo - set
+ * JMSType - set
+ * JMSRedlivered - we don't set here
+ */
+ protected void setJMSProperties(Message message) throws JMSException
+ {
+ _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+ if (message.getJMSReplyTo() != null)
+ {
+ _newMessage.setJMSReplyTo(message.getJMSReplyTo());
+ }
+
+ _newMessage.setJMSType(message.getJMSType());
+
+ _newMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java
new file mode 100644
index 0000000000..728766f579
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+
+public interface MessageFactory
+{
+ AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
+ List bodies)
+ throws JMSException, AMQException;
+
+ AbstractJMSMessage createMessage() throws JMSException;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java
new file mode 100644
index 0000000000..a6225c2616
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class MessageFactoryRegistry
+{
+ private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
+ private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
+ new HashMap<AMQShortString, MessageFactory>();
+
+ /**
+ * Construct a new registry with the default message factories registered
+ * @return a message factory registry
+ */
+ public static MessageFactoryRegistry newDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
+ mf.registerFactory("text/plain", new JMSTextMessageFactory());
+ mf.registerFactory("text/xml", new JMSTextMessageFactory());
+ mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
+ mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+ mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+ mf.registerFactory(null, new JMSBytesMessageFactory());
+
+ return mf;
+ }
+
+ public void registerFactory(String mimeType, MessageFactory mf)
+ {
+ if (mf == null)
+ {
+ throw new IllegalArgumentException("Message factory must not be null");
+ }
+
+ _mimeStringToFactoryMap.put(mimeType, mf);
+ _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
+ }
+
+ public MessageFactory deregisterFactory(String mimeType)
+ {
+ _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+
+ return _mimeStringToFactoryMap.remove(mimeType);
+ }
+
+ /**
+ * Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+ * concrete message type.
+ * @param deliveryTag the AMQ message id
+ * @param redelivered true if redelivered
+ * @param contentHeader the content header that was received
+ * @param bodies a list of ContentBody instances
+ * @return the message.
+ * @throws AMQException
+ * @throws JMSException
+ */
+ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+ AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
+ throws AMQException, JMSException
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+
+ // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
+ // AMQP. When the type is null, it can only be assumed that the message is a byte message.
+ AMQShortString contentTypeShortString = properties.getContentType();
+ contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
+ : contentTypeShortString;
+
+ MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
+ if (mf == null)
+ {
+ throw new AMQException(null, "Unsupport MIME type of " + properties.getContentTypeAsString(), null);
+ }
+ else
+ {
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+ }
+ }
+
+ public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
+ {
+ if (mimeType == null)
+ {
+ throw new IllegalArgumentException("Mime type must not be null");
+ }
+
+ MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
+ if (mf == null)
+ {
+ throw new AMQException(null, "Unsupport MIME type of " + mimeType, null);
+ }
+ else
+ {
+ return mf.createMessage();
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
new file mode 100644
index 0000000000..92a32f9bf4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import java.math.BigDecimal;
+
+public class QpidMessage
+{
+ protected ContentHeaderProperties _contentHeaderProperties;
+
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ protected AMQSession _session;
+
+ protected final long _deliveryTag;
+
+ public QpidMessage(ContentHeaderProperties properties, long deliveryTag)
+ {
+ _contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
+ }
+
+ public QpidMessage(ContentHeaderProperties properties)
+ {
+ this(properties, -1);
+ }
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _session = s;
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
+ public void prepareForSending() throws JMSException
+ {
+ }
+
+ public FieldTable getPropertyHeaders()
+ {
+ return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders();
+ }
+
+ public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException
+ {
+ getPropertyHeaders().setDecimal(propertyName, bd);
+ }
+
+ public void setIntProperty(AMQShortString propertyName, int i) throws JMSException
+ {
+ getPropertyHeaders().setInteger(propertyName, new Integer(i));
+ }
+
+ public void setLongStringProperty(AMQShortString propertyName, String value)
+ {
+ getPropertyHeaders().setString(propertyName, value);
+ }
+
+ public void setTimestampProperty(AMQShortString propertyName, long value)
+ {
+ getPropertyHeaders().setTimestamp(propertyName, value);
+ }
+
+ public void setVoidProperty(AMQShortString propertyName)
+ {
+ getPropertyHeaders().setVoid(propertyName);
+ }
+
+ //** Getters
+
+ public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException
+ {
+ return getPropertyHeaders().getDecimal(propertyName);
+ }
+
+ public int getIntegerProperty(AMQShortString propertyName) throws JMSException
+ {
+ return getPropertyHeaders().getInteger(propertyName);
+ }
+
+ public String getLongStringProperty(AMQShortString propertyName)
+ {
+ return getPropertyHeaders().getString(propertyName);
+ }
+
+ public Long getTimestampProperty(AMQShortString propertyName)
+ {
+ return getPropertyHeaders().getTimestamp(propertyName);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java
new file mode 100644
index 0000000000..6a14bc1117
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.message;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
+ * the content body/ies.
+ *
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
+ */
+public class UnprocessedMessage
+{
+ private long _bytesReceived = 0;
+
+ private final BasicDeliverBody _deliverBody;
+ private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
+ private final int _channelId;
+ private ContentHeaderBody _contentHeader;
+
+ /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
+ private List<ContentBody> _bodies;
+
+ public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
+ {
+ _deliverBody = deliverBody;
+ _channelId = channelId;
+ _bounceBody = null;
+ }
+
+
+ public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
+ {
+ _deliverBody = null;
+ _channelId = channelId;
+ _bounceBody = bounceBody;
+ }
+
+ public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
+ {
+
+ if (body.payload != null)
+ {
+ final long payloadSize = body.payload.remaining();
+
+ if (_bodies == null)
+ {
+ if (payloadSize == getContentHeader().bodySize)
+ {
+ _bodies = Collections.singletonList(body);
+ }
+ else
+ {
+ _bodies = new ArrayList<ContentBody>();
+ _bodies.add(body);
+ }
+
+ }
+ else
+ {
+ _bodies.add(body);
+ }
+ _bytesReceived += payloadSize;
+ }
+ }
+
+ public boolean isAllBodyDataReceived()
+ {
+ return _bytesReceived == getContentHeader().bodySize;
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return _deliverBody;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return _bounceBody;
+ }
+
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+
+ public ContentHeaderBody getContentHeader()
+ {
+ return _contentHeader;
+ }
+
+ public void setContentHeader(ContentHeaderBody contentHeader)
+ {
+ this._contentHeader = contentHeader;
+ }
+
+ public List<ContentBody> getBodies()
+ {
+ return _bodies;
+ }
+
+}