diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-07-27 16:10:27 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-07-27 16:10:27 +0000 |
| commit | 6b682293955924ea72bd83b0c5f601c7ded405bf (patch) | |
| tree | 83f06ffb2bb9d9f37b14c2600fadc7733fc80fb8 /java/client | |
| parent | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (diff) | |
| download | qpid-python-6b682293955924ea72bd83b0c5f601c7ded405bf.tar.gz | |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560296 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
47 files changed, 5671 insertions, 164 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java b/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java index ad6da34d68..37caaf5df9 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/FieldTable.java @@ -19,9 +19,7 @@ package org.apache.qpid.nclient; /** - * Created by Arnaud Simon - * Date: 23-Jul-2007 - * Time: 09:47:32 + * */ public interface FieldTable { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java index 6ba4787b56..a02e378b43 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Connection.java @@ -25,10 +25,6 @@ import org.apache.qpidity.QpidException; /** * This represents a physical connection to a broker. - * <p/> - * Created by Arnaud Simon - * Date: 20-Jul-2007 - * Time: 09:34:15 */ public interface Connection { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java index e3cd813170..34d0a0f250 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/CreateReceiverOption.java @@ -2,10 +2,6 @@ package org.apache.qpid.nclient.api; /** * Enumeration of the options available when creating a receiver - * - * Created by Arnaud Simon - * Date: 20-Jul-2007 - * Time: 09:43:31 */ public enum CreateReceiverOption { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java index 241dd1d02f..e354539364 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareExchangeOption.java @@ -2,10 +2,6 @@ package org.apache.qpid.nclient.api; /** * Enumeration of the options available when declaring an exchange - * - * Created by Arnaud Simon - * Date: 20-Jul-2007 - * Time: 09:44:52 */ public enum DeclareExchangeOption { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java index 89e3573024..689523aeda 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeclareQueueOption.java @@ -3,9 +3,6 @@ package org.apache.qpid.nclient.api; /** * Enumeration of the options available when declaring a queue * - * Created by Arnaud Simon - * Date: 23-Jul-2007 - * Time: 09:44:36 */ public enum DeclareQueueOption { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java index 17ee5beb1f..415c045de8 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteExchangeOption.java @@ -1,9 +1,7 @@ package org.apache.qpid.nclient.api; /** - * Created by Arnaud Simon - * Date: 23-Jul-2007 - * Time: 12:55:55 + * Available options for deleting an exchange. */ public enum DeleteExchangeOption { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java index c0a30f8b68..d110bd2a50 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DeleteQueueOption.java @@ -1,9 +1,7 @@ package org.apache.qpid.nclient.api; /** - * Created by Arnaud Simon - * Date: 23-Jul-2007 - * Time: 12:44:43 + * Available options for deleting a queue. */ public enum DeleteQueueOption { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java index aafe0a2054..ac396b7c79 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/DtxSession.java @@ -22,10 +22,6 @@ import org.apache.qpidity.QpidException; /** * This session�s resources are control under the scope of a distributed transaction. - * - * Created by Arnaud Simon - * Date: 20-Jul-2007 - * Time: 09:39:11 */ public interface DtxSession extends Session { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java index b624e10cbd..5f7bbe7cf2 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/ExceptionListener.java @@ -23,10 +23,6 @@ import org.apache.qpidity.QpidException; /** * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it * informs the connection's ExceptionListener - * - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 12:00:27 */ public interface ExceptionListener { @@ -35,7 +31,7 @@ public interface ExceptionListener * informs the connection's ExceptionListener * * @param exception The exception comming from the communication layer. - * @see org.apache.qpid.nclient.qpidapi.Connection + * @see org.apache.qpid.nclient.api.Connection */ public void onException(QpidException exception); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java index 64b5876339..4aca6ea203 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Message.java @@ -26,10 +26,6 @@ import java.nio.ByteBuffer; /** * A message is sent and received by resources. It is composed of a set of header and a payload. - * <p/> - * Created by Arnaud Simon - * Date: 20-Jul-2007 - * Time: 09:40:49 */ public interface Message { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java index b0dfa15bbf..5b844bb6c2 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageListener.java @@ -20,10 +20,6 @@ package org.apache.qpid.nclient.api; /** * MessageListeners are used to asynchronously receive messages. - * - * Created by Arnaud Simon - * Date: 2o-Jul-2007 - * Time: 09:42:52 */ public interface MessageListener { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java index d489e34364..e34238abc0 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageReceiver.java @@ -24,11 +24,6 @@ import java.util.Set; /** * Used to receive messages from a queue - * - * <p/> - * Created by Arnaud Simon - * Date: 20-Jul-2007 - * Time: 09:42:37 */ public interface MessageReceiver extends Resource { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java index 9b0a5af0d0..54badefcc6 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/MessageSender.java @@ -22,10 +22,6 @@ import org.apache.qpidity.QpidException; /** * A sender is used to send message to its queue. - * <p/> - * Created by Arnaud Simon - * Date: 22-Jul-2007 - * Time: 09:41:58 */ public interface MessageSender extends Resource { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java index 1585cc3071..212b0dca80 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Resource.java @@ -22,10 +22,6 @@ import org.apache.qpidity.QpidException; /** * A Resource is associated with a session and can be independently closed. - * - * Created by Arnaud Simon - * Date: 21-Jul-2007 - * Time: 09:41:30 */ public interface Resource { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java index c103e5bedd..5c960023dd 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/api/Session.java @@ -25,10 +25,6 @@ import org.apache.qpidity.Option; * A session is associated with a connection. * <p> When created a Session is not attached with an underlying channel. Unsuspended a Session is * equivalent to attaching a communication channel that can be used to communicate with the broker. - * <p/> - * Created by Arnaud Simon - * Date: 20-Jul-2007 - * Time: 09:36:24 */ public interface Session { @@ -198,7 +194,6 @@ public interface Session * @param queueName The queue to be bound. * @param exchangeName The exchange name. * @param routingKey The routing key. - * @param nowait nowait * @throws QpidException If the session fails to bind the queue due to some error. */ public void queueBind(String queueName, String exchangeName, String routingKey) @@ -224,7 +219,6 @@ public interface Session * TODO: Define the exact semantic i.e. are message sent to a dead letter queue? * * @param queueName The queue to be purged - * @param nowait nowait * @throws QpidException If the session fails to purge the queue due to some error. */ public void queuePurge(String queueName) diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java index 416c5b71e5..8d7fab6216 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java @@ -30,9 +30,7 @@ import java.util.Vector; /** - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 09:47:17 + * */ public class ConnectionImpl implements Connection, QueueConnection, TopicConnection { @@ -106,9 +104,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect //---- Interface javax.jms.Connection ---// - public Session createSession(boolean b, int i) - throws - JMSException + public Session createSession(boolean b, int i) throws JMSException { return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -123,9 +119,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return The unique client identifier. * @throws JMSException If this connection is closed. */ - public String getClientID() - throws - JMSException + public String getClientID() throws JMSException { checkNotClosed(); @@ -147,9 +141,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @param clientID the unique client identifier * @throws JMSException Always as clientID is always set at construction time. */ - public void setClientID(String clientID) - throws - JMSException + public void setClientID(String clientID) throws JMSException { checkNotClosed(); throw new IllegalStateException("Client name cannot be changed after being set"); @@ -162,9 +154,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException If there ie a problem getting the connection metadata for this connection. * @see javax.jms.ConnectionMetaData */ - public ConnectionMetaData getMetaData() - throws - JMSException + public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); return ConnectionMetaDataImpl.getInstance(); @@ -176,9 +166,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @return the <CODE>ExceptionListener</CODE> for this connection * @throws JMSException In case of unforeseen problem */ - public ExceptionListener getExceptionListener() - throws - JMSException + public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); return _exceptionListener; @@ -203,9 +191,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @param exceptionListener The connection listener. * @throws JMSException If the connection is closed. */ - public void setExceptionListener(ExceptionListener exceptionListener) - throws - JMSException + public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { checkNotClosed(); _exceptionListener = exceptionListener; @@ -219,9 +205,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void start() - throws - JMSException + public void start() throws JMSException { checkNotClosed(); if (!_started) @@ -246,9 +230,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void stop() - throws - JMSException + public void stop() throws JMSException { checkNotClosed(); if (_started) @@ -278,9 +260,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * * @throws JMSException In case of a problem due to some internal error. */ - public void close() - throws - JMSException + public void close() throws JMSException { checkNotClosed(); if (!_isClosed) @@ -317,9 +297,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @throws JMSException In case of a problem due to some internal error. */ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, int maxMessages) - throws - JMSException + ServerSessionPool sessionPool, int maxMessages) throws + JMSException { checkNotClosed(); return null; @@ -338,9 +317,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect */ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, - int maxMessages) - throws - JMSException + int maxMessages) throws JMSException { checkNotClosed(); return null; @@ -348,9 +325,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect //-------------- QueueConnection API - public QueueSession createQueueSession(boolean b, int i) - throws - JMSException + public QueueSession createQueueSession(boolean b, int i) throws JMSException { checkNotClosed(); //TODO: create a queue session @@ -359,25 +334,21 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect return null; //To change body of implemented methods use File | Settings | File Templates. } - public ConnectionConsumer createConnectionConsumer(Queue queue, String string, ServerSessionPool serverSessionPool, int i) - throws - JMSException + public ConnectionConsumer createConnectionConsumer(Queue queue, String string, ServerSessionPool serverSessionPool, + int i) throws JMSException { return null; //To change body of implemented methods use File | Settings | File Templates. } //-------------- TopicConnection API - public TopicSession createTopicSession(boolean b, int i) - throws - JMSException + public TopicSession createTopicSession(boolean b, int i) throws JMSException { return null; //To change body of implemented methods use File | Settings | File Templates. } - public ConnectionConsumer createConnectionConsumer(Topic topic, String string, ServerSessionPool serverSessionPool, int i) - throws - JMSException + public ConnectionConsumer createConnectionConsumer(Topic topic, String string, ServerSessionPool serverSessionPool, + int i) throws JMSException { return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -387,15 +358,12 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect /** * Validate that the Connection is not closed. * <p/> - * If the Connection has been closed, throw a javax.jms.IllegalStateException. This behaviour is + * If the Connection has been closed, throw a IllegalStateException. This behaviour is * required by the JMS specification. * - * @throws javax.jms.IllegalStateException - * If the session is closed. + * @throws IllegalStateException If the session is closed. */ - protected synchronized void checkNotClosed() - throws - IllegalStateException + protected synchronized void checkNotClosed() throws IllegalStateException { if (_isClosed) { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java index 05c759fc2d..58d4151ba2 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java @@ -26,10 +26,6 @@ import java.util.Enumeration; /** * A <CODE>ConnectionMetaDataImpl</CODE> object provides information describing the * JMS <CODE>Connection</CODE>. - * <p/> - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 10:47:20 */ public class ConnectionMetaDataImpl implements ConnectionMetaData { @@ -53,8 +49,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData // Provider minor version private static final int PROVIDER_MINOR_VERSION = 10; // Provider version - private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: [" - + QpidProperties.getBuildVersion() + "] ; Protocol: [ 0.10 ] )"; + private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: [" + QpidProperties.getBuildVersion() + "] ; Protocol: [ 0.10 ] )"; /** * Prevent instantiation. @@ -81,9 +76,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return the JMS API version * @throws JMSException Never */ - public String getJMSVersion() - throws - JMSException + public String getJMSVersion() throws JMSException { return JMS_VERSION; } @@ -95,9 +88,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return the JMS API major version number * @throws JMSException Never */ - public int getJMSMajorVersion() - throws - JMSException + public int getJMSMajorVersion() throws JMSException { return JMS_MAJOR_VERSION; } @@ -109,9 +100,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return the JMS API minor version number * @throws JMSException Never */ - public int getJMSMinorVersion() - throws - JMSException + public int getJMSMinorVersion() throws JMSException { return JMS_MINOR_VERSION; } @@ -123,9 +112,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return Qpid name * @throws JMSException Never */ - public String getJMSProviderName() - throws - JMSException + public String getJMSProviderName() throws JMSException { return PROVIDER_NAME; } @@ -136,9 +123,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return Qpid version * @throws JMSException Never */ - public String getProviderVersion() - throws - JMSException + public String getProviderVersion() throws JMSException { return PROVIDER_VERSION; // TODO: We certainly can dynamically get the server version. @@ -150,9 +135,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return Qpid major version number * @throws JMSException Never */ - public int getProviderMajorVersion() - throws - JMSException + public int getProviderMajorVersion() throws JMSException { return PROVIDER_MAJOR_VERSION; } @@ -163,9 +146,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return Qpid minor version number * @throws JMSException Never */ - public int getProviderMinorVersion() - throws - JMSException + public int getProviderMinorVersion() throws JMSException { return PROVIDER_MINOR_VERSION; } @@ -176,9 +157,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData * @return an Enumeration of JMSX property names * @throws JMSException if cannot retrieve metadata due to some internal error. */ - public Enumeration getJMSXPropertyNames() - throws - JMSException + public Enumeration getJMSXPropertyNames() throws JMSException { return CustomJMSXProperty.asEnumeration(); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java index 19dfc3a0b5..49da4a996d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/CustomJMSXProperty.java @@ -32,11 +32,11 @@ public enum CustomJMSXProperty public static synchronized Enumeration asEnumeration() { - if(_names == null) + if (_names == null) { CustomJMSXProperty[] properties = values(); ArrayList<String> nameList = new ArrayList<String>(properties.length); - for(CustomJMSXProperty property : properties) + for (CustomJMSXProperty property : properties) { nameList.add(property.toString()); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java index 02bad291fb..0a4c465815 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/ExceptionHelper.java @@ -22,9 +22,7 @@ import org.apache.qpidity.QpidException; import javax.jms.JMSException; /** - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 12:54:00 + *Helper class for handling exceptions */ public class ExceptionHelper { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java new file mode 100644 index 0000000000..9ed96b1e22 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java @@ -0,0 +1,140 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.nclient.api.Resource; +import org.apache.qpid.nclient.exception.QpidException; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; + +/** + * MessageActor is the superclass for MessageProducerImpl and MessageProducerImpl. + */ +public abstract class MessageActor +{ + /** + * Used for debugging. + */ + private static final Logger _logger = LoggerFactory.getLogger(MessageActor.class); + + /** + * Indicates whether this MessageActor is closed. + */ + boolean _isClosed = false; + + /** + * This messageActor's session + */ + SessionImpl _session; + + /** + * The underlying Qpid Resource + */ + Resource _qpidResource; + + //-- Constructor + + //TODO define the parameters + + protected MessageActor() + { + + } + + protected MessageActor(SessionImpl session) + { + // TODO create the qpidResource _qpidResource = + _session = session; + } + + //--- public methods (part of the jms public API) + /** + * Closes the MessageActor and deregister it from its session. + * + * @throws JMSException if the MessaeActor cannot be closed due to some internal error. + */ + public void close() throws JMSException + { + if (!_isClosed) + { + closeMessageActor(); + // notify the session that this message actor is closing + + //TODO _session.removeActor(_actorID); + } + } + + //-- protected methods + /** + * Check if this MessageActor is not closed. + * <p> If the MessageActor is closed, throw a javax.jms.IllegalStateException. + * <p> The method is not synchronized, since MessageProducers can only be used by a single thread. + * + * @throws IllegalStateException if the MessageActor is closed + */ + protected void checkNotClosed() throws IllegalStateException + { + if (_isClosed || _session == null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Actor " + this + " is already closed"); + } + throw new IllegalStateException("Actor " + this + " is already closed"); + } + _session.checkNotClosed(); + } + + /** + * Closes a MessageActor. + * <p> This method is invoked when the session is closing or when this + * messageActor is closing. + * + * @throws JMSException If the MessaeActor cannot be closed due to some internal error. + */ + protected void closeMessageActor() throws JMSException + { + if (!_isClosed) + { + // close the underlying qpid resource + try + { + _qpidResource.close(); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + _isClosed = true; + } + } + + /** + * Get the associated session object. + * + * @return This Actor's Session. + */ + protected SessionImpl getSession() + { + return _session; + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java new file mode 100644 index 0000000000..80fba4843d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -0,0 +1,148 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +import org.apache.qpid.nclient.api.MessageReceiver; +import org.apache.qpid.nclient.exception.QpidException; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; + +/** + * Implementation of JMS message consumer + */ +public class MessageConsumerImpl extends MessageActor implements MessageConsumer +{ + /** + * The underlying qpid receiver + */ + MessageReceiver _qpidReceiver; + + /** + * This MessageConsumer's messageselector. + */ + protected String _messageSelector = null; + + /** + * A MessageListener set up for this consumer. + */ + private MessageListener _messageListener = null; + + //----- Message consumer API + + /** + * Gets this MessageConsumer's message selector. + * + * @return This MessageConsumer's message selector, or null if no + * message selector exists for the message consumer (that is, if + * the message selector was not set or was set to null or the + * empty string) + * @throws JMSException if getting the message selector fails due to some internal error. + */ + public String getMessageSelector() throws JMSException + { + checkNotClosed(); + return _messageSelector; + } + + /** + * Gets this MessageConsumer's <CODE>MessageListener</CODE>. + * + * @return The listener for the MessageConsumer, or null if no listener is set + * @throws JMSException if getting the message listener fails due to some internal error. + */ + public MessageListener getMessageListener() throws JMSException + { + checkNotClosed(); + return _messageListener; + } + + /** + * Sets the MessageConsumer's <CODE>MessageListener</CODE>. + * <p> The JMS specification says: + * <P>Setting the message listener to null is the equivalent of + * unsetting the message listener for the message consumer. + * <P>The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> + * while messages are being consumed by an existing listener + * or the consumer is being used to consume messages synchronously + * is undefined. + * + * @param messageListener The listener to which the messages are to be delivered + * @throws JMSException If setting the message listener fails due to some internal error. + */ + public void setMessageListener(MessageListener messageListener) throws JMSException + { + checkNotClosed(); + // create a message listener wrapper + } + + public Message receive() throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Message receive(long l) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Message receiveNoWait() throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + + // not public methods + /** + * Stop the delivery of messages to this receiver. + * <p>For asynchronous receiver, this operation blocks until the message listener + * finishes processing the current message, + * + * @throws JMSException If the consumer cannot be stopped due to some internal error. + */ + void stop() throws JMSException + { + try + { + _qpidReceiver.stop(); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Start the delivery of messages to this consumer. + * + * @throws JMSException If the consumer cannot be started due to some internal error. + */ + void start() throws JMSException + { + try + { + _qpidReceiver.start(); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java new file mode 100644 index 0000000000..3e58247597 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java @@ -0,0 +1,115 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +import org.apache.qpid.nclient.api.MessageListener; +import org.apache.qpid.nclient.api.Message; +import org.apache.qpid.nclient.jms.message.AbstractJMSMessage; +import org.apache.qpid.nclient.jms.message.QpidMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; + +/** + * This is a wrapper for the JMS message listener + */ +public class MessageListenerWrapper implements MessageListener +{ + /** + * Used for debugging. + */ + private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); + + /** + * This message listener consumer + */ + MessageConsumerImpl _consumer = null; + + /** + * The jms listener of this consumer. + */ + javax.jms.MessageListener _jmsMessageLstener = null; + + //---- constructor + /** + * Create a message listener wrapper for a given consumer + * + * @param consumer The consumer of this listener + */ + public MessageListenerWrapper(MessageConsumerImpl consumer) + { + _consumer = consumer; + } + + //---- org.apache.qpid.nclient.api.MessageListener API + /** + * Deliver a message to the listener. + * + * @param message The message delivered to the listner. + */ + public void onMessage(Message message) + { + try + { + // tell the session that a message is in process + _consumer.getSession().preProcessMessage((QpidMessage) message); + //TODO build the JMS message form a qpid message + AbstractJMSMessage jmsMessage = null; + // If the session is transacted we need to ack the message first + // This is because a message is associated with its tx only when acked + if (_consumer.getSession().getTransacted()) + { + _consumer.getSession().acknowledgeMessage(jmsMessage); + } + // The JMS specs says: + /* The result of a listener throwing a RuntimeException depends on the session’s + * acknowledgment mode. + • --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message + * will be immediately redelivered. The number of times a JMS provider will + * redeliver the same message before giving up is provider-dependent. + • --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. + * --- Transacted Session - the next message for the listener is delivered. + * + * The number of time we try redelivering the message is 0 + **/ + try + { + _jmsMessageLstener.onMessage(jmsMessage); + } + catch (RuntimeException re) + { + // do nothing as this message will not be redelivered + } + // If the session has been recovered we then need to redelivered this message + if (_consumer.getSession().isInRecovery()) + { + message.release(); + } + // Tell the jms Session to ack this message if required + else if (!_consumer.getSession().getTransacted()) + { + _consumer.getSession().acknowledgeMessage(jmsMessage); + } + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java new file mode 100644 index 0000000000..4bae376492 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java @@ -0,0 +1,25 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.nclient.jms; + +/** + * + */ +public class MessageProducerImpl extends MessageActor +{ +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java index dd63240021..6a849a5b9f 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QpidExceptionListenerImpl.java @@ -23,9 +23,7 @@ import org.apache.qpidity.QpidException; import javax.jms.JMSException; /** - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 12:08:47 + * An exception listner */ public class QpidExceptionListenerImpl implements ExceptionListener { @@ -37,7 +35,7 @@ public class QpidExceptionListenerImpl implements ExceptionListener void setJMSExceptionListner(javax.jms.ExceptionListener jmsExceptionListener) { - _jmsExceptionListener = jmsExceptionListener; + _jmsExceptionListener = jmsExceptionListener; } //----- ExceptionListener API @@ -46,7 +44,7 @@ public class QpidExceptionListenerImpl implements ExceptionListener // convert this exception in a JMS exception JMSException jmsException = ExceptionHelper.convertQpidExceptionToJMSException(exception); // propagate to the jms exception listener - if( _jmsExceptionListener != null ) + if (_jmsExceptionListener != null) { _jmsExceptionListener.onException(jmsException); } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java index f5a3ca3f84..0b37c3af95 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java @@ -20,9 +20,7 @@ package org.apache.qpid.nclient.jms; import org.apache.qpid.nclient.jms.SessionImpl; /** - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 13:38:58 + * */ public class QueueSessionImpl extends SessionImpl { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index c68f558542..483257db88 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -17,11 +17,564 @@ */ package org.apache.qpid.nclient.jms; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.nclient.exception.QpidException; +import org.apache.qpid.nclient.jms.message.*; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.jms.Session; +import javax.jms.Message; +import javax.jms.MessageListener; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Vector; + /** - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 12:35:36 + * Implementation of the JMS Session interface */ -public class SessionImpl +public class SessionImpl implements Session { + /** + * this session's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); + + /** + * The messageConsumers of this session. + */ + private ArrayList<MessageConsumerImpl> _messageConsumers = new ArrayList<MessageConsumerImpl>(); + + /** + * The messageProducers of this session. + */ + private ArrayList<MessageProducerImpl> _messageProducers = new ArrayList<MessageProducerImpl>(); + + /** + * All the not yet acknoledged messages + * We use a vector as access to this list has to be synchronised + * This is because messages are acked from messagelistner threads + */ + private Vector<QpidMessage> _unacknowledgedMessages = new Vector<QpidMessage>(); + + /** + * Indicates whether this session is closed. + */ + private boolean _isClosed = false; + + /** + * Used to indicate whether or not this is a transactional session. + */ + private boolean _transacted; + + /** + * Holds the sessions acknowledgement mode. + */ + private int _acknowledgeMode; + + /** + * The underlying QpidSession + */ + private org.apache.qpid.nclient.api.Session _qpidSession; + + /** + * Indicates whether this session is recovering + */ + private boolean _inRecovery = false; + + //--- javax.jms.Session API + + /** + * Creates a <CODE>BytesMessage</CODE> object used to send a message + * containing a stream of uninterpreted bytes. + * + * @return A BytesMessage. + * @throws JMSException If Creating a BytesMessage object fails due to some internal error. + */ + public BytesMessage createBytesMessage() throws JMSException + { + checkNotClosed(); + return new JMSBytesMessage(); + } + + /** + * Creates a <CODE>MapMessage</CODE> object used to send a self-defining set + * of name-value pairs, where names are Strings and values are primitive values. + * + * @return A MapMessage. + * @throws JMSException If Creating a MapMessage object fails due to some internal error. + */ + public MapMessage createMapMessage() throws JMSException + { + checkNotClosed(); + return new JMSMapMessage(); + } + + /** + * Creates a <CODE>Message</CODE> object that holds all the + * standard message header information. It can be sent when a message + * containing only header information is sufficient. + * We simply return a ByteMessage + * + * @return A Message. + * @throws JMSException If Creating a Message object fails due to some internal error. + */ + public Message createMessage() throws JMSException + { + return createBytesMessage(); + } + + /** + * Creates an <CODE>ObjectMessage</CODE> used to send a message + * that contains a serializable Java object. + * + * @return An ObjectMessage. + * @throws JMSException If Creating an ObjectMessage object fails due to some internal error. + */ + public ObjectMessage createObjectMessage() throws JMSException + { + checkNotClosed(); + return new JMSObjectMessage(); + } + + /** + * Creates an initialized <CODE>ObjectMessage</CODE> used to send a message that contains + * a serializable Java object. + * + * @param serializable The object to use to initialize this message. + * @return An initialised ObjectMessage. + * @throws JMSException If Creating an initialised ObjectMessage object fails due to some internal error. + */ + public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException + { + ObjectMessage msg = createObjectMessage(); + msg.setObject(serializable); + return msg; + } + + /** + * Creates a <CODE>StreamMessage</CODE> used to send a + * self-defining stream of primitive values in the Java programming + * language. + * + * @return A StreamMessage + * @throws JMSException If Creating an StreamMessage object fails due to some internal error. + */ + public StreamMessage createStreamMessage() throws JMSException + { + checkNotClosed(); + return new JMSStreamMessage(); + } + + /** + * Creates a <CODE>TextMessage</CODE> object used to send a message containing a String. + * + * @return A TextMessage object + * @throws JMSException If Creating an TextMessage object fails due to some internal error. + */ + public TextMessage createTextMessage() throws JMSException + { + checkNotClosed(); + return new JMSTextMessage(); + } + + /** + * Creates an initialized <CODE>TextMessage</CODE> used to send + * a message containing a String. + * + * @param text The string used to initialize this message. + * @return An initialized TextMessage + * @throws JMSException If Creating an initialised TextMessage object fails due to some internal error. + */ + public TextMessage createTextMessage(String text) throws JMSException + { + TextMessage msg = createTextMessage(); + msg.setText(text); + return msg; + } + + /** + * Indicates whether the session is in transacted mode. + * + * @return true if the session is in transacted mode + * @throws JMSException If geting the transaction mode fails due to some internal error. + */ + public boolean getTransacted() throws JMSException + { + checkNotClosed(); + return _transacted; + } + + /** + * Returns the acknowledgement mode of this session. + * <p> The acknowledgement mode is set at the time that the session is created. + * If the session is transacted, the acknowledgement mode is ignored. + * + * @return If the session is not transacted, returns the current acknowledgement mode for the session. + * else returns SESSION_TRANSACTED. + * @throws JMSException if geting the acknowledgement mode fails due to some internal error. + */ + public int getAcknowledgeMode() throws JMSException + { + checkNotClosed(); + return _acknowledgeMode; + } + + /** + * Commits all messages done in this transaction. + * + * @throws JMSException If committing the transaction fails due to some internal error. + * @throws TransactionRolledBackException If the transaction is rolled back due to some internal error during commit. + * @throws javax.jms.IllegalStateException + * If the method is not called by a transacted session. + */ + public void commit() throws JMSException + { + checkNotClosed(); + //make sure the Session is a transacted one + if (!_transacted) + { + throw new IllegalStateException("Cannot commit non-transacted session", "Session is not transacted"); + } + // commit the underlying Qpid Session + try + { + // Note: this operation makes sure that asynch message processing has returned + _qpidSession.commit(); + } + catch (org.apache.qpidity.QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Rolls back any messages done in this transaction. + * + * @throws JMSException If rolling back the session fails due to some internal error. + * @throws javax.jms.IllegalStateException + * If the method is not called by a transacted session. + */ + public void rollback() throws JMSException + { + checkNotClosed(); + //make sure the Session is a transacted one + if (!_transacted) + { + throw new IllegalStateException("Cannot rollback non-transacted session", "Session is not transacted"); + } + // rollback the underlying Qpid Session + try + { + // Note: this operation makes sure that asynch message processing has returned + _qpidSession.rollback(); + } + catch (org.apache.qpidity.QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Closes this session. + * <p> The JMS specification says + * <P> This call will block until a <CODE>receive</CODE> call or message + * listener in progress has completed. A blocked message consumer + * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed. + * <P>Closing a transacted session must roll back the transaction in progress. + * <P>This method is the only <CODE>Session</CODE> method that can be called concurrently. + * <P>Invoking any other <CODE>Session</CODE> method on a closed session + * must throw a <CODE>javax.jms.IllegalStateException</CODE>. + * <p> Closing a closed session must <I>not</I> throw an exception. + * + * @throws JMSException If closing the session fails due to some internal error. + */ + public synchronized void close() throws JMSException + { + if (!_isClosed) + { + // from now all the session methods will throw a IllegalStateException + _isClosed = true; + // close all the actors + closeAllActors(); + // close the underlaying QpidSession + try + { + _qpidSession.sessionClose(); + } + catch ( org.apache.qpidity.QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + } + + /** + * Stops message delivery in this session, and restarts message delivery with + * the oldest unacknowledged message. + * <p>Recovering a session causes it to take the following actions: + * <ul> + * <li>Stop message delivery. + * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". + * <li>Restart the delivery sequence including all unacknowledged messages that had been + * previously delivered. + * Redelivered messages do not have to be delivered in exactly their original delivery order. + * </ul> + * + * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. + * Not that this does not necessarily mean that the recovery has failed, but simply that it is + * not possible to tell if it has or not. + */ + public void recover() throws JMSException + { + // Ensure that the session is open. + checkNotClosed(); + // we are recovering + _inRecovery = true; + // Ensure that the session is not transacted. + if (getTransacted()) + { + throw new IllegalStateException("Session is transacted"); + } + // release all unack messages + for(QpidMessage message : _unacknowledgedMessages) + { + // release all those messages + //Todo: message.getQpidMEssage.release(); + } + } + + /** + * Returns the session's distinguished message listener (optional). + * <p>This is an expert facility used only by Application Servers. + * <p> This is an optional operation that is not yet supported + * + * @return The message listener associated with this session. + * @throws JMSException If getting the message listener fails due to an internal error. + */ + public MessageListener getMessageListener() throws JMSException + { + checkNotClosed(); + throw new java.lang.UnsupportedOperationException(); + } + + /** + * Sets the session's distinguished message listener. + * <p>This is an expert facility used only by Application Servers. + * <p> This is an optional operation that is not yet supported + * + * @param messageListener The message listener to associate with this session + * @throws JMSException If setting the message listener fails due to an internal error. + */ + public void setMessageListener(MessageListener messageListener) throws JMSException + { + checkNotClosed(); + throw new java.lang.UnsupportedOperationException(); + } + + /** + * Optional operation, intended to be used only by Application Servers, + * not by ordinary JMS clients. + * <p> This is an optional operation that is not yet supported + */ + public void run() + { + throw new java.lang.UnsupportedOperationException(); + } + + public MessageProducer createProducer(Destination destination) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public MessageConsumer createConsumer(Destination destination) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public MessageConsumer createConsumer(Destination destination, String string) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Queue createQueue(String string) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Topic createTopic(String string) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws + JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public QueueBrowser createBrowser(Queue queue) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public QueueBrowser createBrowser(Queue queue, String string) throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public TemporaryQueue createTemporaryQueue() throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public TemporaryTopic createTemporaryTopic() throws JMSException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + /** + * Unsubscribes a durable subscription that has been created by a client. + * + * <P>This method deletes the state being maintained on behalf of the + * subscriber by its provider. + * + * <P>It is erroneous for a client to delete a durable subscription + * while there is an active <CODE>TopicSubscriber</CODE> for the + * subscription, or while a consumed message is part of a pending + * transaction or has not been acknowledged in the session. + * + * @param name the name used to identify this subscription + * + * @exception JMSException if the session fails to unsubscribe to the durable subscription due to some internal error. + * @exception InvalidDestinationException if an invalid subscription name + * is specified. + */ + public void unsubscribe(String name) throws JMSException + { + checkNotClosed(); + + } + + //----- Protected methods + /** + * Notify this session that a message is processed + * @param message The processed message. + */ + protected void preProcessMessage(QpidMessage message) + { + _inRecovery = false; + } + + /** + * Indicate whether this session is recovering . + * + * @return true if this session is recovering. + */ + protected boolean isInRecovery() + { + return _inRecovery; + } + + /** + * Validate that the Session is not closed. + * <p/> + * If the Session has been closed, throw a IllegalStateException. This behaviour is + * required by the JMS specification. + * + * @throws IllegalStateException If the session is closed. + */ + protected void checkNotClosed() throws IllegalStateException + { + if (_isClosed) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Session has been closed. Cannot invoke any further operations."); + } + throw new javax.jms.IllegalStateException("Session has been closed. Cannot invoke any further operations."); + } + } + + /** + * A session keeps the list of unack messages only when the ack mode is + * set to client ack mode. Otherwise messages are always ack. + * <p> We can use an ack heuristic for dups ok mode where bunch of messages are ack. + * This has to be done. + * + * @param message The message to be acknowledged. + * @throws JMSException If the message cannot be acknowledged due to an internal error. + */ + protected void acknowledgeMessage(QpidMessage message) throws JMSException + { + if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + // messages will be acknowldeged by the client application. + // store this message for acknowledging it afterward + _unacknowledgedMessages.add(message); + } + else + { + // acknowledge this message + // TODO: message.acknowledgeQpidMessge(); + } + //TODO: Implement DUPS OK heuristic + } + + /** + * This method is called when a message is acked. + * <p/> + * <P>Acknowledgment of a message automatically acknowledges all + * messages previously received by the session. Clients may + * individually acknowledge messages or they may choose to acknowledge + * messages in application defined groups (which is done by acknowledging + * the last received message in the group). + * + * @throws JMSException If this method is called on a closed session. + */ + protected void acknowledge() throws JMSException + { + checkNotClosed(); + if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + for (QpidMessage message : _unacknowledgedMessages) + { + // acknowledge this message + // TODO: message.acknowledgeQpidMessge(); + } + //empty the list of unack messages + _unacknowledgedMessages.clear(); + } + //else there is no effect + } + + //------ Private Methods + + /** + * Close the producer and the consumers of this session + * + * @throws JMSException If one of the MessaeActor cannot be closed due to some internal error. + */ + private void closeAllActors() throws JMSException + { + for (MessageActor messageActor : _messageProducers) + { + messageActor.closeMessageActor(); + } + for (MessageActor messageActor : _messageConsumers) + { + messageActor.closeMessageActor(); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java index fe511fa398..db2515d562 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java @@ -20,9 +20,7 @@ package org.apache.qpid.nclient.jms; import org.apache.qpid.nclient.jms.SessionImpl; /** - * Created by Arnaud Simon - * Date: 25-Jul-2007 - * Time: 13:39:35 + * */ public class TopicSessionImpl extends SessionImpl { diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesMessage.java new file mode 100644 index 0000000000..5963a3780f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesMessage.java @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.io.IOException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesMessage extends AbstractJMSMessage +{ + + /** + * The default initial size of the buffer. The buffer expands automatically. + */ + private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + + AbstractBytesMessage() + { + this(null); + } + + /** + * Construct a bytes message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesMessage(ByteBuffer data) + { + super(data); // this instanties a content header + getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); + + if (_data == null) + { + allocateInitialBuffer(); + } + } + + protected void allocateInitialBuffer() + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); + _data.setAutoExpand(true); + } + + AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); + getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); + } + + public void clearBodyImpl() throws JMSException + { + allocateInitialBuffer(); + } + + public String toBodyString() throws JMSException + { + checkReadable(); + try + { + return getText(); + } + catch (IOException e) + { + JMSException jmse = new JMSException(e.toString()); + jmse.setLinkedException(e); + throw jmse; + } + } + + /** + * We reset the stream before and after reading the data. This means that toString() will always output + * the entire message and also that the caller can then immediately start reading as if toString() had + * never been called. + * + * @return + * @throws IOException + */ + private String getText() throws IOException + { + // this will use the default platform encoding + if (_data == null) + { + return null; + } + + int pos = _data.position(); + _data.rewind(); + // one byte left is for the end of frame marker + if (_data.remaining() == 0) + { + // this is really redundant since pos must be zero + _data.position(pos); + + return null; + } + else + { + String data = _data.getString(Charset.forName("UTF8").newDecoder()); + _data.position(pos); + + return data; + } + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + protected void checkAvailable(int len) throws MessageEOFException + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesTypedMessage.java new file mode 100644 index 0000000000..e7d51a6797 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractBytesTypedMessage.java @@ -0,0 +1,801 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.nclient.jms.message; + +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +{ + + protected static final byte BOOLEAN_TYPE = (byte) 1; + + protected static final byte BYTE_TYPE = (byte) 2; + + protected static final byte BYTEARRAY_TYPE = (byte) 3; + + protected static final byte SHORT_TYPE = (byte) 4; + + protected static final byte CHAR_TYPE = (byte) 5; + + protected static final byte INT_TYPE = (byte) 6; + + protected static final byte LONG_TYPE = (byte) 7; + + protected static final byte FLOAT_TYPE = (byte) 8; + + protected static final byte DOUBLE_TYPE = (byte) 9; + + protected static final byte STRING_TYPE = (byte) 10; + + protected static final byte NULL_STRING_TYPE = (byte) 11; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + AbstractBytesTypedMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesTypedMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkReadable(); + checkAvailable(1); + return _data.get(); + } + + protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + { + checkWritable(); + _data.put(type); + _changedData = true; + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private boolean readBooleanImpl() + { + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private byte readByteImpl() + { + return _data.get(); + } + + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if(wireType == NULL_STRING_TYPE){ + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) + { + _data.putInt(i); + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + try + { + writeStringImpl(string); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + } + + protected void writeStringImpl(String string) + throws CharacterCodingException + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte) 0); + } + + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + + protected void writeObject(Object object) throws JMSException + { + checkWritable(); + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java new file mode 100644 index 0000000000..5ea830b2cc --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java @@ -0,0 +1,685 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import org.apache.commons.collections.map.ReferenceMap; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.*; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import java.util.UUID; + +public abstract class AbstractJMSMessage extends QpidMessage implements org.apache.qpid.jms.Message +{ + private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + + protected boolean _redelivered; + + protected ByteBuffer _data; + private boolean _readableProperties = false; + protected boolean _readableMessage = false; + protected boolean _changedData; + private Destination _destination; + private JMSHeaderAdapter _headerAdapter; + private BasicMessageConsumer _consumer; + private boolean _strictAMQP; + + protected AbstractJMSMessage(ByteBuffer data) + { + super(new BasicContentHeaderProperties()); + _data = data; + if (_data != null) + { + _data.acquire(); + } + + _readableProperties = false; + _readableMessage = (data != null); + _changedData = (data == null); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + + _strictAMQP = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); + } + + protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + this(contentHeader, deliveryTag); + + Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + + AMQDestination dest; + + if (AMQDestination.QUEUE_TYPE.equals(type)) + { + dest = new AMQQueue(exchange, routingKey, routingKey); + } + else if (AMQDestination.TOPIC_TYPE.equals(type)) + { + dest = new AMQTopic(exchange, routingKey, null); + } + else + { + dest = new AMQUndefinedDestination(exchange, routingKey, null); + } + // Destination dest = AMQDestination.createDestination(url); + setJMSDestination(dest); + + _data = data; + if (_data != null) + { + _data.acquire(); + } + + _readableMessage = data != null; + + } + + protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) + { + super(contentHeader, deliveryTag); + _readableProperties = (_contentHeaderProperties != null); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + } + + public String getJMSMessageID() throws JMSException + { + if (getContentHeaderProperties().getMessageIdAsString() == null) + { + getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID()); + } + + return getContentHeaderProperties().getMessageIdAsString(); + } + + public void setJMSMessageID(String messageId) throws JMSException + { + getContentHeaderProperties().setMessageId(messageId); + } + + public long getJMSTimestamp() throws JMSException + { + return getContentHeaderProperties().getTimestamp(); + } + + public void setJMSTimestamp(long timestamp) throws JMSException + { + getContentHeaderProperties().setTimestamp(timestamp); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + getContentHeaderProperties().setCorrelationId(new String(bytes)); + } + + public void setJMSCorrelationID(String correlationId) throws JMSException + { + getContentHeaderProperties().setCorrelationId(correlationId); + } + + public String getJMSCorrelationID() throws JMSException + { + return getContentHeaderProperties().getCorrelationIdAsString(); + } + + public Destination getJMSReplyTo() throws JMSException + { + String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); + if (replyToEncoding == null) + { + return null; + } + else + { + Destination dest = (Destination) _destinationCache.get(replyToEncoding); + if (dest == null) + { + try + { + BindingURL binding = new AMQBindingURL(replyToEncoding); + dest = AMQDestination.createDestination(binding); + } + catch (URLSyntaxException e) + { + throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); + } + + _destinationCache.put(replyToEncoding, dest); + } + + return dest; + } + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + if (destination == null) + { + throw new IllegalArgumentException("Null destination not allowed"); + } + + if (!(destination instanceof AMQDestination)) + { + throw new IllegalArgumentException( + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + } + + final AMQDestination amqd = (AMQDestination) destination; + + final AMQShortString encodedDestination = amqd.getEncodedName(); + _destinationCache.put(encodedDestination, destination); + getContentHeaderProperties().setReplyTo(encodedDestination); + } + + public Destination getJMSDestination() throws JMSException + { + return _destination; + } + + public void setJMSDestination(Destination destination) + { + _destination = destination; + } + + public int getJMSDeliveryMode() throws JMSException + { + return getContentHeaderProperties().getDeliveryMode(); + } + + public void setJMSDeliveryMode(int i) throws JMSException + { + getContentHeaderProperties().setDeliveryMode((byte) i); + } + + public BasicContentHeaderProperties getContentHeaderProperties() + { + return (BasicContentHeaderProperties) _contentHeaderProperties; + } + + public boolean getJMSRedelivered() throws JMSException + { + return _redelivered; + } + + public void setJMSRedelivered(boolean b) throws JMSException + { + _redelivered = b; + } + + public String getJMSType() throws JMSException + { + return getContentHeaderProperties().getTypeAsString(); + } + + public void setJMSType(String string) throws JMSException + { + getContentHeaderProperties().setType(string); + } + + public long getJMSExpiration() throws JMSException + { + return getContentHeaderProperties().getExpiration(); + } + + public void setJMSExpiration(long l) throws JMSException + { + getContentHeaderProperties().setExpiration(l); + } + + public int getJMSPriority() throws JMSException + { + return getContentHeaderProperties().getPriority(); + } + + public void setJMSPriority(int i) throws JMSException + { + getContentHeaderProperties().setPriority((byte) i); + } + + public void clearProperties() throws JMSException + { + getJmsHeaders().clear(); + + _readableProperties = false; + } + + public void clearBody() throws JMSException + { + clearBodyImpl(); + _readableMessage = false; + } + + public boolean propertyExists(AMQShortString propertyName) throws JMSException + { + return getJmsHeaders().propertyExists(propertyName); + } + + public boolean propertyExists(String propertyName) throws JMSException + { + return getJmsHeaders().propertyExists(propertyName); + } + + public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBoolean(propertyName); + } + + public boolean getBooleanProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBoolean(propertyName); + } + + public byte getByteProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getByte(propertyName); + } + + public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBytes(propertyName); + } + + public short getShortProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getShort(propertyName); + } + + public int getIntProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getInteger(propertyName); + } + + public long getLongProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getLong(propertyName); + } + + public float getFloatProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getFloat(propertyName); + } + + public double getDoubleProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getDouble(propertyName); + } + + public String getStringProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getString(propertyName); + } + + public Object getObjectProperty(String propertyName) throws JMSException + { + return getJmsHeaders().getObject(propertyName); + } + + public Enumeration getPropertyNames() throws JMSException + { + return getJmsHeaders().getPropertyNames(); + } + + public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBoolean(propertyName, b); + } + + public void setBooleanProperty(String propertyName, boolean b) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBoolean(propertyName, b); + } + + public void setByteProperty(String propertyName, byte b) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setByte(propertyName, new Byte(b)); + } + + public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBytes(propertyName, bytes); + } + + public void setShortProperty(String propertyName, short i) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setShort(propertyName, new Short(i)); + } + + public void setIntProperty(String propertyName, int i) throws JMSException + { + checkWritableProperties(); + JMSHeaderAdapter.checkPropertyName(propertyName); + super.setIntProperty(new AMQShortString(propertyName), new Integer(i)); + } + + public void setLongProperty(String propertyName, long l) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setLong(propertyName, new Long(l)); + } + + public void setFloatProperty(String propertyName, float f) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setFloat(propertyName, new Float(f)); + } + + public void setDoubleProperty(String propertyName, double v) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setDouble(propertyName, new Double(v)); + } + + public void setStringProperty(String propertyName, String value) throws JMSException + { + checkWritableProperties(); + JMSHeaderAdapter.checkPropertyName(propertyName); + super.setLongStringProperty(new AMQShortString(propertyName), value); + } + + public void setObjectProperty(String propertyName, Object object) throws JMSException + { + checkWritableProperties(); + getJmsHeaders().setObject(propertyName, object); + } + + protected void removeProperty(AMQShortString propertyName) throws JMSException + { + getJmsHeaders().remove(propertyName); + } + + protected void removeProperty(String propertyName) throws JMSException + { + getJmsHeaders().remove(propertyName); + } + + public void acknowledgeThis() throws JMSException + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_session != null) + { + if (_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // received on the session + _session.acknowledgeMessage(_deliveryTag, true); + } + } + + public void acknowledge() throws JMSException + { + if (_session != null) + { + _session.acknowledge(); + } + } + + /** + * This forces concrete classes to implement clearBody() + * + * @throws JMSException + */ + public abstract void clearBodyImpl() throws JMSException; + + /** + * Get a String representation of the body of the message. Used in the toString() method which outputs this before + * message properties. + */ + public abstract String toBodyString() throws JMSException; + + public String getMimeType() + { + return getMimeTypeAsShortString().toString(); + } + + public abstract AMQShortString getMimeTypeAsShortString(); + + public String toString() + { + try + { + StringBuffer buf = new StringBuffer("Body:\n"); + buf.append(toBodyString()); + buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID()); + buf.append("\nJMS timestamp: ").append(getJMSTimestamp()); + buf.append("\nJMS expiration: ").append(getJMSExpiration()); + buf.append("\nJMS priority: ").append(getJMSPriority()); + buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); + buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nJMS Redelivered: ").append(_redelivered); + buf.append("\nJMS Destination: ").append(getJMSDestination()); + buf.append("\nJMS Type: ").append(getJMSType()); + buf.append("\nJMS MessageID: ").append(getJMSMessageID()); + buf.append("\nAMQ message number: ").append(_deliveryTag); + + buf.append("\nProperties:"); + if (getJmsHeaders().isEmpty()) + { + buf.append("<NONE>"); + } + else + { + buf.append('\n').append(getJmsHeaders().getHeaders()); + } + + return buf.toString(); + } + catch (JMSException e) + { + return e.toString(); + } + } + + public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties) + { + getContentHeaderProperties().setHeaders(messageProperties); + } + + public JMSHeaderAdapter getJmsHeaders() + { + return _headerAdapter; + } + + public ByteBuffer getData() + { + // make sure we rewind the data just in case any method has moved the + // position beyond the start + if (_data != null) + { + reset(); + } + + return _data; + } + + protected void checkReadable() throws MessageNotReadableException + { + if (!_readableMessage) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + protected void checkWritable() throws MessageNotWriteableException + { + if (_readableMessage) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + protected void checkWritableProperties() throws MessageNotWriteableException + { + if (_readableProperties) + { + throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); + } + } + + public boolean isReadable() + { + return _readableMessage; + } + + public boolean isWritable() + { + return !_readableMessage; + } + + public void reset() + { + if (!_changedData) + { + _data.rewind(); + } + else + { + _data.flip(); + _changedData = false; + } + } + + public void setConsumer(BasicMessageConsumer basicMessageConsumer) + { + _consumer = basicMessageConsumer; + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessageFactory.java new file mode 100644 index 0000000000..39924f52f9 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessageFactory.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; + +import java.util.Iterator; +import java.util.List; + +public abstract class AbstractJMSMessageFactory implements MessageFactory +{ + private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); + + protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange, + AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException; + + protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException + { + ByteBuffer data; + final boolean debug = _logger.isDebugEnabled(); + + // we optimise the non-fragmented case to avoid copying + if ((bodies != null) && (bodies.size() == 1)) + { + if (debug) + { + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); + } + + data = ((ContentBody) bodies.get(0)).payload; + } + else if (bodies != null) + { + if (debug) + { + _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + + ")"); + } + + data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem? + final Iterator it = bodies.iterator(); + while (it.hasNext()) + { + ContentBody cb = (ContentBody) it.next(); + data.put(cb.payload); + cb.payload.release(); + } + + data.flip(); + } + else // bodies == null + { + data = ByteBuffer.allocate(0); + } + + if (debug) + { + _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + + data.remaining()); + } + + return createMessage(messageNbr, data, exchange, routingKey, contentHeader); + } + + public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException + { + final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); + msg.setJMSRedelivered(redelivered); + + return msg; + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessage.java new file mode 100644 index 0000000000..adea0f96ba --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessage.java @@ -0,0 +1,388 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage +{ + public static final String MIME_TYPE = "application/octet-stream"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + + public JMSBytesMessage() + { + this(null); + } + + /** + * Construct a bytes message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + JMSBytesMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + public void reset() + { + super.reset(); + _readableMessage = true; + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public long getBodyLength() throws JMSException + { + checkReadable(); + return _data.limit(); + } + + public boolean readBoolean() throws JMSException + { + checkReadable(); + checkAvailable(1); + return _data.get() != 0; + } + + public byte readByte() throws JMSException + { + checkReadable(); + checkAvailable(1); + return _data.get(); + } + + public int readUnsignedByte() throws JMSException + { + checkReadable(); + checkAvailable(1); + return _data.getUnsigned(); + } + + public short readShort() throws JMSException + { + checkReadable(); + checkAvailable(2); + return _data.getShort(); + } + + public int readUnsignedShort() throws JMSException + { + checkReadable(); + checkAvailable(2); + return _data.getUnsignedShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws JMSException + */ + public char readChar() throws JMSException + { + checkReadable(); + checkAvailable(2); + return _data.getChar(); + } + + public int readInt() throws JMSException + { + checkReadable(); + checkAvailable(4); + return _data.getInt(); + } + + public long readLong() throws JMSException + { + checkReadable(); + checkAvailable(8); + return _data.getLong(); + } + + public float readFloat() throws JMSException + { + checkReadable(); + checkAvailable(4); + return _data.getFloat(); + } + + public double readDouble() throws JMSException + { + checkReadable(); + checkAvailable(8); + return _data.getDouble(); + } + + public String readUTF() throws JMSException + { + checkReadable(); + // we check only for one byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + + try + { + short length = readShort(); + if(length == 0) + { + return ""; + } + else + { + CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = decoder.decode(encodedString.buf()); + + return string.toString(); + } + + + + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + public int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining()); + if (count == 0) + { + return -1; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public int readBytes(byte[] bytes, int maxLength) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + if (maxLength > bytes.length) + { + throw new IllegalArgumentException("maxLength must be <= bytes.length"); + } + checkReadable(); + int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining()); + if (count == 0) + { + return -1; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public void writeBoolean(boolean b) throws JMSException + { + checkWritable(); + _changedData = true; + _data.put(b ? (byte) 1 : (byte) 0); + } + + public void writeByte(byte b) throws JMSException + { + checkWritable(); + _changedData = true; + _data.put(b); + } + + public void writeShort(short i) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putShort(i); + } + + public void writeChar(char c) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putChar(c); + } + + public void writeInt(int i) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putInt(i); + } + + public void writeLong(long l) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putLong(l); + } + + public void writeFloat(float v) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putFloat(v); + } + + public void writeDouble(double v) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putDouble(v); + } + + public void writeUTF(String string) throws JMSException + { + checkWritable(); + try + { + CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + _data.putShort((short)encodedString.limit()); + _data.put(encodedString); + _changedData = true; + //_data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must add the null terminator manually + //_data.put((byte)0); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + + public void writeBytes(byte[] bytes) throws JMSException + { + checkWritable(); + _data.put(bytes); + _changedData = true; + } + + public void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + checkWritable(); + _data.put(bytes, offset, length); + _changedData = true; + } + + public void writeObject(Object object) throws JMSException + { + checkWritable(); + if (object == null) + { + throw new NullPointerException("Argument must not be null"); + } + Class clazz = object.getClass(); + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeUTF((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } + + public String toString() + { + return String.valueOf(System.identityHashCode(this)); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessageFactory.java new file mode 100644 index 0000000000..2c59aeebe1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSBytesMessageFactory.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSBytesMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSBytesMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSHeaderAdapter.java new file mode 100644 index 0000000000..c19c0b3290 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSHeaderAdapter.java @@ -0,0 +1,552 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.util.Enumeration; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + + +public final class JMSHeaderAdapter +{ + private final FieldTable _headers; + + public JMSHeaderAdapter(FieldTable headers) + { + _headers = headers; + } + + + public FieldTable getHeaders() + { + return _headers; + } + + public boolean getBoolean(String string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public boolean getBoolean(AMQShortString string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public char getCharacter(String string) throws JMSException + { + checkPropertyName(string); + Character c = getHeaders().getCharacter(string); + + if (c == null) + { + if (getHeaders().isNullStringValue(string)) + { + throw new NullPointerException("Cannot convert null char"); + } + else + { + throw new MessageFormatException("getChar can't use " + string + " item."); + } + } + else + { + return (char) c; + } + } + + public byte[] getBytes(String string) throws JMSException + { + return getBytes(new AMQShortString(string)); + } + + public byte[] getBytes(AMQShortString string) throws JMSException + { + checkPropertyName(string); + + byte[] bs = getHeaders().getBytes(string); + + if (bs == null) + { + throw new MessageFormatException("getBytes can't use " + string + " item."); + } + else + { + return bs; + } + } + + public byte getByte(String string) throws JMSException + { + checkPropertyName(string); + Byte b = getHeaders().getByte(string); + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getByte can't use " + string + " item."); + } + else + { + return Byte.valueOf((String) str); + } + } + else + { + b = Byte.valueOf(null); + } + } + + return b; + } + + public short getShort(String string) throws JMSException + { + checkPropertyName(string); + Short s = getHeaders().getShort(string); + + if (s == null) + { + s = Short.valueOf(getByte(string)); + } + + return s; + } + + public int getInteger(String string) throws JMSException + { + checkPropertyName(string); + Integer i = getHeaders().getInteger(string); + + if (i == null) + { + i = Integer.valueOf(getShort(string)); + } + + return i; + } + + public long getLong(String string) throws JMSException + { + checkPropertyName(string); + Long l = getHeaders().getLong(string); + + if (l == null) + { + l = Long.valueOf(getInteger(string)); + } + + return l; + } + + public float getFloat(String string) throws JMSException + { + checkPropertyName(string); + Float f = getHeaders().getFloat(string); + + if (f == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getFloat can't use " + string + " item."); + } + else + { + return Float.valueOf((String) str); + } + } + else + { + f = Float.valueOf(null); + } + + } + + return f; + } + + public double getDouble(String string) throws JMSException + { + checkPropertyName(string); + Double d = getHeaders().getDouble(string); + + if (d == null) + { + d = Double.valueOf(getFloat(string)); + } + + return d; + } + + public String getString(String string) throws JMSException + { + checkPropertyName(string); + String s = getHeaders().getString(string); + + if (s == null) + { + if (getHeaders().containsKey(string)) + { + Object o = getHeaders().getObject(string); + if (o instanceof byte[]) + { + throw new MessageFormatException("getObject couldn't find " + string + " item."); + } + else + { + if (o == null) + { + return null; + } + else + { + s = String.valueOf(o); + } + } + }//else return s // null; + } + + return s; + } + + public Object getObject(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().getObject(string); + } + + public void setBoolean(AMQShortString string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setBoolean(String string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setChar(String string, char c) throws JMSException + { + checkPropertyName(string); + getHeaders().setChar(string, c); + } + + public Object setBytes(AMQShortString string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes, int start, int length) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes, start, length); + } + + public void setByte(String string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + public void setByte(AMQShortString string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + + public void setShort(String string, short i) throws JMSException + { + checkPropertyName(string); + getHeaders().setShort(string, i); + } + + public void setInteger(String string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setInteger(AMQShortString string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setLong(String string, long l) throws JMSException + { + checkPropertyName(string); + getHeaders().setLong(string, l); + } + + public void setFloat(String string, float v) throws JMSException + { + checkPropertyName(string); + getHeaders().setFloat(string, v); + } + + public void setDouble(String string, double v) throws JMSException + { + checkPropertyName(string); + getHeaders().setDouble(string, v); + } + + public void setString(String string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setString(AMQShortString string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setObject(String string, Object object) throws JMSException + { + checkPropertyName(string); + try + { + getHeaders().setObject(string, object); + } + catch (AMQPInvalidClassException aice) + { + MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); + mfe.setLinkedException(aice); + throw mfe; + } + } + + public boolean itemExists(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().containsKey(string); + } + + public Enumeration getPropertyNames() + { + return getHeaders().getPropertyNames(); + } + + public void clear() + { + getHeaders().clear(); + } + + public boolean propertyExists(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public boolean propertyExists(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public Object put(Object key, Object value) + { + checkPropertyName(key.toString()); + return getHeaders().setObject(key.toString(), value); + } + + public Object remove(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public Object remove(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public boolean isEmpty() + { + return getHeaders().isEmpty(); + } + + public void writeToBuffer(ByteBuffer data) + { + getHeaders().writeToBuffer(data); + } + + public Enumeration getMapNames() + { + return getPropertyNames(); + } + + protected static void checkPropertyName(CharSequence propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected static void checkIdentiferFormat(CharSequence propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// � Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, �Null Values.� +// � The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// � Identifiers are case sensitive. +// � Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java new file mode 100644 index 0000000000..2d90157410 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.nclient.jms.message; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import java.nio.charset.CharacterCodingException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class); + + public static final String MIME_TYPE = "jms/map-message"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + private Map<String, Object> _map = new HashMap<String, Object>(); + + public JMSMapMessage() throws JMSException + { + this(null); + } + + JMSMapMessage(ByteBuffer data) throws JMSException + { + super(data); // this instantiates a content header + populateMapFromData(); + } + + JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + try + { + populateMapFromData(); + } + catch (JMSException je) + { + throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je); + + } + + } + + public String toBodyString() throws JMSException + { + return _map.toString(); + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public ByteBuffer getData() + { + // What if _data is null? + writeMapToData(); + + return super.getData(); + } + + @Override + public void clearBodyImpl() throws JMSException + { + super.clearBodyImpl(); + _map.clear(); + } + + public boolean getBoolean(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Boolean) + { + return ((Boolean) value).booleanValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Boolean.valueOf((String) value); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to boolean."); + } + + } + + public byte getByte(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Byte) + { + return ((Byte) value).byteValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Byte.valueOf((String) value).byteValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to byte."); + } + } + + public short getShort(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Short) + { + return ((Short) value).shortValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).shortValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Short.valueOf((String) value).shortValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to short."); + } + + } + + public int getInt(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Integer) + { + return ((Integer) value).intValue(); + } + else if (value instanceof Short) + { + return ((Short) value).intValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).intValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Integer.valueOf((String) value).intValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to int."); + } + + } + + public long getLong(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Long) + { + return ((Long) value).longValue(); + } + else if (value instanceof Integer) + { + return ((Integer) value).longValue(); + } + + if (value instanceof Short) + { + return ((Short) value).longValue(); + } + + if (value instanceof Byte) + { + return ((Byte) value).longValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Long.valueOf((String) value).longValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to long."); + } + + } + + public char getChar(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (!_map.containsKey(propName)) + { + throw new MessageFormatException("Property " + propName + " not present"); + } + else if (value instanceof Character) + { + return ((Character) value).charValue(); + } + else if (value == null) + { + throw new NullPointerException("Property " + propName + " has null value and therefore cannot " + + "be converted to char."); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to boolan."); + } + + } + + public float getFloat(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Float) + { + return ((Float) value).floatValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Float.valueOf((String) value).floatValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to float."); + } + } + + public double getDouble(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Double) + { + return ((Double) value).doubleValue(); + } + else if (value instanceof Float) + { + return ((Float) value).doubleValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Double.valueOf((String) value).doubleValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to double."); + } + } + + public String getString(String propName) throws JMSException + { + Object value = _map.get(propName); + + if ((value instanceof String) || (value == null)) + { + return (String) value; + } + else if (value instanceof byte[]) + { + throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String."); + } + else + { + return value.toString(); + } + + } + + public byte[] getBytes(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (!_map.containsKey(propName)) + { + throw new MessageFormatException("Property " + propName + " not present"); + } + else if ((value instanceof byte[]) || (value == null)) + { + return (byte[]) value; + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to byte[]."); + } + } + + public Object getObject(String propName) throws JMSException + { + return _map.get(propName); + } + + public Enumeration getMapNames() throws JMSException + { + return Collections.enumeration(_map.keySet()); + } + + public void setBoolean(String propName, boolean b) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, b); + } + + public void setByte(String propName, byte b) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, b); + } + + public void setShort(String propName, short i) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, i); + } + + public void setChar(String propName, char c) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, c); + } + + public void setInt(String propName, int i) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, i); + } + + public void setLong(String propName, long l) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, l); + } + + public void setFloat(String propName, float v) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, v); + } + + public void setDouble(String propName, double v) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, v); + } + + public void setString(String propName, String string1) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, string1); + } + + public void setBytes(String propName, byte[] bytes) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, bytes); + } + + public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException + { + if ((offset == 0) && (length == bytes.length)) + { + setBytes(propName, bytes); + } + else + { + byte[] newBytes = new byte[length]; + System.arraycopy(bytes, offset, newBytes, 0, length); + setBytes(propName, newBytes); + } + } + + public void setObject(String propName, Object value) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) + || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) + || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null)) + { + _map.put(propName, value); + } + else + { + throw new MessageFormatException("Cannot set property " + propName + " to value " + value + "of type " + + value.getClass().getName() + "."); + } + } + + private void checkPropertyName(String propName) + { + if ((propName == null) || propName.equals("")) + { + throw new IllegalArgumentException("Property name cannot be null, or the empty String."); + } + } + + public boolean itemExists(String propName) throws JMSException + { + return _map.containsKey(propName); + } + + private void populateMapFromData() throws JMSException + { + if (_data != null) + { + _data.rewind(); + + final int entries = readIntImpl(); + for (int i = 0; i < entries; i++) + { + String propName = readStringImpl(); + Object value = readObject(); + _map.put(propName, value); + } + } + else + { + _map.clear(); + } + } + + private void writeMapToData() + { + allocateInitialBuffer(); + final int size = _map.size(); + writeIntImpl(size); + for (Map.Entry<String, Object> entry : _map.entrySet()) + { + try + { + writeStringImpl(entry.getKey()); + } + catch (CharacterCodingException e) + { + throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e); + + } + + try + { + writeObject(entry.getValue()); + } + catch (JMSException e) + { + Object value = entry.getValue(); + throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value + + " (type: " + value.getClass().getName() + ").", e); + } + } + + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java new file mode 100644 index 0000000000..c5594862e2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.nclient.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSMapMessageFactory extends AbstractJMSMessageFactory +{ + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSMapMessage(); + } + + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java new file mode 100644 index 0000000000..4afb7f050f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.ObjectMessage; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage +{ + public static final String MIME_TYPE = "application/java-object-stream"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + private static final int DEFAULT_BUFFER_SIZE = 1024; + + /** + * Creates empty, writable message for use by producers + */ + public JMSObjectMessage() + { + this(null); + } + + private JMSObjectMessage(ByteBuffer data) + { + super(data); + if (data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + _data.setAutoExpand(true); + } + + getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); + } + + /** + * Creates read only message for delivery to consumers + */ + JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + ByteBuffer data) throws AMQException + { + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); + } + + public void clearBodyImpl() throws JMSException + { + if (_data != null) + { + _data.release(); + } + + _data = null; + + } + + public String toBodyString() throws JMSException + { + return toString(_data); + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public void setObject(Serializable serializable) throws JMSException + { + checkWritable(); + + if (_data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + _data.setAutoExpand(true); + } + else + { + _data.rewind(); + } + + try + { + ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); + out.writeObject(serializable); + out.flush(); + out.close(); + } + catch (IOException e) + { + MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); + mfe.setLinkedException(e); + throw mfe; + } + + } + + public Serializable getObject() throws JMSException + { + ObjectInputStream in = null; + if (_data == null) + { + return null; + } + + try + { + _data.rewind(); + in = new ObjectInputStream(_data.asInputStream()); + + return (Serializable) in.readObject(); + } + catch (IOException e) + { + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; + } + catch (ClassNotFoundException e) + { + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; + } + finally + { + _data.rewind(); + close(in); + } + } + + private static void close(InputStream in) + { + try + { + if (in != null) + { + in.close(); + } + } + catch (IOException ignore) + { } + } + + private static String toString(ByteBuffer data) + { + if (data == null) + { + return null; + } + + int pos = data.position(); + try + { + return data.getString(Charset.forName("UTF8").newDecoder()); + } + catch (CharacterCodingException e) + { + return null; + } + finally + { + data.position(pos); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java new file mode 100644 index 0000000000..764984eea5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSObjectMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSObjectMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java new file mode 100644 index 0000000000..e889936d20 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import javax.jms.JMSException; +import javax.jms.StreamMessage; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * @author Apache Software Foundation + */ +public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage +{ + public static final String MIME_TYPE="jms/stream-message"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + public JMSStreamMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + JMSStreamMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + public void reset() + { + super.reset(); + _readableMessage = true; + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + + + public boolean readBoolean() throws JMSException + { + return super.readBoolean(); + } + + + public byte readByte() throws JMSException + { + return super.readByte(); + } + + public short readShort() throws JMSException + { + return super.readShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws JMSException + */ + public char readChar() throws JMSException + { + return super.readChar(); + } + + public int readInt() throws JMSException + { + return super.readInt(); + } + + public long readLong() throws JMSException + { + return super.readLong(); + } + + public float readFloat() throws JMSException + { + return super.readFloat(); + } + + public double readDouble() throws JMSException + { + return super.readDouble(); + } + + public String readString() throws JMSException + { + return super.readString(); + } + + public int readBytes(byte[] bytes) throws JMSException + { + return super.readBytes(bytes); + } + + + public Object readObject() throws JMSException + { + return super.readObject(); + } + + public void writeBoolean(boolean b) throws JMSException + { + super.writeBoolean(b); + } + + public void writeByte(byte b) throws JMSException + { + super.writeByte(b); + } + + public void writeShort(short i) throws JMSException + { + super.writeShort(i); + } + + public void writeChar(char c) throws JMSException + { + super.writeChar(c); + } + + public void writeInt(int i) throws JMSException + { + super.writeInt(i); + } + + public void writeLong(long l) throws JMSException + { + super.writeLong(l); + } + + public void writeFloat(float v) throws JMSException + { + super.writeFloat(v); + } + + public void writeDouble(double v) throws JMSException + { + super.writeDouble(v); + } + + public void writeString(String string) throws JMSException + { + super.writeString(string); + } + + public void writeBytes(byte[] bytes) throws JMSException + { + super.writeBytes(bytes); + } + + public void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + super.writeBytes(bytes,offset,length); + } + + public void writeObject(Object object) throws JMSException + { + super.writeObject(object); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java new file mode 100644 index 0000000000..40fe6da228 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSStreamMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSStreamMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java new file mode 100644 index 0000000000..83edc3dea6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; + +public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage +{ + private static final String MIME_TYPE = "text/plain"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + + private String _decodedValue; + + /** + * This constant represents the name of a property that is set when the message payload is null. + */ + private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName(); + private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); + + public JMSTextMessage() throws JMSException + { + this(null, null); + } + + JMSTextMessage(ByteBuffer data, String encoding) throws JMSException + { + super(data); // this instantiates a content header + getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); + getContentHeaderProperties().setEncoding(encoding); + } + + JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) + throws AMQException + { + super(deliveryTag, contentHeader, exchange, routingKey, data); + contentHeader.setContentType(MIME_TYPE_SHORT_STRING); + _data = data; + } + + JMSTextMessage(ByteBuffer data) throws JMSException + { + this(data, null); + } + + JMSTextMessage(String text) throws JMSException + { + super((ByteBuffer) null); + setText(text); + } + + public void clearBodyImpl() throws JMSException + { + if (_data != null) + { + _data.release(); + } + _data = null; + _decodedValue = null; + } + + public String toBodyString() throws JMSException + { + return getText(); + } + + public void setData(ByteBuffer data) + { + _data = data; + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public void setText(String text) throws JMSException + { + checkWritable(); + + clearBody(); + try + { + if (text != null) + { + _data = ByteBuffer.allocate(text.length()); + _data.limit(text.length()) ; + //_data.sweep(); + _data.setAutoExpand(true); + final String encoding = getContentHeaderProperties().getEncodingAsString(); + if (encoding == null) + { + _data.put(text.getBytes(DEFAULT_CHARSET.name())); + } + else + { + _data.put(text.getBytes(encoding)); + } + _changedData=true; + } + _decodedValue = text; + } + catch (UnsupportedEncodingException e) + { + // should never occur + JMSException jmse = new JMSException("Unable to decode text data"); + jmse.setLinkedException(e); + } + } + + public String getText() throws JMSException + { + if (_data == null && _decodedValue == null) + { + return null; + } + else if (_decodedValue != null) + { + return _decodedValue; + } + else + { + _data.rewind(); + + if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY)) + { + return null; + } + if (getContentHeaderProperties().getEncodingAsString() != null) + { + try + { + _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Could not decode string data: " + e); + je.setLinkedException(e); + throw je; + } + } + else + { + try + { + _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Could not decode string data: " + e); + je.setLinkedException(e); + throw je; + } + } + return _decodedValue; + } + } + + @Override + public void prepareForSending() throws JMSException + { + super.prepareForSending(); + if (_data == null) + { + setBooleanProperty(PAYLOAD_NULL_PROPERTY, true); + } + else + { + removeProperty(PAYLOAD_NULL_PROPERTY); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java new file mode 100644 index 0000000000..7d2422d483 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSTextMessageFactory extends AbstractJMSMessageFactory +{ + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSTextMessage(); + } + + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, + exchange, routingKey, data); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java new file mode 100644 index 0000000000..734b7c51ec --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java @@ -0,0 +1,202 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import java.util.Enumeration; + +public class MessageConverter +{ + + /** + * Log4J logger + */ + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + + /** + * AbstractJMSMessage which will hold the converted message + */ + private AbstractJMSMessage _newMessage; + + public MessageConverter(AbstractJMSMessage message) throws JMSException + { + _newMessage = message; + } + + public MessageConverter(BytesMessage message) throws JMSException + { + BytesMessage bytesMessage = (BytesMessage) message; + bytesMessage.reset(); + + JMSBytesMessage nativeMsg = new JMSBytesMessage(); + + byte[] buf = new byte[1024]; + + int len; + + while ((len = bytesMessage.readBytes(buf)) != -1) + { + nativeMsg.writeBytes(buf, 0, len); + } + + _newMessage = nativeMsg; + setMessageProperties(message); + } + + public MessageConverter(MapMessage message) throws JMSException + { + MapMessage nativeMessage = new JMSMapMessage(); + + Enumeration mapNames = message.getMapNames(); + while (mapNames.hasMoreElements()) + { + String name = (String) mapNames.nextElement(); + nativeMessage.setObject(name, message.getObject(name)); + } + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(ObjectMessage message) throws JMSException + { + ObjectMessage origMessage = (ObjectMessage) message; + ObjectMessage nativeMessage = new JMSObjectMessage(); + + nativeMessage.setObject(origMessage.getObject()); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + + } + + public MessageConverter(TextMessage message) throws JMSException + { + TextMessage nativeMessage = new JMSTextMessage(); + + nativeMessage.setText(message.getText()); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(StreamMessage message) throws JMSException + { + StreamMessage nativeMessage = new JMSStreamMessage(); + + try + { + message.reset(); + while (true) + { + nativeMessage.writeObject(message.readObject()); + } + } + catch (MessageEOFException e) + { + // we're at the end so don't mind the exception + } + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(Message message) throws JMSException + { + // Send a message with just properties. + // Throwing away content + BytesMessage nativeMessage = new JMSBytesMessage(); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public AbstractJMSMessage getConvertedMessage() + { + return _newMessage; + } + + /** + * Sets all message properties + */ + protected void setMessageProperties(Message message) throws JMSException + { + setNonJMSProperties(message); + setJMSProperties(message); + } + + /** + * Sets all non-JMS defined properties on converted message + */ + protected void setNonJMSProperties(Message message) throws JMSException + { + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) + { + String propertyName = String.valueOf(propertyNames.nextElement()); + // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them + if (!propertyName.startsWith("JMSX_")) + { + Object value = message.getObjectProperty(propertyName); + _newMessage.setObjectProperty(propertyName, value); + } + } + } + + /** + * Exposed JMS defined properties on converted message: + * JMSDestination - we don't set here + * JMSDeliveryMode - set + * JMSExpiration - we don't set here + * JMSPriority - we don't set here + * JMSMessageID - we don't set here + * JMSTimestamp - we don't set here + * JMSCorrelationID - set + * JMSReplyTo - set + * JMSType - set + * JMSRedlivered - we don't set here + */ + protected void setJMSProperties(Message message) throws JMSException + { + _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); + + if (message.getJMSReplyTo() != null) + { + _newMessage.setJMSReplyTo(message.getJMSReplyTo()); + } + + _newMessage.setJMSType(message.getJMSType()); + + _newMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java new file mode 100644 index 0000000000..728766f579 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.util.List; + +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + + +public interface MessageFactory +{ + AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, + ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, + List bodies) + throws JMSException, AMQException; + + AbstractJMSMessage createMessage() throws JMSException; +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java new file mode 100644 index 0000000000..a6225c2616 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +public class MessageFactoryRegistry +{ + private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>(); + private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = + new HashMap<AMQShortString, MessageFactory>(); + + /** + * Construct a new registry with the default message factories registered + * @return a message factory registry + */ + public static MessageFactoryRegistry newDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); + mf.registerFactory("text/plain", new JMSTextMessageFactory()); + mf.registerFactory("text/xml", new JMSTextMessageFactory()); + mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); + mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); + mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); + mf.registerFactory(null, new JMSBytesMessageFactory()); + + return mf; + } + + public void registerFactory(String mimeType, MessageFactory mf) + { + if (mf == null) + { + throw new IllegalArgumentException("Message factory must not be null"); + } + + _mimeStringToFactoryMap.put(mimeType, mf); + _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf); + } + + public MessageFactory deregisterFactory(String mimeType) + { + _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType)); + + return _mimeStringToFactoryMap.remove(mimeType); + } + + /** + * Create a message. This looks up the MIME type from the content header and instantiates the appropriate + * concrete message type. + * @param deliveryTag the AMQ message id + * @param redelivered true if redelivered + * @param contentHeader the content header that was received + * @param bodies a list of ContentBody instances + * @return the message. + * @throws AMQException + * @throws JMSException + */ + public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, + AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) + throws AMQException, JMSException + { + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; + + // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over + // AMQP. When the type is null, it can only be assumed that the message is a byte message. + AMQShortString contentTypeShortString = properties.getContentType(); + contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) + : contentTypeShortString; + + MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); + if (mf == null) + { + throw new AMQException(null, "Unsupport MIME type of " + properties.getContentTypeAsString(), null); + } + else + { + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies); + } + } + + public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException + { + if (mimeType == null) + { + throw new IllegalArgumentException("Mime type must not be null"); + } + + MessageFactory mf = _mimeStringToFactoryMap.get(mimeType); + if (mf == null) + { + throw new AMQException(null, "Unsupport MIME type of " + mimeType, null); + } + else + { + return mf.createMessage(); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java new file mode 100644 index 0000000000..92a32f9bf4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import javax.jms.JMSException; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import java.math.BigDecimal; + +public class QpidMessage +{ + protected ContentHeaderProperties _contentHeaderProperties; + + /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ + protected AMQSession _session; + + protected final long _deliveryTag; + + public QpidMessage(ContentHeaderProperties properties, long deliveryTag) + { + _contentHeaderProperties = properties; + _deliveryTag = deliveryTag; + } + + public QpidMessage(ContentHeaderProperties properties) + { + this(properties, -1); + } + + /** + * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls + * acknowledge() + * + * @param s the AMQ session that delivered this message + */ + public void setAMQSession(AMQSession s) + { + _session = s; + } + + public AMQSession getAMQSession() + { + return _session; + } + + /** + * Get the AMQ message number assigned to this message + * + * @return the message number + */ + public long getDeliveryTag() + { + return _deliveryTag; + } + + /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */ + public void prepareForSending() throws JMSException + { + } + + public FieldTable getPropertyHeaders() + { + return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders(); + } + + public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException + { + getPropertyHeaders().setDecimal(propertyName, bd); + } + + public void setIntProperty(AMQShortString propertyName, int i) throws JMSException + { + getPropertyHeaders().setInteger(propertyName, new Integer(i)); + } + + public void setLongStringProperty(AMQShortString propertyName, String value) + { + getPropertyHeaders().setString(propertyName, value); + } + + public void setTimestampProperty(AMQShortString propertyName, long value) + { + getPropertyHeaders().setTimestamp(propertyName, value); + } + + public void setVoidProperty(AMQShortString propertyName) + { + getPropertyHeaders().setVoid(propertyName); + } + + //** Getters + + public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException + { + return getPropertyHeaders().getDecimal(propertyName); + } + + public int getIntegerProperty(AMQShortString propertyName) throws JMSException + { + return getPropertyHeaders().getInteger(propertyName); + } + + public String getLongStringProperty(AMQShortString propertyName) + { + return getPropertyHeaders().getString(propertyName); + } + + public Long getTimestampProperty(AMQShortString propertyName) + { + return getPropertyHeaders().getTimestamp(propertyName); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java new file mode 100644 index 0000000000..6a14bc1117 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.jms.message; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.BasicReturnBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. + * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. + */ +public class UnprocessedMessage +{ + private long _bytesReceived = 0; + + private final BasicDeliverBody _deliverBody; + private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) + private final int _channelId; + private ContentHeaderBody _contentHeader; + + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ + private List<ContentBody> _bodies; + + public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) + { + _deliverBody = deliverBody; + _channelId = channelId; + _bounceBody = null; + } + + + public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) + { + _deliverBody = null; + _channelId = channelId; + _bounceBody = bounceBody; + } + + public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException + { + + if (body.payload != null) + { + final long payloadSize = body.payload.remaining(); + + if (_bodies == null) + { + if (payloadSize == getContentHeader().bodySize) + { + _bodies = Collections.singletonList(body); + } + else + { + _bodies = new ArrayList<ContentBody>(); + _bodies.add(body); + } + + } + else + { + _bodies.add(body); + } + _bytesReceived += payloadSize; + } + } + + public boolean isAllBodyDataReceived() + { + return _bytesReceived == getContentHeader().bodySize; + } + + public BasicDeliverBody getDeliverBody() + { + return _deliverBody; + } + + public BasicReturnBody getBounceBody() + { + return _bounceBody; + } + + + public int getChannelId() + { + return _channelId; + } + + + public ContentHeaderBody getContentHeader() + { + return _contentHeader; + } + + public void setContentHeader(ContentHeaderBody contentHeader) + { + this._contentHeader = contentHeader; + } + + public List<ContentBody> getBodies() + { + return _bodies; + } + +} |
