diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-06 09:57:35 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-06 09:57:35 +0000 |
| commit | 1f4e634e6b86a4aff2b2553d5b44b1a1d5855779 (patch) | |
| tree | 0550dd09b25591d0e0a76ce13bc0f98d36903e4e /qpid/java/client/src | |
| parent | 886ab23a9dfca81951ebe12ff1ccaabf05f61d8d (diff) | |
| download | qpid-python-1f4e634e6b86a4aff2b2553d5b44b1a1d5855779.tar.gz | |
Merged revisions 501413-503717 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk
........
r501448 | kpvdr | 2007-01-30 16:27:47 +0000 (Tue, 30 Jan 2007) | 1 line
Fixed codegen bug in which fields added by second XML file duplicate ordinal values.
........
r501455 | rgreig | 2007-01-30 16:40:20 +0000 (Tue, 30 Jan 2007) | 1 line
(Submitted by Rupert Smith) Ping tests refactored. Unused ping test classes removed. JUnit-toolkit 0.5-SNAPSHOT added to the build.
........
r501457 | rgreig | 2007-01-30 16:42:37 +0000 (Tue, 30 Jan 2007) | 1 line
(Submitted by Rupert Smith) Added PingClient.java which was forgotten from last commit.
........
r501465 | rgreig | 2007-01-30 16:53:41 +0000 (Tue, 30 Jan 2007) | 1 line
(Submitted by Rupert Smith) Updated the README.txt to give a fuller explanation for the creation of the temporary local maven repository.
........
r501472 | kpvdr | 2007-01-30 16:59:38 +0000 (Tue, 30 Jan 2007) | 1 line
Small codegen code tidy-up
........
r501804 | rgreig | 2007-01-31 11:29:33 +0000 (Wed, 31 Jan 2007) | 3 lines
(Patch submitted by Rupert Smith)
Added a ping latency test.
Uploaded new junit-toolkit snapshot for self timed tests.
........
r501914 | ritchiem | 2007-01-31 17:25:42 +0000 (Wed, 31 Jan 2007) | 3 lines
QPID-334 WeakReferenceMessageHandle uses a singleton so when body is purged by gc it cannot be reset
Changed to use an Arraylist of size 1 as per JIRA entry.
........
r501917 | ritchiem | 2007-01-31 17:31:04 +0000 (Wed, 31 Jan 2007) | 6 lines
QPID-333 Message Properties on non Qpid Messages are not preserved
Updated MessageConverter to have a constructor that takes a Message type.
Updated MessageConverterTest to use the new NonQpidMessage to test it out.
JMSHeaderAdapter.java - whitespace changes and comment noting that null return is required.
........
r501920 | ritchiem | 2007-01-31 17:43:45 +0000 (Wed, 31 Jan 2007) | 1 line
Unused so removing
........
r501945 | vinoski | 2007-01-31 19:00:26 +0000 (Wed, 31 Jan 2007) | 1 line
patch from Jonathan Anstey for QPID-332
........
r502172 | ritchiem | 2007-02-01 09:37:39 +0000 (Thu, 01 Feb 2007) | 3 lines
QPID-333 Committed test class rename to stop it being picked up by Surefire
AMQTopic.java - whitespace
........
r502178 | bhupendrab | 2007-02-01 10:01:32 +0000 (Thu, 01 Feb 2007) | 1 line
virtual host string corrected
........
r502179 | rgreig | 2007-02-01 10:13:21 +0000 (Thu, 01 Feb 2007) | 1 line
(Submitted by Rupert Smith) Added comments as a reminder of improvements to be made to the tests.
........
r502180 | bhupendrab | 2007-02-01 10:13:55 +0000 (Thu, 01 Feb 2007) | 2 lines
QPID-331
and setting operation parameters to default values after executing the operation once.
........
r502182 | rgreig | 2007-02-01 10:18:36 +0000 (Thu, 01 Feb 2007) | 1 line
(Submitted by Rupert Smith) Added comments as a reminder of improvements to be made to the tests.
........
r502248 | ritchiem | 2007-02-01 15:47:17 +0000 (Thu, 01 Feb 2007) | 7 lines
QPID-339 Java client hangs when starting up (intermittently)
Patched the problem where the dispatcher would hang. The previous logic was flawed.
Patch worked on by Robert Godfrey and Martin Ritchie.
Added test to ensure that the connection is not automatically started.
........
r502249 | ritchiem | 2007-02-01 15:50:52 +0000 (Thu, 01 Feb 2007) | 3 lines
QPID-330 Clients occasionally fail to notice connect
The AMQConnection.java constructor now deals with the full connection process. The failover thread should not be started. This allows the connection method to be simplified and not Thread.sleep waiting for the connection.
........
r502253 | ritchiem | 2007-02-01 16:01:14 +0000 (Thu, 01 Feb 2007) | 11 lines
QPID-339 Java client hangs when starting up (intermittently)
Patched the problem where the dispatcher would hang. The previous logic was flawed.
Patch worked on by Robert Godfrey and Martin Ritchie.
Added test to ensure that the connection is not automatically started.
(Only added the test last time by mistake. This is the actual fix)
With a test for the DispatcherTest
........
r502261 | ritchiem | 2007-02-01 16:25:57 +0000 (Thu, 01 Feb 2007) | 2 lines
QPID-339 DispatcherTest.java was broker now it actually tests correctly.
Added test to Check changing message listeners
........
r502268 | ritchiem | 2007-02-01 16:32:56 +0000 (Thu, 01 Feb 2007) | 1 line
Increased logging on a failure to attain state
........
r502269 | bhupendrab | 2007-02-01 16:34:21 +0000 (Thu, 01 Feb 2007) | 1 line
some part commented, so that it lets users copy paste the host details on the new connection window
........
r502271 | ritchiem | 2007-02-01 16:36:54 +0000 (Thu, 01 Feb 2007) | 3 lines
QPID-341 When using Queues and Topics defined via JNDI settings are not preserved.
Removed extraction of destination/queue name and used BindingURL directly to create Destination.
........
r502273 | ritchiem | 2007-02-01 16:38:45 +0000 (Thu, 01 Feb 2007) | 2 lines
Added more intelij files to the ignore list
........
r502576 | ritchiem | 2007-02-02 11:13:13 +0000 (Fri, 02 Feb 2007) | 4 lines
QPID-343 Performance test suite doesn't output missing message count on failure.
Updated PingAsyncTestPerf to output missing messsage count.
Updated PingPongProducer so it doesn't use AMQShortStringx.
........
r502610 | bhupendrab | 2007-02-02 14:26:32 +0000 (Fri, 02 Feb 2007) | 2 lines
QPID-84
tests for FSContextFactory deleted.fscontext.jar is not part of apache svn.
........
r502620 | rgreig | 2007-02-02 15:09:08 +0000 (Fri, 02 Feb 2007) | 3 lines
(Submitted by Rupert Smith)
Perftests improved with better timeout handling. Shared/unique destinations to ping now an option.
TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchas all threads, then runs tear downs.
........
r502627 | rgreig | 2007-02-02 15:31:30 +0000 (Fri, 02 Feb 2007) | 2 lines
(Submitted by Rupert Smith)
Fixed problem with losing message results. Was not passing in self generated message correlation id in the async test, to match up replies with.
........
r502655 | rgreig | 2007-02-02 16:59:14 +0000 (Fri, 02 Feb 2007) | 1 line
(Submitted by Rupert Smith) Options moved to top of contructor. Were at bottom and not being used!
........
r503593 | ritchiem | 2007-02-05 08:58:30 +0000 (Mon, 05 Feb 2007) | 1 line
Fixed bug in stop(). If a connection is opened not start()ed then closed a NullPointerException will be thrown as the Dispatcher has not been created.
........
r503604 | rgreig | 2007-02-05 09:40:04 +0000 (Mon, 05 Feb 2007) | 1 line
QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file
........
r503609 | ritchiem | 2007-02-05 09:49:59 +0000 (Mon, 05 Feb 2007) | 1 line
Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly.
........
r503637 | rgreig | 2007-02-05 11:17:08 +0000 (Mon, 05 Feb 2007) | 2 lines
(Submitted by Rupert Smith)
Junit-toolkit has now fully migrated onto sourceforge. Snapshot repository location updated.
........
r503646 | rgreig | 2007-02-05 11:28:57 +0000 (Mon, 05 Feb 2007) | 2 lines
(Submitted by Rupert Smith)
This local repository is no longer needed. JUnit-Toolkit snapshot repository is now hosted on sourceforge: http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/. A release is also in progress to the central maven repository.
........
r503706 | bhupendrab | 2007-02-05 14:45:18 +0000 (Mon, 05 Feb 2007) | 2 lines
QPID-213
Also the parameter selection of boolean type is made as check-boxes instead of a drop-down.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting_persistent@504056 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
21 files changed, 1325 insertions, 859 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index cc052f81df..50299fa9d5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -215,12 +215,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Exception lastException = new Exception(); lastException.initCause(new ConnectException()); - while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed()) + while (!_connected && _failoverPolicy.failoverAllowed()) { try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); lastException = null; + _connected = true; } catch (Exception e) { @@ -232,34 +233,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.debug("Are we connected:" + _connected); - // Then the Failover Thread will handle conneciton - if (_failoverPolicy.failoverAllowed()) - { - //TODO this needs to be redone so that we are not spinning. - // A suitable object should be set that is then waited on - // and only notified when a connection is made or when - // the AMQConnection gets closed. - while (!_connected && !_closed.get()) - { - try - { - _logger.debug("Sleeping."); - Thread.sleep(100); - } - catch (InterruptedException ie) - { - _logger.debug("Woken up."); - } - } - if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null) - { - if (_lastAMQException != null) - { - throw _lastAMQException; - } - } - } - else + if (!_connected) { String message = null; @@ -318,7 +292,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void setVirtualHost(String virtualHost) { - if(virtualHost.startsWith("/")) + if (virtualHost.startsWith("/")) { virtualHost = virtualHost.substring(1); } @@ -403,7 +377,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean failoverAllowed() { - return _failoverPolicy.failoverAllowed(); + if (!_connected) + { + return false; + } + else + { + return _failoverPolicy.failoverAllowed(); + } } public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException @@ -815,6 +796,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _protocolHandler; } + public boolean started() + { + return _started; + } + public void bytesSent(long writtenBytes) { if (_connectionListener != null) @@ -1031,4 +1017,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQConnectionFactory.class.getName(), null); // factory location } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index b2d2d2bec3..17af3702a4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -62,7 +62,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF String clientName, String virtualHost) throws URLSyntaxException { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + + username + ":" + password + "@" + clientName + "/" + virtualHost + "?brokerlist='" + broker + "'")); } @@ -334,7 +334,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF if (addr != null) { - return new AMQQueue(new AMQBindingURL((String) addr.getContent()).getQueueName()); + return new AMQQueue(new AMQBindingURL((String) addr.getContent())); } } @@ -344,7 +344,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF if (addr != null) { - return new AMQTopic(new AMQBindingURL((String) addr.getContent()).getDestinationName()); + return new AMQTopic(new AMQBindingURL((String) addr.getContent())); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b6429972df..35530b39c9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; - private final java.util.Queue<MessageConsumerPair> _reprocessQueue; - private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private long _nextProducerId; - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ - private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false); /** * Set when recover is called. This is to handle the case where recover() is called by application code @@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private boolean _inRecovery; + private boolean _connectionStopped; + private boolean _hasMessageListeners; /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - + private class Dispatcher extends Thread { + + /** + * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. + */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { UnprocessedMessage message; - _stopped.set(false); + try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) { - dispatchMessage(message); + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } + + dispatchMessage(message); + + while (connectionStopped()) + { + _lock.wait(); + } + + } + } } catch (InterruptedException e) @@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _logger.info("Dispatcher thread terminating for channel " + _channelId); } + // only call while holding lock + final boolean connectionStopped() + { + return _connectionStopped; + } + + void setConnectionStopped(boolean connectionStopped) + { + synchronized (_lock) + { + _connectionStopped = connectionStopped; + _lock.notify(); + } + } + private void dispatchMessage(UnprocessedMessage message) { if (message.getDeliverBody() != null) @@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public void stopDispatcher() + public void close() { - _stopped.set(true); + _closed.set(true); interrupt(); + + //fixme awaitTermination + } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) { @@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; - _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); - if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - + public void rollback() throws JMSException { checkTransacted(); @@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // TODO: Be aware of possible changes to parameter order as versions change. getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false)); // requeue + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageListener getMessageListener() throws JMSException { - checkNotClosed(); +// checkNotClosed(); return _messageListener; } public void setMessageListener(MessageListener listener) throws JMSException { - checkNotClosed(); - - if (!isStopped()) - { - throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); - } - - // We are stopped - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - BasicMessageConsumer consumer = i.next(); - - if (consumer.isReceiving()) - { - throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); - } - } - - _messageListener = listener; - - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - i.next().setMessageListener(_messageListener); - } - +// checkNotClosed(); +// +// if (_dispatcher != null && !_dispatcher.connectionStopped()) +// { +// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); +// } +// +// // We are stopped +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// BasicMessageConsumer consumer = i.next(); +// +// if (consumer.isReceiving()) +// { +// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); +// } +// } +// +// _messageListener = listener; +// +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// i.next().setMessageListener(_messageListener); +// } } @@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void start() { + //fixme This should be controlled by _stopped as it pairs with the stop method + //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. + //will result in sending Flow messages for each subsequent call to flow.. only need to do this + // if we have called stop. if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); } - if(hasMessageListeners() && _dispatcher == null) + if (hasMessageListeners()) { startDistpatcherIfNecessary(); } @@ -1606,26 +1636,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - if(_dispatcher == null) + if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); _dispatcher.start(); } + else + { + _dispatcher.setConnectionStopped(false); + } } void stop() { //stop the server delivering messages to this session suspendChannel(); - - //stop the dispatcher thread - _stopped.set(true); - } - - boolean isStopped() - { - return _stopped.get(); + + if (_dispatcher != null) + { + _dispatcher.setConnectionStopped(true); + } } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index f50b0390c5..c82187b2e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -28,12 +28,12 @@ import javax.jms.JMSException; import javax.jms.Topic; public class AMQTopic extends AMQDestination implements Topic - { +{ /** - * Constructor for use in creating a topic using a BindingURL. + * Constructor for use in creating a topic using a BindingURL. * * @param binding The binding url object. - */ + */ public AMQTopic(BindingURL binding) { super(binding); @@ -78,7 +78,7 @@ public class AMQTopic extends AMQDestination implements Topic return super.getDestinationName().toString(); } - public AMQShortString getRoutingKey() + public AMQShortString getRoutingKey() { return getDestinationName(); } @@ -93,7 +93,7 @@ public class AMQTopic extends AMQDestination implements Topic * Override since the queue is always private and we must ensure it remains null. If not, * reuse of the topic when registering consumers will make all consumers listen on the same (private) queue rather * than getting their own (private) queue. - * + * <p/> * This is relatively nasty but it is difficult to come up with a more elegant solution, given * the requirement in the case on AMQQueue and possibly other AMQDestination subclasses to * use the underlying queue name even where it is server generated. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4ffec6fb41..832c312634 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer //i.e. it is only valid to call this method if // - // (a) the session is stopped, in which case the dispatcher is not running + // (a) the connection is stopped, in which case the dispatcher is not running // OR // (b) the listener is null AND we are not receiving synchronously at present // - if (_session.isStopped()) + if (!_session.getAMQConnection().started()) { _messageListener.set(messageListener); _session.setHasMessageListeners(); - _session.startDistpatcherIfNecessary(); if (_logger.isDebugEnabled()) { @@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer synchronized (_session) { - _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDistpatcherIfNecessary(); @@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - _session.startDistpatcherIfNecessary(); checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - _session.startDistpatcherIfNecessary(); - checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 4426a7deee..f56fc40360 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -388,9 +388,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } else { - //TODO; Do we really want to create an empty message here ? - newMessage = (AbstractJMSMessage) _session.createMessage(); - return new MessageConverter(newMessage).getConvertedMessage(); + newMessage = new MessageConverter(message).getConvertedMessage(); } if (newMessage != null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 3086e5b90a..a1e2640f21 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -269,7 +269,7 @@ public final class JMSHeaderAdapter s = String.valueOf(o);
}
}
- }
+ }//else return s // null;
}
return s;
@@ -508,16 +508,16 @@ public final class JMSHeaderAdapter // JMS invalid names
if ((propertyName.equals("NULL")
- || propertyName.equals("TRUE")
- || propertyName.equals("FALSE")
- || propertyName.equals("NOT")
- || propertyName.equals("AND")
- || propertyName.equals("OR")
- || propertyName.equals("BETWEEN")
- || propertyName.equals("LIKE")
- || propertyName.equals("IN")
- || propertyName.equals("IS")
- || propertyName.equals("ESCAPE")))
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
{
throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java index 58089f595b..6bb9b9912b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java @@ -25,7 +25,8 @@ import org.apache.log4j.Logger; import javax.jms.*; import java.util.Enumeration; -public class MessageConverter { +public class MessageConverter +{ /** * Log4J logger @@ -118,6 +119,16 @@ public class MessageConverter { setMessageProperties(message); } + public MessageConverter(Message message) throws JMSException + { + //Send a message with just properties. + // Throwing away content + BytesMessage nativeMessage = new JMSBytesMessage(); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + public AbstractJMSMessage getConvertedMessage() { return _newMessage; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 7f42faccb8..ca9a1b88f3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -225,6 +225,7 @@ public class AMQStateManager implements AMQMethodListener } if(_currentState != s) { + _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); } } diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java index cdb00240b6..5ab5722146 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java @@ -123,7 +123,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase try { AMQQueue queue = (AMQQueue) ctx.lookup("MyQueue"); - assertEquals("example.MyQueue", queue.getRoutingKey()); + assertEquals("example.MyQueue", queue.getRoutingKey().toString()); } catch (NamingException ne) { @@ -133,7 +133,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase try { AMQTopic topic = (AMQTopic) ctx.lookup("ibmStocks"); - assertEquals("stocks.nyse.ibm", topic.getTopicName()); + assertEquals("stocks.nyse.ibm", topic.getTopicName().toString()); } catch (Exception ne) { @@ -143,7 +143,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase try { AMQQueue direct = (AMQQueue) ctx.lookup("direct"); - assertEquals("directQueue", direct.getRoutingKey()); + assertEquals("directQueue", direct.getRoutingKey().toString()); } catch (NamingException ne) { diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java deleted file mode 100644 index db871281bf..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameAlreadyBoundException; -import javax.naming.NamingException; -import javax.naming.NoInitialContextException; -import java.io.File; -import java.util.Hashtable; - -import junit.framework.TestCase; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Bind extends TestCase -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis(); - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - String _connectionFactoryString = ""; - - String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'"; - Topic _topic = null; - - boolean _bound = false; - - public Bind() throws NameAlreadyBoundException, NoInitialContextException - { - this(false, DEFAULT_PROVIDER_URL); - } - - public Bind(boolean output) throws NameAlreadyBoundException, NoInitialContextException - { - this(output, DEFAULT_PROVIDER_URL); - } - - public Bind(boolean output, String providerURL) throws NameAlreadyBoundException, NoInitialContextException - { - PROVIDER_URL = providerURL; - - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - - - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - Connection connection = null; - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - // Create the connection factory to be bound - ConnectionFactory connectionFactory = null; - // Create the Connection to be bound - - - try - { - connectionFactory = new AMQConnectionFactory(_connectionString); - connection = connectionFactory.createConnection(); - - _connectionFactoryString = ((AMQConnectionFactory) connectionFactory).getConnectionURL().getURL(); - } - catch (JMSException jmsqe) - { - fail("Unable to create Connection:" + jmsqe); - } - catch (URLSyntaxException urlse) - { - fail("Unable to create Connection:" + urlse); - } - - try - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _topic = session.createTopic("Fruity"); - } - catch (JMSException jmse) - { - - } - // Perform the binds - ctx.bind("ConnectionFactory", connectionFactory); - if (output) - { - System.out.println("Bound factory\n" + ((AMQConnectionFactory) connectionFactory).getConnectionURL()); - } - ctx.bind("Connection", connection); - if (output) - { - System.out.println("Bound Connection\n" + ((AMQConnection) connection).toURL()); - } - ctx.bind("Topic", _topic); - if (output) - { - System.out.println("Bound Topic:\n" + ((AMQTopic) _topic).toURL()); - } - _bound = true; - - // Check that it is bound - //Object obj = ctx.lookup("Connection"); - //System.out.println(((AMQConnection)obj).toURL()); - - // Close the context when we're done - ctx.close(); - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - if (e instanceof NameAlreadyBoundException) - { - throw(NameAlreadyBoundException) e; - } - - if (e instanceof NoInitialContextException) - { - throw(NoInitialContextException) e; - } - } - finally - { - try - { - if (connection != null) - { - connection.close(); - } - } - catch (JMSException e) - { - //ignore just want it closed - } - } - } - - public String connectionFactoryValue() - { - if (_connectionFactoryString != null) - { - return _connectionFactoryString; - } - else - { - return ""; - } - } - - public String connectionValue() - { - if (_connectionString != null) - { - return _connectionString; - } - else - { - return ""; - } - } - - public String topicValue() - { - if (_topic != null) - { - return ((AMQTopic) _topic).toURL(); - } - else - { - return ""; - } - - } - - public boolean bound() - { - return _bound; - } - - public String getProviderURL() - { - return PROVIDER_URL; - } - - public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException - { - new Bind(true); - } -} diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java deleted file mode 100644 index 9fc186f19a..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import junit.framework.TestCase; -//import org.apache.qpid.testutil.VMBrokerSetup; - -import javax.naming.NameAlreadyBoundException; -import javax.naming.NoInitialContextException; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -public class JNDIReferenceableTest extends TestCase -{ -/* // FIXME FSContext has been removed from repository. This needs redone with the PropertiesFileInitialContextFactory. QPID-84 - public void testReferenceable() - { - Bind b = null; - try - { - try - { - b = new Bind(); - } - catch (NameAlreadyBoundException e) - { - if (new Unbind().unbound()) - { - try - { - b = new Bind(); - } - catch (NameAlreadyBoundException ee) - { - fail("Unable to clear bound objects for test."); - } - } - else - { - fail("Unable to clear bound objects for test."); - } - } - } - catch (NoInitialContextException e) - { - fail("You don't have the File System SPI on you class path.\n" + - "This can be downloaded from sun here:\n" + - "http://java.sun.com/products/jndi/downloads/index.html\n" + - "Click : Download JNDI 1.2.1 & More button\n" + - "Download: File System Service Provider, 1.2 Beta 3\n" + - "and add the two jars in the lib dir to your class path."); - } - - assertTrue(b.bound()); - - Lookup l = new Lookup(b.getProviderURL()); - - assertTrue(l.connectionFactoryValue().equals(b.connectionFactoryValue())); - - assertTrue(l.connectionValue().equals(b.connectionValue())); - - assertTrue(l.topicValue().equals(b.topicValue())); - - - Unbind u = new Unbind(); - - assertTrue(u.unbound()); - - } - - public static junit.framework.Test suite() - { - return new VMBrokerSetup(new junit.framework.TestSuite(JNDIReferenceableTest.class)); - } - */ -} diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java deleted file mode 100644 index b804ccb30c..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQTopic; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.jms.JMSException; -import java.io.File; -import java.util.Hashtable; - - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Lookup -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest"; - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - AMQTopic _topic = null; - AMQConnection _connection = null; - AMQConnectionFactory _connectionFactory = null; - private String _connectionURL; - - - public Lookup() - { - this(DEFAULT_PROVIDER_URL); - } - - public Lookup(String providerURL) - { - - PROVIDER_URL = providerURL; - - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - _topic = (AMQTopic) ctx.lookup("Topic"); - - _connection = (AMQConnection) ctx.lookup("Connection"); - - _connectionURL = _connection.toURL(); - - _connectionFactory = (AMQConnectionFactory) ctx.lookup("ConnectionFactory"); - //System.out.println(topic); - - // Close the context when we're done - ctx.close(); - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - } - finally - { - try - { - if (_connection != null) - { - _connection.close(); - } - } - catch (JMSException e) - { - //ignore just need to close - } - } - } - - public String connectionFactoryValue() - { - if (_connectionFactory != null) - { - return _connectionFactory.getConnectionURL().toString(); - } - return ""; - } - - public String connectionValue() - { - if (_connectionURL != null) - { - return _connectionURL; - } - return ""; - } - - public String topicValue() - { - if (_topic != null) - { - return _topic.toURL(); - } - return ""; - } - - public String getProviderURL() - { - return PROVIDER_URL; - } - - public static void main(String[] args) - { - new Lookup(); - } -} - diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java deleted file mode 100644 index 869bc55d8f..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; -import javax.jms.Connection; -import javax.jms.JMSException; -import java.io.File; -import java.util.Hashtable; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Unbind -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis(); - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - boolean _unbound = false; - - public Unbind() - { - this(false, DEFAULT_PROVIDER_URL); - } - - public Unbind(Boolean output) - { - this(output, DEFAULT_PROVIDER_URL); - } - - public Unbind(String provider) - { - this(false, provider); - } - - public Unbind(boolean output, String providerURL) - { - PROVIDER_URL = providerURL; - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - // Remove the binding - ctx.unbind("ConnectionFactory"); - ctx.unbind("Connection"); - ctx.unbind("Topic"); - - // Check that it is gone - Object obj = null; - try - { - obj = ctx.lookup("ConnectionFactory"); - } - catch (NameNotFoundException ne) - { - if (output) - { - System.out.println("unbind ConnectionFactory successful"); - } - try - { - obj = ctx.lookup("Connection"); - try - { - ((Connection) obj).close(); - } - catch (JMSException e) - { - //ignore just need to close - } - } - catch (NameNotFoundException ne2) - { - if (output) - { - System.out.println("unbind Connection successful"); - } - - try - { - obj = ctx.lookup("Topic"); - } - catch (NameNotFoundException ne3) - { - if (output) - { - System.out.println("unbind Topic successful"); - } - _unbound = true; - } - } - } - - //System.out.println("unbind failed; object still there: " + obj); - - // Close the context when we're done - - ctx.close(); - - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - } - } - - public boolean unbound() - { - return _unbound; - } - - public static void main(String[] args) - { - - new Unbind(true); - } -} - diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java new file mode 100644 index 0000000000..165059946c --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class DispatcherTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(DispatcherTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int _receivedCount = 0; + private int _receivedCountWhileStopped = 0; + private Connection _clientConnection, _producerConnection; + private MessageConsumer _consumer; + MessageProducer _producer; + Session _clientSession, _producerSession; + + private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); //all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); //all messages Sent Lock + + private volatile boolean _connectionStopped = false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + + } + + protected void tearDown() throws Exception + { + + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testAsynchronousRecieve() + { + + _logger.info("Test Start"); + + + assertTrue(!((AMQConnection) _clientConnection).started()); + + //Set default Message Listener + try + { + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message); + + _receivedCount++; + + if (_receivedCount == MSG_COUNT) + { + _allFirstMessagesSent.countDown(); + } + + if (_connectionStopped) + { + _logger.info("Running with Message:" + _receivedCount); + } + + if (_connectionStopped && _allFirstMessagesSent.getCount() == 0) + { + _receivedCountWhileStopped++; + } + + if (_allFirstMessagesSent.getCount() == 0) + { + if (_receivedCount == MSG_COUNT * 2) + { + _allSecondMessagesSent.countDown(); + } + } + } + }); + + assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started()); + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer1"); + } + + + try + { + _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + + try + { + assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started()); + _clientConnection.stop(); + _connectionStopped = true; + } + catch (JMSException e) + { + _logger.error("Error stopping connection"); + } + + + try + { + _logger.error("Send additional messages"); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + } + catch (JMSException e) + { + _logger.error("Unable to send additional messages", e); + } + + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + //ignore + } + + try + { + _logger.info("Restarting connection"); + + _connectionStopped = false; + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Better ML on consumer1", e); + } + + + _logger.info("Waiting upto 2 seconds for messages"); + + try + { + _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + + assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount()); + assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount()); + assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount); + assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped); + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(DispatcherTest.class); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java new file mode 100644 index 0000000000..28bb2b614b --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class ResetMessageListenerTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(ResetMessageListenerTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int receivedCount1ML1 = 0; + private int receivedCount1ML2 = 0; + private int receivedCount2 = 0; + private Connection _clientConnection, _producerConnection; + private MessageConsumer _consumer1; + private MessageConsumer _consumer2; + MessageProducer _producer; + Session _clientSession, _producerSession; + + private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); //all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); //all messages Sent Lock + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer1 = _clientSession.createConsumer(queue); + + //Create Client 2 on same session + _consumer2 = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + + } + + protected void tearDown() throws Exception + { + assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount()); + assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount()); + assertEquals("Client 1 ML1 didn't get all messages", MSG_COUNT / 2, receivedCount1ML1); + assertEquals("Client 2 didn't get all messages", MSG_COUNT, receivedCount2); + assertEquals("Client 1 ML2 didn't get all messages", MSG_COUNT / 2, receivedCount1ML2); + + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testAsynchronousRecieve() + { + + _logger.info("Test Start"); + + //Set default Message Listener + try + { + _consumer1.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML 1 Received Message(" + receivedCount1ML1 + "):" + message); + + receivedCount1ML1++; + if (receivedCount1ML1 == MSG_COUNT / 2) + { + _allFirstMessagesSent.countDown(); + } + } + }); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer1"); + } + + + try + { + _consumer2.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); + + receivedCount2++; + if (receivedCount2 == MSG_COUNT / 2) + { + _logger.info("Client 2 received all its messages1"); + _allFirstMessagesSent.countDown(); + } + + if (receivedCount2 == MSG_COUNT) + { + _logger.info("Client 2 received all its messages2"); + _allSecondMessagesSent.countDown(); + } + } + }); + + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer2"); + + } + + + try + { + _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); + _logger.info("Received first batch of messages"); + } + catch (InterruptedException e) + { + //do nothing + } + + try + { + _clientConnection.stop(); + } + catch (JMSException e) + { + _logger.error("Error stopping connection"); + } + + _logger.info("Reset Message Listener to better listener while connection stopped, will restart session"); + try + { + _consumer1.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML2 Received Message(" + receivedCount1ML1 + "):" + message); + + receivedCount1ML2++; + if (receivedCount1ML2 == MSG_COUNT / 2) + { + _allSecondMessagesSent.countDown(); + } + } + }); + + _clientConnection.start(); + } + catch (javax.jms.IllegalStateException e) + { + _logger.error("Connection not stopped while setting ML", e); + fail("Unable to change message listener:" + e.getCause()); + } + catch (JMSException e) + { + _logger.error("Error Setting Better ML on consumer1", e); + } + + try + { + _logger.error("Send additional messages"); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + } + catch (JMSException e) + { + _logger.error("Unable to send additional messages", e); + } + + _logger.info("Waiting upto 2 seconds for messages"); + + try + { + _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ResetMessageListenerTest.class); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java index f7bea1b36a..c3434164d8 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java @@ -24,7 +24,7 @@ import javax.jms.*; import java.util.Enumeration; import java.io.Serializable; -public class TestNonQpidTextMessage implements ObjectMessage { +public class NonQpidObjectMessage implements ObjectMessage { private JMSObjectMessage _realMessage; private String _contentString; @@ -34,7 +34,7 @@ public class TestNonQpidTextMessage implements ObjectMessage { * does not inherit from the Qpid message superclasses * and expand our unit testing of MessageConverter et al */ - public TestNonQpidTextMessage() + public NonQpidObjectMessage() { _realMessage = new JMSObjectMessage(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java new file mode 100644 index 0000000000..3287314a44 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQConnectionFailureException; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ConnectionStartTest extends TestCase +{ + + String _broker = "vm://:1"; + + AMQConnection _connection; + private Session _consumerSess; + private MessageConsumer _consumer; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + try + { + AMQQueue queue = new AMQQueue("ConnectionStartTest"); + + AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + + Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + MessageProducer pub = pubSess.createProducer(queue); + + pub.send(pubSess.createTextMessage("Initial Message")); + + _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + + _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + _consumer = _consumerSess.createConsumer(queue); + + pubCon.close(); + + } + catch (Exception e) + { + fail("Connection to " + _broker + " should succeed. Reason: " + e); + } + } + + protected void tearDown() throws Exception + { + _connection.close(); + TransportConnection.killVMBroker(1); + } + + public void testSimpleReceiveConnection() + { + try + { + assertTrue("Connection should not be started", !_connection.started()); + //Note that this next line will start the dispatcher in the session + // should really not be called before _connection start + assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null); + _connection.start(); + assertTrue("There should be messages waiting for the consumer", _consumer.receiveNoWait() == null); + assertTrue("Connection should be started", _connection.started()); + + } + catch (JMSException e) + { + fail("An error occured during test because:" + e); + } + + } + + public void testMessageListenerConnection() + { + final CountDownLatch _gotMessage = new CountDownLatch(1); + + try + { + assertTrue("Connection should not be started", !_connection.started()); + _consumerSess.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + try + { + assertTrue("Connection should be started", _connection.started()); + assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText()); + _gotMessage.countDown(); + } + catch (JMSException e) + { + fail("Couldn't get message text because:" + e.getCause()); + } + } + }); + + assertTrue("Connection should not be started", !_connection.started()); + _connection.start(); + assertTrue("Connection should be started", _connection.started()); + + try + { + _gotMessage.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + fail("Timed out awaiting message via onMessage"); + } + + } + catch (JMSException e) + { + fail("Failed because:" + e.getCause()); + } + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ConnectionStartTest.class); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index c09d2504eb..2d61ceb00f 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -26,7 +26,7 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.message.TestNonQpidTextMessage; +import org.apache.qpid.client.message.NonQpidObjectMessage; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; @@ -71,7 +71,7 @@ public class JMSPropertiesTest extends TestCase MessageProducer producer = producerSession.createProducer(queue); //create a test message to send - ObjectMessage sentMsg = new TestNonQpidTextMessage(); + ObjectMessage sentMsg = new NonQpidObjectMessage(); sentMsg.setJMSCorrelationID(JMS_CORR_ID); sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE); sentMsg.setJMSType(JMS_TYPE); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java index 6a335b8627..a8a5c7d8b2 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -31,10 +31,12 @@ import javax.jms.Message; import javax.jms.Destination; import javax.jms.TextMessage; import javax.jms.MapMessage; +import javax.jms.JMSException; import java.util.HashMap; -public class MessageConverterTest extends TestCase { +public class MessageConverterTest extends TestCase +{ public static final String JMS_CORR_ID = "QPIDID_01"; public static final int JMS_DELIV_MODE = 1; @@ -50,53 +52,79 @@ public class MessageConverterTest extends TestCase { super.setUp(); testTextMessage = new JMSTextMessage(); - //Add JMSProperties - testTextMessage.setJMSCorrelationID(JMS_CORR_ID); - testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE); - testTextMessage.setJMSType(JMS_TYPE); - testTextMessage.setJMSReplyTo(JMS_REPLY_TO); + //Set Message Text testTextMessage.setText("testTextMessage text"); - - //Add non-JMS properties - testTextMessage.setStringProperty("testProp1","testValue1"); - testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE); + setMessageProperties(testTextMessage); testMapMessage = new JMSMapMessage(); - testMapMessage.setString("testMapString","testMapStringValue"); - testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE); + testMapMessage.setString("testMapString", "testMapStringValue"); + testMapMessage.setDouble("testMapDouble", Double.MAX_VALUE); } public void testSetProperties() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); - - //check JMS prop values on newMessage match - assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID()); - assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode()); - assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType()); - assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo()); - - //check non-JMS standard props ok too - assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"), - newMessage.getStringProperty("testProp1")); - assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"), - newMessage.getDoubleProperty("testProp2")); + AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage(); + mesagePropertiesTest(testTextMessage, newMessage); } public void testJMSTextMessageConversion() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); - assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText()); + AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage(); + assertEquals("Converted message text mismatch", ((JMSTextMessage) newMessage).getText(), testTextMessage.getText()); } public void testJMSMapMessageConversion() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage(); - assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"), - testMapMessage.getString("testMapString")); - assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"), - testMapMessage.getDouble("testMapDouble")); + AbstractJMSMessage newMessage = new MessageConverter((MapMessage) testMapMessage).getConvertedMessage(); + assertEquals("Converted map message String mismatch", ((JMSMapMessage) newMessage).getString("testMapString"), + testMapMessage.getString("testMapString")); + assertEquals("Converted map message Double mismatch", ((JMSMapMessage) newMessage).getDouble("testMapDouble"), + testMapMessage.getDouble("testMapDouble")); + + } + + public void testMessageConversion() throws Exception + { + Message newMessage = new NonQpidMessage(); + setMessageProperties(newMessage); + mesagePropertiesTest(testTextMessage, newMessage); + } + + private void setMessageProperties(Message message) throws JMSException + { + message.setJMSCorrelationID(JMS_CORR_ID); + message.setJMSDeliveryMode(JMS_DELIV_MODE); + message.setJMSType(JMS_TYPE); + message.setJMSReplyTo(JMS_REPLY_TO); + //Add non-JMS properties + message.setStringProperty("testProp1", "testValue1"); + message.setDoubleProperty("testProp2", Double.MIN_VALUE); + } + + + private void mesagePropertiesTest(Message expectedMessage, Message actualMessage) + { + try + { + //check JMS prop values on newMessage match + assertEquals("JMS Correlation ID mismatch", expectedMessage.getJMSCorrelationID(), actualMessage.getJMSCorrelationID()); + assertEquals("JMS Delivery mode mismatch", expectedMessage.getJMSDeliveryMode(), actualMessage.getJMSDeliveryMode()); + assertEquals("JMS Type mismatch", expectedMessage.getJMSType(), actualMessage.getJMSType()); + assertEquals("JMS Reply To mismatch", expectedMessage.getJMSReplyTo(), actualMessage.getJMSReplyTo()); + + //check non-JMS standard props ok too + assertEquals("Test String prop value mismatch", expectedMessage.getStringProperty("testProp1"), + actualMessage.getStringProperty("testProp1")); + + assertEquals("Test Double prop value mismatch", expectedMessage.getDoubleProperty("testProp2"), + actualMessage.getDoubleProperty("testProp2")); + } + catch (JMSException e) + { + fail("An error occured testing the property values" + e.getCause()); + e.printStackTrace(); + } } protected void tearDown() throws Exception diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java new file mode 100644 index 0000000000..e992290513 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.message; + +import javax.jms.Message; +import javax.jms.JMSException; +import javax.jms.Destination; +import java.util.Enumeration; +import java.util.Hashtable; + +public class NonQpidMessage implements Message +{ + private String _JMSMessageID; + private long _JMSTimestamp; + private byte[] _JMSCorrelationIDAsBytes; + private String _JMSCorrelationID; + private Destination _JMSReplyTo; + private Destination _JMSDestination; + private int _JMSDeliveryMode; + private boolean _JMSRedelivered; + private String _JMSType; + private long _JMSExpiration; + private int _JMSPriority; + private Hashtable _properties; + + public NonQpidMessage() + { + _properties = new Hashtable(); + _JMSPriority = javax.jms.Message.DEFAULT_PRIORITY; + _JMSDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; + } + + public String getJMSMessageID() throws JMSException + { + return _JMSMessageID; + } + + public void setJMSMessageID(String string) throws JMSException + { + _JMSMessageID = string; + } + + public long getJMSTimestamp() throws JMSException + { + return _JMSTimestamp; + } + + public void setJMSTimestamp(long l) throws JMSException + { + _JMSTimestamp = l; + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return _JMSCorrelationIDAsBytes; + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + _JMSCorrelationIDAsBytes = bytes; + } + + public void setJMSCorrelationID(String string) throws JMSException + { + _JMSCorrelationID = string; + } + + public String getJMSCorrelationID() throws JMSException + { + return _JMSCorrelationID; + } + + public Destination getJMSReplyTo() throws JMSException + { + return _JMSReplyTo; + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + _JMSReplyTo = destination; + } + + public Destination getJMSDestination() throws JMSException + { + return _JMSDestination; + } + + public void setJMSDestination(Destination destination) throws JMSException + { + _JMSDestination = destination; + } + + public int getJMSDeliveryMode() throws JMSException + { + return _JMSDeliveryMode; + } + + public void setJMSDeliveryMode(int i) throws JMSException + { + _JMSDeliveryMode = i; + } + + public boolean getJMSRedelivered() throws JMSException + { + return _JMSRedelivered; + } + + public void setJMSRedelivered(boolean b) throws JMSException + { + _JMSRedelivered = b; + } + + public String getJMSType() throws JMSException + { + return _JMSType; + } + + public void setJMSType(String string) throws JMSException + { + _JMSType = string; + } + + public long getJMSExpiration() throws JMSException + { + return _JMSExpiration; + } + + public void setJMSExpiration(long l) throws JMSException + { + _JMSExpiration = l; + } + + public int getJMSPriority() throws JMSException + { + return _JMSPriority; + } + + public void setJMSPriority(int i) throws JMSException + { + _JMSPriority = i; + } + + public void clearProperties() throws JMSException + { + _properties.clear(); + } + + public boolean propertyExists(String string) throws JMSException + { + return _properties.containsKey(string); + } + + public boolean getBooleanProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Boolean) + { + return (Boolean) o; + } + else + { + return Boolean.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public byte getByteProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Byte) + { + return (Byte) o; + } + else + { + return Byte.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public short getShortProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Short) + { + return (Short) o; + } + else + { + return Short.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public int getIntProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Integer) + { + return (Integer) o; + } + else + { + return Integer.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public long getLongProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Long) + { + return (Long) o; + } + else + { + return Long.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public float getFloatProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Float) + { + return (Float) o; + } + else + { + return Float.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public double getDoubleProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Double) + { + return (Double) o; + } + else + { + return Double.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public String getStringProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof String) + { + return (String) o; + } + else + { + return null; + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public Object getObjectProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Boolean) + { + return (Boolean) o; + } + else + { + return Boolean.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public Enumeration getPropertyNames() throws JMSException + { + return _properties.keys(); + } + + public void setBooleanProperty(String string, boolean b) throws JMSException + { + _properties.put(string, b); + } + + public void setByteProperty(String string, byte b) throws JMSException + { + _properties.put(string, b); + } + + public void setShortProperty(String string, short i) throws JMSException + { + _properties.put(string, i); + } + + public void setIntProperty(String string, int i) throws JMSException + { + _properties.put(string, i); + } + + public void setLongProperty(String string, long l) throws JMSException + { + _properties.put(string, l); + } + + public void setFloatProperty(String string, float v) throws JMSException + { + _properties.put(string, v); + } + + public void setDoubleProperty(String string, double v) throws JMSException + { + _properties.put(string, v); + } + + public void setStringProperty(String string, String string1) throws JMSException + { + _properties.put(string, string1); + } + + public void setObjectProperty(String string, Object object) throws JMSException + { + _properties.put(string, object); + } + + public void acknowledge() throws JMSException + { + + } + + public void clearBody() throws JMSException + { + + } +} |
