summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java49
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java163
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java1
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java6
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java250
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java102
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java167
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java173
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java258
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java271
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java (renamed from qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java)4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java165
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java92
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java410
21 files changed, 1325 insertions, 859 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index cc052f81df..50299fa9d5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -215,12 +215,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Exception lastException = new Exception();
lastException.initCause(new ConnectException());
- while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed())
+ while (!_connected && _failoverPolicy.failoverAllowed())
{
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
lastException = null;
+ _connected = true;
}
catch (Exception e)
{
@@ -232,34 +233,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.debug("Are we connected:" + _connected);
- // Then the Failover Thread will handle conneciton
- if (_failoverPolicy.failoverAllowed())
- {
- //TODO this needs to be redone so that we are not spinning.
- // A suitable object should be set that is then waited on
- // and only notified when a connection is made or when
- // the AMQConnection gets closed.
- while (!_connected && !_closed.get())
- {
- try
- {
- _logger.debug("Sleeping.");
- Thread.sleep(100);
- }
- catch (InterruptedException ie)
- {
- _logger.debug("Woken up.");
- }
- }
- if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null)
- {
- if (_lastAMQException != null)
- {
- throw _lastAMQException;
- }
- }
- }
- else
+ if (!_connected)
{
String message = null;
@@ -318,7 +292,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void setVirtualHost(String virtualHost)
{
- if(virtualHost.startsWith("/"))
+ if (virtualHost.startsWith("/"))
{
virtualHost = virtualHost.substring(1);
}
@@ -403,7 +377,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean failoverAllowed()
{
- return _failoverPolicy.failoverAllowed();
+ if (!_connected)
+ {
+ return false;
+ }
+ else
+ {
+ return _failoverPolicy.failoverAllowed();
+ }
}
public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
@@ -815,6 +796,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _protocolHandler;
}
+ public boolean started()
+ {
+ return _started;
+ }
+
public void bytesSent(long writtenBytes)
{
if (_connectionListener != null)
@@ -1031,4 +1017,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQConnectionFactory.class.getName(),
null); // factory location
}
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
index b2d2d2bec3..17af3702a4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
@@ -62,7 +62,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
String clientName, String virtualHost) throws URLSyntaxException
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
+ username + ":" + password + "@" + clientName + "/" +
virtualHost + "?brokerlist='" + broker + "'"));
}
@@ -334,7 +334,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
if (addr != null)
{
- return new AMQQueue(new AMQBindingURL((String) addr.getContent()).getQueueName());
+ return new AMQQueue(new AMQBindingURL((String) addr.getContent()));
}
}
@@ -344,7 +344,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
if (addr != null)
{
- return new AMQTopic(new AMQBindingURL((String) addr.getContent()).getDestinationName());
+ return new AMQTopic(new AMQBindingURL((String) addr.getContent()));
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b6429972df..35530b39c9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
- private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
-
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private long _nextProducerId;
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
- private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
/**
* Set when recover is called. This is to handle the case where recover() is called by application code
@@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private boolean _inRecovery;
+ private boolean _connectionStopped;
+
private boolean _hasMessageListeners;
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
-
+
private class Dispatcher extends Thread
{
+
+ /**
+ * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
+ */
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private final Object _lock = new Object();
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
UnprocessedMessage message;
- _stopped.set(false);
+
try
{
- while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
- dispatchMessage(message);
+ synchronized (_lock)
+ {
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ dispatchMessage(message);
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ }
+
}
}
catch (InterruptedException e)
@@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.info("Dispatcher thread terminating for channel " + _channelId);
}
+ // only call while holding lock
+ final boolean connectionStopped()
+ {
+ return _connectionStopped;
+ }
+
+ void setConnectionStopped(boolean connectionStopped)
+ {
+ synchronized (_lock)
+ {
+ _connectionStopped = connectionStopped;
+ _lock.notify();
+ }
+ }
+
private void dispatchMessage(UnprocessedMessage message)
{
if (message.getDeliverBody() != null)
@@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public void stopDispatcher()
+ public void close()
{
- _stopped.set(true);
+ _closed.set(true);
interrupt();
+
+ //fixme awaitTermination
+
}
}
-
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
{
@@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
- _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
-
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
-
+
public void rollback() throws JMSException
{
checkTransacted();
@@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// TODO: Be aware of possible changes to parameter order as versions change.
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false)); // requeue
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageListener getMessageListener() throws JMSException
{
- checkNotClosed();
+// checkNotClosed();
return _messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
- checkNotClosed();
-
- if (!isStopped())
- {
- throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
- }
-
- // We are stopped
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- BasicMessageConsumer consumer = i.next();
-
- if (consumer.isReceiving())
- {
- throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
- }
- }
-
- _messageListener = listener;
-
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- i.next().setMessageListener(_messageListener);
- }
-
+// checkNotClosed();
+//
+// if (_dispatcher != null && !_dispatcher.connectionStopped())
+// {
+// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+// }
+//
+// // We are stopped
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// BasicMessageConsumer consumer = i.next();
+//
+// if (consumer.isReceiving())
+// {
+// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+// }
+// }
+//
+// _messageListener = listener;
+//
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// i.next().setMessageListener(_messageListener);
+// }
}
@@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void start()
{
+ //fixme This should be controlled by _stopped as it pairs with the stop method
+ //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
+ //will result in sending Flow messages for each subsequent call to flow.. only need to do this
+ // if we have called stop.
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
}
- if(hasMessageListeners() && _dispatcher == null)
+ if (hasMessageListeners())
{
startDistpatcherIfNecessary();
}
@@ -1606,26 +1636,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
- if(_dispatcher == null)
+ if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
_dispatcher.start();
}
+ else
+ {
+ _dispatcher.setConnectionStopped(false);
+ }
}
void stop()
{
//stop the server delivering messages to this session
suspendChannel();
-
- //stop the dispatcher thread
- _stopped.set(true);
- }
-
- boolean isStopped()
- {
- return _stopped.get();
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.setConnectionStopped(true);
+ }
}
/**
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index f50b0390c5..c82187b2e7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -28,12 +28,12 @@ import javax.jms.JMSException;
import javax.jms.Topic;
public class AMQTopic extends AMQDestination implements Topic
- {
+{
/**
- * Constructor for use in creating a topic using a BindingURL.
+ * Constructor for use in creating a topic using a BindingURL.
*
* @param binding The binding url object.
- */
+ */
public AMQTopic(BindingURL binding)
{
super(binding);
@@ -78,7 +78,7 @@ public class AMQTopic extends AMQDestination implements Topic
return super.getDestinationName().toString();
}
- public AMQShortString getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return getDestinationName();
}
@@ -93,7 +93,7 @@ public class AMQTopic extends AMQDestination implements Topic
* Override since the queue is always private and we must ensure it remains null. If not,
* reuse of the topic when registering consumers will make all consumers listen on the same (private) queue rather
* than getting their own (private) queue.
- *
+ * <p/>
* This is relatively nasty but it is difficult to come up with a more elegant solution, given
* the requirement in the case on AMQQueue and possibly other AMQDestination subclasses to
* use the underlying queue name even where it is server generated.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4ffec6fb41..832c312634 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
//i.e. it is only valid to call this method if
//
- // (a) the session is stopped, in which case the dispatcher is not running
+ // (a) the connection is stopped, in which case the dispatcher is not running
// OR
// (b) the listener is null AND we are not receiving synchronously at present
//
- if (_session.isStopped())
+ if (!_session.getAMQConnection().started())
{
_messageListener.set(messageListener);
_session.setHasMessageListeners();
- _session.startDistpatcherIfNecessary();
if (_logger.isDebugEnabled())
{
@@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
synchronized (_session)
{
-
_messageListener.set(messageListener);
_session.setHasMessageListeners();
_session.startDistpatcherIfNecessary();
@@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
- _session.startDistpatcherIfNecessary();
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
- _session.startDistpatcherIfNecessary();
-
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- //This shouldn't be possible.
_synchronousQueue.put(jmsMessage);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 4426a7deee..f56fc40360 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -388,9 +388,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
else
{
- //TODO; Do we really want to create an empty message here ?
- newMessage = (AbstractJMSMessage) _session.createMessage();
- return new MessageConverter(newMessage).getConvertedMessage();
+ newMessage = new MessageConverter(message).getConvertedMessage();
}
if (newMessage != null)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 3086e5b90a..a1e2640f21 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -269,7 +269,7 @@ public final class JMSHeaderAdapter
s = String.valueOf(o);
}
}
- }
+ }//else return s // null;
}
return s;
@@ -508,16 +508,16 @@ public final class JMSHeaderAdapter
// JMS invalid names
if ((propertyName.equals("NULL")
- || propertyName.equals("TRUE")
- || propertyName.equals("FALSE")
- || propertyName.equals("NOT")
- || propertyName.equals("AND")
- || propertyName.equals("OR")
- || propertyName.equals("BETWEEN")
- || propertyName.equals("LIKE")
- || propertyName.equals("IN")
- || propertyName.equals("IS")
- || propertyName.equals("ESCAPE")))
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
{
throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
index 58089f595b..6bb9b9912b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
@@ -25,7 +25,8 @@ import org.apache.log4j.Logger;
import javax.jms.*;
import java.util.Enumeration;
-public class MessageConverter {
+public class MessageConverter
+{
/**
* Log4J logger
@@ -118,6 +119,16 @@ public class MessageConverter {
setMessageProperties(message);
}
+ public MessageConverter(Message message) throws JMSException
+ {
+ //Send a message with just properties.
+ // Throwing away content
+ BytesMessage nativeMessage = new JMSBytesMessage();
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
+
public AbstractJMSMessage getConvertedMessage()
{
return _newMessage;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 7f42faccb8..ca9a1b88f3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -225,6 +225,7 @@ public class AMQStateManager implements AMQMethodListener
}
if(_currentState != s)
{
+ _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
}
}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
index cdb00240b6..5ab5722146 100644
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
@@ -123,7 +123,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase
try
{
AMQQueue queue = (AMQQueue) ctx.lookup("MyQueue");
- assertEquals("example.MyQueue", queue.getRoutingKey());
+ assertEquals("example.MyQueue", queue.getRoutingKey().toString());
}
catch (NamingException ne)
{
@@ -133,7 +133,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase
try
{
AMQTopic topic = (AMQTopic) ctx.lookup("ibmStocks");
- assertEquals("stocks.nyse.ibm", topic.getTopicName());
+ assertEquals("stocks.nyse.ibm", topic.getTopicName().toString());
}
catch (Exception ne)
{
@@ -143,7 +143,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase
try
{
AMQQueue direct = (AMQQueue) ctx.lookup("direct");
- assertEquals("directQueue", direct.getRoutingKey());
+ assertEquals("directQueue", direct.getRoutingKey().toString());
}
catch (NamingException ne)
{
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java
deleted file mode 100644
index db871281bf..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameAlreadyBoundException;
-import javax.naming.NamingException;
-import javax.naming.NoInitialContextException;
-import java.io.File;
-import java.util.Hashtable;
-
-import junit.framework.TestCase;
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-class Bind extends TestCase
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis();
- public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
- public String PROVIDER_URL = DEFAULT_PROVIDER_URL;
-
- String _connectionFactoryString = "";
-
- String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'";
- Topic _topic = null;
-
- boolean _bound = false;
-
- public Bind() throws NameAlreadyBoundException, NoInitialContextException
- {
- this(false, DEFAULT_PROVIDER_URL);
- }
-
- public Bind(boolean output) throws NameAlreadyBoundException, NoInitialContextException
- {
- this(output, DEFAULT_PROVIDER_URL);
- }
-
- public Bind(boolean output, String providerURL) throws NameAlreadyBoundException, NoInitialContextException
- {
- PROVIDER_URL = providerURL;
-
- // Set up the environment for creating the initial context
- Hashtable env = new Hashtable(11);
- env.put(Context.INITIAL_CONTEXT_FACTORY,
- "com.sun.jndi.fscontext.RefFSContextFactory");
-
-
- env.put(Context.PROVIDER_URL, PROVIDER_URL);
-
-
- File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
-
- if (file.exists() && !file.isDirectory())
- {
- System.out.println("Couldn't make directory file already exists");
- return;
- }
- else
- {
- if (!file.exists())
- {
- if (!file.mkdirs())
- {
- System.out.println("Couldn't make directory");
- return;
- }
- }
- }
-
- Connection connection = null;
- try
- {
- // Create the initial context
- Context ctx = new InitialContext(env);
-
- // Create the connection factory to be bound
- ConnectionFactory connectionFactory = null;
- // Create the Connection to be bound
-
-
- try
- {
- connectionFactory = new AMQConnectionFactory(_connectionString);
- connection = connectionFactory.createConnection();
-
- _connectionFactoryString = ((AMQConnectionFactory) connectionFactory).getConnectionURL().getURL();
- }
- catch (JMSException jmsqe)
- {
- fail("Unable to create Connection:" + jmsqe);
- }
- catch (URLSyntaxException urlse)
- {
- fail("Unable to create Connection:" + urlse);
- }
-
- try
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _topic = session.createTopic("Fruity");
- }
- catch (JMSException jmse)
- {
-
- }
- // Perform the binds
- ctx.bind("ConnectionFactory", connectionFactory);
- if (output)
- {
- System.out.println("Bound factory\n" + ((AMQConnectionFactory) connectionFactory).getConnectionURL());
- }
- ctx.bind("Connection", connection);
- if (output)
- {
- System.out.println("Bound Connection\n" + ((AMQConnection) connection).toURL());
- }
- ctx.bind("Topic", _topic);
- if (output)
- {
- System.out.println("Bound Topic:\n" + ((AMQTopic) _topic).toURL());
- }
- _bound = true;
-
- // Check that it is bound
- //Object obj = ctx.lookup("Connection");
- //System.out.println(((AMQConnection)obj).toURL());
-
- // Close the context when we're done
- ctx.close();
- }
- catch (NamingException e)
- {
- System.out.println("Operation failed: " + e);
- if (e instanceof NameAlreadyBoundException)
- {
- throw(NameAlreadyBoundException) e;
- }
-
- if (e instanceof NoInitialContextException)
- {
- throw(NoInitialContextException) e;
- }
- }
- finally
- {
- try
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- catch (JMSException e)
- {
- //ignore just want it closed
- }
- }
- }
-
- public String connectionFactoryValue()
- {
- if (_connectionFactoryString != null)
- {
- return _connectionFactoryString;
- }
- else
- {
- return "";
- }
- }
-
- public String connectionValue()
- {
- if (_connectionString != null)
- {
- return _connectionString;
- }
- else
- {
- return "";
- }
- }
-
- public String topicValue()
- {
- if (_topic != null)
- {
- return ((AMQTopic) _topic).toURL();
- }
- else
- {
- return "";
- }
-
- }
-
- public boolean bound()
- {
- return _bound;
- }
-
- public String getProviderURL()
- {
- return PROVIDER_URL;
- }
-
- public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException
- {
- new Bind(true);
- }
-}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java
deleted file mode 100644
index 9fc186f19a..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import junit.framework.TestCase;
-//import org.apache.qpid.testutil.VMBrokerSetup;
-
-import javax.naming.NameAlreadyBoundException;
-import javax.naming.NoInitialContextException;
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-public class JNDIReferenceableTest extends TestCase
-{
-/* // FIXME FSContext has been removed from repository. This needs redone with the PropertiesFileInitialContextFactory. QPID-84
- public void testReferenceable()
- {
- Bind b = null;
- try
- {
- try
- {
- b = new Bind();
- }
- catch (NameAlreadyBoundException e)
- {
- if (new Unbind().unbound())
- {
- try
- {
- b = new Bind();
- }
- catch (NameAlreadyBoundException ee)
- {
- fail("Unable to clear bound objects for test.");
- }
- }
- else
- {
- fail("Unable to clear bound objects for test.");
- }
- }
- }
- catch (NoInitialContextException e)
- {
- fail("You don't have the File System SPI on you class path.\n" +
- "This can be downloaded from sun here:\n" +
- "http://java.sun.com/products/jndi/downloads/index.html\n" +
- "Click : Download JNDI 1.2.1 & More button\n" +
- "Download: File System Service Provider, 1.2 Beta 3\n" +
- "and add the two jars in the lib dir to your class path.");
- }
-
- assertTrue(b.bound());
-
- Lookup l = new Lookup(b.getProviderURL());
-
- assertTrue(l.connectionFactoryValue().equals(b.connectionFactoryValue()));
-
- assertTrue(l.connectionValue().equals(b.connectionValue()));
-
- assertTrue(l.topicValue().equals(b.topicValue()));
-
-
- Unbind u = new Unbind();
-
- assertTrue(u.unbound());
-
- }
-
- public static junit.framework.Test suite()
- {
- return new VMBrokerSetup(new junit.framework.TestSuite(JNDIReferenceableTest.class));
- }
- */
-}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java
deleted file mode 100644
index b804ccb30c..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQTopic;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.jms.JMSException;
-import java.io.File;
-import java.util.Hashtable;
-
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-class Lookup
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest";
- public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
- public String PROVIDER_URL = DEFAULT_PROVIDER_URL;
-
- AMQTopic _topic = null;
- AMQConnection _connection = null;
- AMQConnectionFactory _connectionFactory = null;
- private String _connectionURL;
-
-
- public Lookup()
- {
- this(DEFAULT_PROVIDER_URL);
- }
-
- public Lookup(String providerURL)
- {
-
- PROVIDER_URL = providerURL;
-
- // Set up the environment for creating the initial context
- Hashtable env = new Hashtable(11);
- env.put(Context.INITIAL_CONTEXT_FACTORY,
- "com.sun.jndi.fscontext.RefFSContextFactory");
-
- env.put(Context.PROVIDER_URL, PROVIDER_URL);
-
- File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
-
- if (file.exists() && !file.isDirectory())
- {
- System.out.println("Couldn't make directory file already exists");
- return;
- }
- else
- {
- if (!file.exists())
- {
- if (!file.mkdirs())
- {
- System.out.println("Couldn't make directory");
- return;
- }
- }
- }
-
- try
- {
- // Create the initial context
- Context ctx = new InitialContext(env);
-
- _topic = (AMQTopic) ctx.lookup("Topic");
-
- _connection = (AMQConnection) ctx.lookup("Connection");
-
- _connectionURL = _connection.toURL();
-
- _connectionFactory = (AMQConnectionFactory) ctx.lookup("ConnectionFactory");
- //System.out.println(topic);
-
- // Close the context when we're done
- ctx.close();
- }
- catch (NamingException e)
- {
- System.out.println("Operation failed: " + e);
- }
- finally
- {
- try
- {
- if (_connection != null)
- {
- _connection.close();
- }
- }
- catch (JMSException e)
- {
- //ignore just need to close
- }
- }
- }
-
- public String connectionFactoryValue()
- {
- if (_connectionFactory != null)
- {
- return _connectionFactory.getConnectionURL().toString();
- }
- return "";
- }
-
- public String connectionValue()
- {
- if (_connectionURL != null)
- {
- return _connectionURL;
- }
- return "";
- }
-
- public String topicValue()
- {
- if (_topic != null)
- {
- return _topic.toURL();
- }
- return "";
- }
-
- public String getProviderURL()
- {
- return PROVIDER_URL;
- }
-
- public static void main(String[] args)
- {
- new Lookup();
- }
-}
-
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java
deleted file mode 100644
index 869bc55d8f..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-import javax.naming.NamingException;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import java.io.File;
-import java.util.Hashtable;
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-class Unbind
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis();
- public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
- public String PROVIDER_URL = DEFAULT_PROVIDER_URL;
-
- boolean _unbound = false;
-
- public Unbind()
- {
- this(false, DEFAULT_PROVIDER_URL);
- }
-
- public Unbind(Boolean output)
- {
- this(output, DEFAULT_PROVIDER_URL);
- }
-
- public Unbind(String provider)
- {
- this(false, provider);
- }
-
- public Unbind(boolean output, String providerURL)
- {
- PROVIDER_URL = providerURL;
- // Set up the environment for creating the initial context
- Hashtable env = new Hashtable(11);
- env.put(Context.INITIAL_CONTEXT_FACTORY,
- "com.sun.jndi.fscontext.RefFSContextFactory");
- env.put(Context.PROVIDER_URL, PROVIDER_URL);
-
- File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
-
- if (file.exists() && !file.isDirectory())
- {
- System.out.println("Couldn't make directory file already exists");
- return;
- }
- else
- {
- if (!file.exists())
- {
- if (!file.mkdirs())
- {
- System.out.println("Couldn't make directory");
- return;
- }
- }
- }
-
- try
- {
- // Create the initial context
- Context ctx = new InitialContext(env);
-
- // Remove the binding
- ctx.unbind("ConnectionFactory");
- ctx.unbind("Connection");
- ctx.unbind("Topic");
-
- // Check that it is gone
- Object obj = null;
- try
- {
- obj = ctx.lookup("ConnectionFactory");
- }
- catch (NameNotFoundException ne)
- {
- if (output)
- {
- System.out.println("unbind ConnectionFactory successful");
- }
- try
- {
- obj = ctx.lookup("Connection");
- try
- {
- ((Connection) obj).close();
- }
- catch (JMSException e)
- {
- //ignore just need to close
- }
- }
- catch (NameNotFoundException ne2)
- {
- if (output)
- {
- System.out.println("unbind Connection successful");
- }
-
- try
- {
- obj = ctx.lookup("Topic");
- }
- catch (NameNotFoundException ne3)
- {
- if (output)
- {
- System.out.println("unbind Topic successful");
- }
- _unbound = true;
- }
- }
- }
-
- //System.out.println("unbind failed; object still there: " + obj);
-
- // Close the context when we're done
-
- ctx.close();
-
- }
- catch (NamingException e)
- {
- System.out.println("Operation failed: " + e);
- }
- }
-
- public boolean unbound()
- {
- return _unbound;
- }
-
- public static void main(String[] args)
- {
-
- new Unbind(true);
- }
-}
-
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
new file mode 100644
index 0000000000..165059946c
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class DispatcherTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(DispatcherTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int _receivedCount = 0;
+ private int _receivedCountWhileStopped = 0;
+ private Connection _clientConnection, _producerConnection;
+ private MessageConsumer _consumer;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+
+ private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); //all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); //all messages Sent Lock
+
+ private volatile boolean _connectionStopped = false;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer = _clientSession.createConsumer(queue);
+
+ //Create Producer
+ _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+
+ _clientConnection.close();
+
+ _producerConnection.close();
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testAsynchronousRecieve()
+ {
+
+ _logger.info("Test Start");
+
+
+ assertTrue(!((AMQConnection) _clientConnection).started());
+
+ //Set default Message Listener
+ try
+ {
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message);
+
+ _receivedCount++;
+
+ if (_receivedCount == MSG_COUNT)
+ {
+ _allFirstMessagesSent.countDown();
+ }
+
+ if (_connectionStopped)
+ {
+ _logger.info("Running with Message:" + _receivedCount);
+ }
+
+ if (_connectionStopped && _allFirstMessagesSent.getCount() == 0)
+ {
+ _receivedCountWhileStopped++;
+ }
+
+ if (_allFirstMessagesSent.getCount() == 0)
+ {
+ if (_receivedCount == MSG_COUNT * 2)
+ {
+ _allSecondMessagesSent.countDown();
+ }
+ }
+ }
+ });
+
+ assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started());
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Default ML on consumer1");
+ }
+
+
+ try
+ {
+ _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ try
+ {
+ assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started());
+ _clientConnection.stop();
+ _connectionStopped = true;
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error stopping connection");
+ }
+
+
+ try
+ {
+ _logger.error("Send additional messages");
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to send additional messages", e);
+ }
+
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+
+ try
+ {
+ _logger.info("Restarting connection");
+
+ _connectionStopped = false;
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Better ML on consumer1", e);
+ }
+
+
+ _logger.info("Waiting upto 2 seconds for messages");
+
+ try
+ {
+ _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount());
+ assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount());
+ assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount);
+ assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped);
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(DispatcherTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
new file mode 100644
index 0000000000..28bb2b614b
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class ResetMessageListenerTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(ResetMessageListenerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int receivedCount1ML1 = 0;
+ private int receivedCount1ML2 = 0;
+ private int receivedCount2 = 0;
+ private Connection _clientConnection, _producerConnection;
+ private MessageConsumer _consumer1;
+ private MessageConsumer _consumer2;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+
+ private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); //all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); //all messages Sent Lock
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer1 = _clientSession.createConsumer(queue);
+
+ //Create Client 2 on same session
+ _consumer2 = _clientSession.createConsumer(queue);
+
+ //Create Producer
+ _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount());
+ assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount());
+ assertEquals("Client 1 ML1 didn't get all messages", MSG_COUNT / 2, receivedCount1ML1);
+ assertEquals("Client 2 didn't get all messages", MSG_COUNT, receivedCount2);
+ assertEquals("Client 1 ML2 didn't get all messages", MSG_COUNT / 2, receivedCount1ML2);
+
+ _clientConnection.close();
+
+ _producerConnection.close();
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testAsynchronousRecieve()
+ {
+
+ _logger.info("Test Start");
+
+ //Set default Message Listener
+ try
+ {
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 ML 1 Received Message(" + receivedCount1ML1 + "):" + message);
+
+ receivedCount1ML1++;
+ if (receivedCount1ML1 == MSG_COUNT / 2)
+ {
+ _allFirstMessagesSent.countDown();
+ }
+ }
+ });
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Default ML on consumer1");
+ }
+
+
+ try
+ {
+ _consumer2.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message);
+
+ receivedCount2++;
+ if (receivedCount2 == MSG_COUNT / 2)
+ {
+ _logger.info("Client 2 received all its messages1");
+ _allFirstMessagesSent.countDown();
+ }
+
+ if (receivedCount2 == MSG_COUNT)
+ {
+ _logger.info("Client 2 received all its messages2");
+ _allSecondMessagesSent.countDown();
+ }
+ }
+ });
+
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Default ML on consumer2");
+
+ }
+
+
+ try
+ {
+ _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ _logger.info("Received first batch of messages");
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ try
+ {
+ _clientConnection.stop();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error stopping connection");
+ }
+
+ _logger.info("Reset Message Listener to better listener while connection stopped, will restart session");
+ try
+ {
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 ML2 Received Message(" + receivedCount1ML1 + "):" + message);
+
+ receivedCount1ML2++;
+ if (receivedCount1ML2 == MSG_COUNT / 2)
+ {
+ _allSecondMessagesSent.countDown();
+ }
+ }
+ });
+
+ _clientConnection.start();
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ _logger.error("Connection not stopped while setting ML", e);
+ fail("Unable to change message listener:" + e.getCause());
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Better ML on consumer1", e);
+ }
+
+ try
+ {
+ _logger.error("Send additional messages");
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to send additional messages", e);
+ }
+
+ _logger.info("Waiting upto 2 seconds for messages");
+
+ try
+ {
+ _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ResetMessageListenerTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
index f7bea1b36a..c3434164d8 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
@@ -24,7 +24,7 @@ import javax.jms.*;
import java.util.Enumeration;
import java.io.Serializable;
-public class TestNonQpidTextMessage implements ObjectMessage {
+public class NonQpidObjectMessage implements ObjectMessage {
private JMSObjectMessage _realMessage;
private String _contentString;
@@ -34,7 +34,7 @@ public class TestNonQpidTextMessage implements ObjectMessage {
* does not inherit from the Qpid message superclasses
* and expand our unit testing of MessageConverter et al
*/
- public TestNonQpidTextMessage()
+ public NonQpidObjectMessage()
{
_realMessage = new JMSObjectMessage();
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
new file mode 100644
index 0000000000..3287314a44
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
@@ -0,0 +1,165 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQConnectionFailureException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class ConnectionStartTest extends TestCase
+{
+
+ String _broker = "vm://:1";
+
+ AMQConnection _connection;
+ private Session _consumerSess;
+ private MessageConsumer _consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ try
+ {
+ AMQQueue queue = new AMQQueue("ConnectionStartTest");
+
+ AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+
+ Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ MessageProducer pub = pubSess.createProducer(queue);
+
+ pub.send(pubSess.createTextMessage("Initial Message"));
+
+ _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+
+ _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ _consumer = _consumerSess.createConsumer(queue);
+
+ pubCon.close();
+
+ }
+ catch (Exception e)
+ {
+ fail("Connection to " + _broker + " should succeed. Reason: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ }
+
+ public void testSimpleReceiveConnection()
+ {
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ //Note that this next line will start the dispatcher in the session
+ // should really not be called before _connection start
+ assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+ _connection.start();
+ assertTrue("There should be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+ assertTrue("Connection should be started", _connection.started());
+
+ }
+ catch (JMSException e)
+ {
+ fail("An error occured during test because:" + e);
+ }
+
+ }
+
+ public void testMessageListenerConnection()
+ {
+ final CountDownLatch _gotMessage = new CountDownLatch(1);
+
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ _consumerSess.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ assertTrue("Connection should be started", _connection.started());
+ assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText());
+ _gotMessage.countDown();
+ }
+ catch (JMSException e)
+ {
+ fail("Couldn't get message text because:" + e.getCause());
+ }
+ }
+ });
+
+ assertTrue("Connection should not be started", !_connection.started());
+ _connection.start();
+ assertTrue("Connection should be started", _connection.started());
+
+ try
+ {
+ _gotMessage.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ fail("Timed out awaiting message via onMessage");
+ }
+
+ }
+ catch (JMSException e)
+ {
+ fail("Failed because:" + e.getCause());
+ }
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConnectionStartTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index c09d2504eb..2d61ceb00f 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -26,7 +26,7 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.message.TestNonQpidTextMessage;
+import org.apache.qpid.client.message.NonQpidObjectMessage;
import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -71,7 +71,7 @@ public class JMSPropertiesTest extends TestCase
MessageProducer producer = producerSession.createProducer(queue);
//create a test message to send
- ObjectMessage sentMsg = new TestNonQpidTextMessage();
+ ObjectMessage sentMsg = new NonQpidObjectMessage();
sentMsg.setJMSCorrelationID(JMS_CORR_ID);
sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE);
sentMsg.setJMSType(JMS_TYPE);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
index 6a335b8627..a8a5c7d8b2 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -31,10 +31,12 @@ import javax.jms.Message;
import javax.jms.Destination;
import javax.jms.TextMessage;
import javax.jms.MapMessage;
+import javax.jms.JMSException;
import java.util.HashMap;
-public class MessageConverterTest extends TestCase {
+public class MessageConverterTest extends TestCase
+{
public static final String JMS_CORR_ID = "QPIDID_01";
public static final int JMS_DELIV_MODE = 1;
@@ -50,53 +52,79 @@ public class MessageConverterTest extends TestCase {
super.setUp();
testTextMessage = new JMSTextMessage();
- //Add JMSProperties
- testTextMessage.setJMSCorrelationID(JMS_CORR_ID);
- testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE);
- testTextMessage.setJMSType(JMS_TYPE);
- testTextMessage.setJMSReplyTo(JMS_REPLY_TO);
+ //Set Message Text
testTextMessage.setText("testTextMessage text");
-
- //Add non-JMS properties
- testTextMessage.setStringProperty("testProp1","testValue1");
- testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE);
+ setMessageProperties(testTextMessage);
testMapMessage = new JMSMapMessage();
- testMapMessage.setString("testMapString","testMapStringValue");
- testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE);
+ testMapMessage.setString("testMapString", "testMapStringValue");
+ testMapMessage.setDouble("testMapDouble", Double.MAX_VALUE);
}
public void testSetProperties() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
-
- //check JMS prop values on newMessage match
- assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID());
- assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode());
- assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType());
- assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo());
-
- //check non-JMS standard props ok too
- assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"),
- newMessage.getStringProperty("testProp1"));
- assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"),
- newMessage.getDoubleProperty("testProp2"));
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+ mesagePropertiesTest(testTextMessage, newMessage);
}
public void testJMSTextMessageConversion() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
- assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText());
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+ assertEquals("Converted message text mismatch", ((JMSTextMessage) newMessage).getText(), testTextMessage.getText());
}
public void testJMSMapMessageConversion() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage();
- assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"),
- testMapMessage.getString("testMapString"));
- assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"),
- testMapMessage.getDouble("testMapDouble"));
+ AbstractJMSMessage newMessage = new MessageConverter((MapMessage) testMapMessage).getConvertedMessage();
+ assertEquals("Converted map message String mismatch", ((JMSMapMessage) newMessage).getString("testMapString"),
+ testMapMessage.getString("testMapString"));
+ assertEquals("Converted map message Double mismatch", ((JMSMapMessage) newMessage).getDouble("testMapDouble"),
+ testMapMessage.getDouble("testMapDouble"));
+
+ }
+
+ public void testMessageConversion() throws Exception
+ {
+ Message newMessage = new NonQpidMessage();
+ setMessageProperties(newMessage);
+ mesagePropertiesTest(testTextMessage, newMessage);
+ }
+
+ private void setMessageProperties(Message message) throws JMSException
+ {
+ message.setJMSCorrelationID(JMS_CORR_ID);
+ message.setJMSDeliveryMode(JMS_DELIV_MODE);
+ message.setJMSType(JMS_TYPE);
+ message.setJMSReplyTo(JMS_REPLY_TO);
+ //Add non-JMS properties
+ message.setStringProperty("testProp1", "testValue1");
+ message.setDoubleProperty("testProp2", Double.MIN_VALUE);
+ }
+
+
+ private void mesagePropertiesTest(Message expectedMessage, Message actualMessage)
+ {
+ try
+ {
+ //check JMS prop values on newMessage match
+ assertEquals("JMS Correlation ID mismatch", expectedMessage.getJMSCorrelationID(), actualMessage.getJMSCorrelationID());
+ assertEquals("JMS Delivery mode mismatch", expectedMessage.getJMSDeliveryMode(), actualMessage.getJMSDeliveryMode());
+ assertEquals("JMS Type mismatch", expectedMessage.getJMSType(), actualMessage.getJMSType());
+ assertEquals("JMS Reply To mismatch", expectedMessage.getJMSReplyTo(), actualMessage.getJMSReplyTo());
+
+ //check non-JMS standard props ok too
+ assertEquals("Test String prop value mismatch", expectedMessage.getStringProperty("testProp1"),
+ actualMessage.getStringProperty("testProp1"));
+
+ assertEquals("Test Double prop value mismatch", expectedMessage.getDoubleProperty("testProp2"),
+ actualMessage.getDoubleProperty("testProp2"));
+ }
+ catch (JMSException e)
+ {
+ fail("An error occured testing the property values" + e.getCause());
+ e.printStackTrace();
+ }
}
protected void tearDown() throws Exception
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java
new file mode 100644
index 0000000000..e992290513
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.message;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import java.util.Enumeration;
+import java.util.Hashtable;
+
+public class NonQpidMessage implements Message
+{
+ private String _JMSMessageID;
+ private long _JMSTimestamp;
+ private byte[] _JMSCorrelationIDAsBytes;
+ private String _JMSCorrelationID;
+ private Destination _JMSReplyTo;
+ private Destination _JMSDestination;
+ private int _JMSDeliveryMode;
+ private boolean _JMSRedelivered;
+ private String _JMSType;
+ private long _JMSExpiration;
+ private int _JMSPriority;
+ private Hashtable _properties;
+
+ public NonQpidMessage()
+ {
+ _properties = new Hashtable();
+ _JMSPriority = javax.jms.Message.DEFAULT_PRIORITY;
+ _JMSDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+ }
+
+ public String getJMSMessageID() throws JMSException
+ {
+ return _JMSMessageID;
+ }
+
+ public void setJMSMessageID(String string) throws JMSException
+ {
+ _JMSMessageID = string;
+ }
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ return _JMSTimestamp;
+ }
+
+ public void setJMSTimestamp(long l) throws JMSException
+ {
+ _JMSTimestamp = l;
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return _JMSCorrelationIDAsBytes;
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ _JMSCorrelationIDAsBytes = bytes;
+ }
+
+ public void setJMSCorrelationID(String string) throws JMSException
+ {
+ _JMSCorrelationID = string;
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+ return _JMSCorrelationID;
+ }
+
+ public Destination getJMSReplyTo() throws JMSException
+ {
+ return _JMSReplyTo;
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException
+ {
+ _JMSReplyTo = destination;
+ }
+
+ public Destination getJMSDestination() throws JMSException
+ {
+ return _JMSDestination;
+ }
+
+ public void setJMSDestination(Destination destination) throws JMSException
+ {
+ _JMSDestination = destination;
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ return _JMSDeliveryMode;
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException
+ {
+ _JMSDeliveryMode = i;
+ }
+
+ public boolean getJMSRedelivered() throws JMSException
+ {
+ return _JMSRedelivered;
+ }
+
+ public void setJMSRedelivered(boolean b) throws JMSException
+ {
+ _JMSRedelivered = b;
+ }
+
+ public String getJMSType() throws JMSException
+ {
+ return _JMSType;
+ }
+
+ public void setJMSType(String string) throws JMSException
+ {
+ _JMSType = string;
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ return _JMSExpiration;
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ _JMSExpiration = l;
+ }
+
+ public int getJMSPriority() throws JMSException
+ {
+ return _JMSPriority;
+ }
+
+ public void setJMSPriority(int i) throws JMSException
+ {
+ _JMSPriority = i;
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ _properties.clear();
+ }
+
+ public boolean propertyExists(String string) throws JMSException
+ {
+ return _properties.containsKey(string);
+ }
+
+ public boolean getBooleanProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Boolean)
+ {
+ return (Boolean) o;
+ }
+ else
+ {
+ return Boolean.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public byte getByteProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Byte)
+ {
+ return (Byte) o;
+ }
+ else
+ {
+ return Byte.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public short getShortProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Short)
+ {
+ return (Short) o;
+ }
+ else
+ {
+ return Short.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public int getIntProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Integer)
+ {
+ return (Integer) o;
+ }
+ else
+ {
+ return Integer.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public long getLongProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Long)
+ {
+ return (Long) o;
+ }
+ else
+ {
+ return Long.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public float getFloatProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Float)
+ {
+ return (Float) o;
+ }
+ else
+ {
+ return Float.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public double getDoubleProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Double)
+ {
+ return (Double) o;
+ }
+ else
+ {
+ return Double.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public String getStringProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof String)
+ {
+ return (String) o;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public Object getObjectProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Boolean)
+ {
+ return (Boolean) o;
+ }
+ else
+ {
+ return Boolean.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return _properties.keys();
+ }
+
+ public void setBooleanProperty(String string, boolean b) throws JMSException
+ {
+ _properties.put(string, b);
+ }
+
+ public void setByteProperty(String string, byte b) throws JMSException
+ {
+ _properties.put(string, b);
+ }
+
+ public void setShortProperty(String string, short i) throws JMSException
+ {
+ _properties.put(string, i);
+ }
+
+ public void setIntProperty(String string, int i) throws JMSException
+ {
+ _properties.put(string, i);
+ }
+
+ public void setLongProperty(String string, long l) throws JMSException
+ {
+ _properties.put(string, l);
+ }
+
+ public void setFloatProperty(String string, float v) throws JMSException
+ {
+ _properties.put(string, v);
+ }
+
+ public void setDoubleProperty(String string, double v) throws JMSException
+ {
+ _properties.put(string, v);
+ }
+
+ public void setStringProperty(String string, String string1) throws JMSException
+ {
+ _properties.put(string, string1);
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException
+ {
+ _properties.put(string, object);
+ }
+
+ public void acknowledge() throws JMSException
+ {
+
+ }
+
+ public void clearBody() throws JMSException
+ {
+
+ }
+}