diff options
Diffstat (limited to 'qpid/java/client/src')
21 files changed, 1325 insertions, 859 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index cc052f81df..50299fa9d5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -215,12 +215,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Exception lastException = new Exception(); lastException.initCause(new ConnectException()); - while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed()) + while (!_connected && _failoverPolicy.failoverAllowed()) { try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); lastException = null; + _connected = true; } catch (Exception e) { @@ -232,34 +233,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.debug("Are we connected:" + _connected); - // Then the Failover Thread will handle conneciton - if (_failoverPolicy.failoverAllowed()) - { - //TODO this needs to be redone so that we are not spinning. - // A suitable object should be set that is then waited on - // and only notified when a connection is made or when - // the AMQConnection gets closed. - while (!_connected && !_closed.get()) - { - try - { - _logger.debug("Sleeping."); - Thread.sleep(100); - } - catch (InterruptedException ie) - { - _logger.debug("Woken up."); - } - } - if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null) - { - if (_lastAMQException != null) - { - throw _lastAMQException; - } - } - } - else + if (!_connected) { String message = null; @@ -318,7 +292,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void setVirtualHost(String virtualHost) { - if(virtualHost.startsWith("/")) + if (virtualHost.startsWith("/")) { virtualHost = virtualHost.substring(1); } @@ -403,7 +377,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean failoverAllowed() { - return _failoverPolicy.failoverAllowed(); + if (!_connected) + { + return false; + } + else + { + return _failoverPolicy.failoverAllowed(); + } } public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException @@ -815,6 +796,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _protocolHandler; } + public boolean started() + { + return _started; + } + public void bytesSent(long writtenBytes) { if (_connectionListener != null) @@ -1031,4 +1017,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQConnectionFactory.class.getName(), null); // factory location } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java index b2d2d2bec3..17af3702a4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java @@ -62,7 +62,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF String clientName, String virtualHost) throws URLSyntaxException { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + + username + ":" + password + "@" + clientName + "/" + virtualHost + "?brokerlist='" + broker + "'")); } @@ -334,7 +334,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF if (addr != null) { - return new AMQQueue(new AMQBindingURL((String) addr.getContent()).getQueueName()); + return new AMQQueue(new AMQBindingURL((String) addr.getContent())); } } @@ -344,7 +344,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF if (addr != null) { - return new AMQTopic(new AMQBindingURL((String) addr.getContent()).getDestinationName()); + return new AMQTopic(new AMQBindingURL((String) addr.getContent())); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b6429972df..35530b39c9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; - private final java.util.Queue<MessageConsumerPair> _reprocessQueue; - private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private long _nextProducerId; - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ - private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false); /** * Set when recover is called. This is to handle the case where recover() is called by application code @@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private boolean _inRecovery; + private boolean _connectionStopped; + private boolean _hasMessageListeners; /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - + private class Dispatcher extends Thread { + + /** + * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. + */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { UnprocessedMessage message; - _stopped.set(false); + try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) { - dispatchMessage(message); + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } + + dispatchMessage(message); + + while (connectionStopped()) + { + _lock.wait(); + } + + } + } } catch (InterruptedException e) @@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _logger.info("Dispatcher thread terminating for channel " + _channelId); } + // only call while holding lock + final boolean connectionStopped() + { + return _connectionStopped; + } + + void setConnectionStopped(boolean connectionStopped) + { + synchronized (_lock) + { + _connectionStopped = connectionStopped; + _lock.notify(); + } + } + private void dispatchMessage(UnprocessedMessage message) { if (message.getDeliverBody() != null) @@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public void stopDispatcher() + public void close() { - _stopped.set(true); + _closed.set(true); interrupt(); + + //fixme awaitTermination + } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) { @@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; - _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); - if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - + public void rollback() throws JMSException { checkTransacted(); @@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // TODO: Be aware of possible changes to parameter order as versions change. getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false)); // requeue + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageListener getMessageListener() throws JMSException { - checkNotClosed(); +// checkNotClosed(); return _messageListener; } public void setMessageListener(MessageListener listener) throws JMSException { - checkNotClosed(); - - if (!isStopped()) - { - throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); - } - - // We are stopped - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - BasicMessageConsumer consumer = i.next(); - - if (consumer.isReceiving()) - { - throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); - } - } - - _messageListener = listener; - - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - i.next().setMessageListener(_messageListener); - } - +// checkNotClosed(); +// +// if (_dispatcher != null && !_dispatcher.connectionStopped()) +// { +// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); +// } +// +// // We are stopped +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// BasicMessageConsumer consumer = i.next(); +// +// if (consumer.isReceiving()) +// { +// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); +// } +// } +// +// _messageListener = listener; +// +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// i.next().setMessageListener(_messageListener); +// } } @@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void start() { + //fixme This should be controlled by _stopped as it pairs with the stop method + //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. + //will result in sending Flow messages for each subsequent call to flow.. only need to do this + // if we have called stop. if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); } - if(hasMessageListeners() && _dispatcher == null) + if (hasMessageListeners()) { startDistpatcherIfNecessary(); } @@ -1606,26 +1636,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - if(_dispatcher == null) + if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); _dispatcher.start(); } + else + { + _dispatcher.setConnectionStopped(false); + } } void stop() { //stop the server delivering messages to this session suspendChannel(); - - //stop the dispatcher thread - _stopped.set(true); - } - - boolean isStopped() - { - return _stopped.get(); + + if (_dispatcher != null) + { + _dispatcher.setConnectionStopped(true); + } } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index f50b0390c5..c82187b2e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -28,12 +28,12 @@ import javax.jms.JMSException; import javax.jms.Topic; public class AMQTopic extends AMQDestination implements Topic - { +{ /** - * Constructor for use in creating a topic using a BindingURL. + * Constructor for use in creating a topic using a BindingURL. * * @param binding The binding url object. - */ + */ public AMQTopic(BindingURL binding) { super(binding); @@ -78,7 +78,7 @@ public class AMQTopic extends AMQDestination implements Topic return super.getDestinationName().toString(); } - public AMQShortString getRoutingKey() + public AMQShortString getRoutingKey() { return getDestinationName(); } @@ -93,7 +93,7 @@ public class AMQTopic extends AMQDestination implements Topic * Override since the queue is always private and we must ensure it remains null. If not, * reuse of the topic when registering consumers will make all consumers listen on the same (private) queue rather * than getting their own (private) queue. - * + * <p/> * This is relatively nasty but it is difficult to come up with a more elegant solution, given * the requirement in the case on AMQQueue and possibly other AMQDestination subclasses to * use the underlying queue name even where it is server generated. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4ffec6fb41..832c312634 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer //i.e. it is only valid to call this method if // - // (a) the session is stopped, in which case the dispatcher is not running + // (a) the connection is stopped, in which case the dispatcher is not running // OR // (b) the listener is null AND we are not receiving synchronously at present // - if (_session.isStopped()) + if (!_session.getAMQConnection().started()) { _messageListener.set(messageListener); _session.setHasMessageListeners(); - _session.startDistpatcherIfNecessary(); if (_logger.isDebugEnabled()) { @@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer synchronized (_session) { - _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDistpatcherIfNecessary(); @@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - _session.startDistpatcherIfNecessary(); checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - _session.startDistpatcherIfNecessary(); - checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 4426a7deee..f56fc40360 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -388,9 +388,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } else { - //TODO; Do we really want to create an empty message here ? - newMessage = (AbstractJMSMessage) _session.createMessage(); - return new MessageConverter(newMessage).getConvertedMessage(); + newMessage = new MessageConverter(message).getConvertedMessage(); } if (newMessage != null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 3086e5b90a..a1e2640f21 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -269,7 +269,7 @@ public final class JMSHeaderAdapter s = String.valueOf(o);
}
}
- }
+ }//else return s // null;
}
return s;
@@ -508,16 +508,16 @@ public final class JMSHeaderAdapter // JMS invalid names
if ((propertyName.equals("NULL")
- || propertyName.equals("TRUE")
- || propertyName.equals("FALSE")
- || propertyName.equals("NOT")
- || propertyName.equals("AND")
- || propertyName.equals("OR")
- || propertyName.equals("BETWEEN")
- || propertyName.equals("LIKE")
- || propertyName.equals("IN")
- || propertyName.equals("IS")
- || propertyName.equals("ESCAPE")))
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
{
throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java index 58089f595b..6bb9b9912b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java @@ -25,7 +25,8 @@ import org.apache.log4j.Logger; import javax.jms.*; import java.util.Enumeration; -public class MessageConverter { +public class MessageConverter +{ /** * Log4J logger @@ -118,6 +119,16 @@ public class MessageConverter { setMessageProperties(message); } + public MessageConverter(Message message) throws JMSException + { + //Send a message with just properties. + // Throwing away content + BytesMessage nativeMessage = new JMSBytesMessage(); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + public AbstractJMSMessage getConvertedMessage() { return _newMessage; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 7f42faccb8..ca9a1b88f3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -225,6 +225,7 @@ public class AMQStateManager implements AMQMethodListener } if(_currentState != s) { + _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); } } diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java index cdb00240b6..5ab5722146 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java @@ -123,7 +123,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase try { AMQQueue queue = (AMQQueue) ctx.lookup("MyQueue"); - assertEquals("example.MyQueue", queue.getRoutingKey()); + assertEquals("example.MyQueue", queue.getRoutingKey().toString()); } catch (NamingException ne) { @@ -133,7 +133,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase try { AMQTopic topic = (AMQTopic) ctx.lookup("ibmStocks"); - assertEquals("stocks.nyse.ibm", topic.getTopicName()); + assertEquals("stocks.nyse.ibm", topic.getTopicName().toString()); } catch (Exception ne) { @@ -143,7 +143,7 @@ public class PropertiesFileInitialContextFactoryTest extends TestCase try { AMQQueue direct = (AMQQueue) ctx.lookup("direct"); - assertEquals("directQueue", direct.getRoutingKey()); + assertEquals("directQueue", direct.getRoutingKey().toString()); } catch (NamingException ne) { diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java deleted file mode 100644 index db871281bf..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameAlreadyBoundException; -import javax.naming.NamingException; -import javax.naming.NoInitialContextException; -import java.io.File; -import java.util.Hashtable; - -import junit.framework.TestCase; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Bind extends TestCase -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis(); - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - String _connectionFactoryString = ""; - - String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'"; - Topic _topic = null; - - boolean _bound = false; - - public Bind() throws NameAlreadyBoundException, NoInitialContextException - { - this(false, DEFAULT_PROVIDER_URL); - } - - public Bind(boolean output) throws NameAlreadyBoundException, NoInitialContextException - { - this(output, DEFAULT_PROVIDER_URL); - } - - public Bind(boolean output, String providerURL) throws NameAlreadyBoundException, NoInitialContextException - { - PROVIDER_URL = providerURL; - - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - - - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - Connection connection = null; - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - // Create the connection factory to be bound - ConnectionFactory connectionFactory = null; - // Create the Connection to be bound - - - try - { - connectionFactory = new AMQConnectionFactory(_connectionString); - connection = connectionFactory.createConnection(); - - _connectionFactoryString = ((AMQConnectionFactory) connectionFactory).getConnectionURL().getURL(); - } - catch (JMSException jmsqe) - { - fail("Unable to create Connection:" + jmsqe); - } - catch (URLSyntaxException urlse) - { - fail("Unable to create Connection:" + urlse); - } - - try - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _topic = session.createTopic("Fruity"); - } - catch (JMSException jmse) - { - - } - // Perform the binds - ctx.bind("ConnectionFactory", connectionFactory); - if (output) - { - System.out.println("Bound factory\n" + ((AMQConnectionFactory) connectionFactory).getConnectionURL()); - } - ctx.bind("Connection", connection); - if (output) - { - System.out.println("Bound Connection\n" + ((AMQConnection) connection).toURL()); - } - ctx.bind("Topic", _topic); - if (output) - { - System.out.println("Bound Topic:\n" + ((AMQTopic) _topic).toURL()); - } - _bound = true; - - // Check that it is bound - //Object obj = ctx.lookup("Connection"); - //System.out.println(((AMQConnection)obj).toURL()); - - // Close the context when we're done - ctx.close(); - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - if (e instanceof NameAlreadyBoundException) - { - throw(NameAlreadyBoundException) e; - } - - if (e instanceof NoInitialContextException) - { - throw(NoInitialContextException) e; - } - } - finally - { - try - { - if (connection != null) - { - connection.close(); - } - } - catch (JMSException e) - { - //ignore just want it closed - } - } - } - - public String connectionFactoryValue() - { - if (_connectionFactoryString != null) - { - return _connectionFactoryString; - } - else - { - return ""; - } - } - - public String connectionValue() - { - if (_connectionString != null) - { - return _connectionString; - } - else - { - return ""; - } - } - - public String topicValue() - { - if (_topic != null) - { - return ((AMQTopic) _topic).toURL(); - } - else - { - return ""; - } - - } - - public boolean bound() - { - return _bound; - } - - public String getProviderURL() - { - return PROVIDER_URL; - } - - public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException - { - new Bind(true); - } -} diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java deleted file mode 100644 index 9fc186f19a..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import junit.framework.TestCase; -//import org.apache.qpid.testutil.VMBrokerSetup; - -import javax.naming.NameAlreadyBoundException; -import javax.naming.NoInitialContextException; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -public class JNDIReferenceableTest extends TestCase -{ -/* // FIXME FSContext has been removed from repository. This needs redone with the PropertiesFileInitialContextFactory. QPID-84 - public void testReferenceable() - { - Bind b = null; - try - { - try - { - b = new Bind(); - } - catch (NameAlreadyBoundException e) - { - if (new Unbind().unbound()) - { - try - { - b = new Bind(); - } - catch (NameAlreadyBoundException ee) - { - fail("Unable to clear bound objects for test."); - } - } - else - { - fail("Unable to clear bound objects for test."); - } - } - } - catch (NoInitialContextException e) - { - fail("You don't have the File System SPI on you class path.\n" + - "This can be downloaded from sun here:\n" + - "http://java.sun.com/products/jndi/downloads/index.html\n" + - "Click : Download JNDI 1.2.1 & More button\n" + - "Download: File System Service Provider, 1.2 Beta 3\n" + - "and add the two jars in the lib dir to your class path."); - } - - assertTrue(b.bound()); - - Lookup l = new Lookup(b.getProviderURL()); - - assertTrue(l.connectionFactoryValue().equals(b.connectionFactoryValue())); - - assertTrue(l.connectionValue().equals(b.connectionValue())); - - assertTrue(l.topicValue().equals(b.topicValue())); - - - Unbind u = new Unbind(); - - assertTrue(u.unbound()); - - } - - public static junit.framework.Test suite() - { - return new VMBrokerSetup(new junit.framework.TestSuite(JNDIReferenceableTest.class)); - } - */ -} diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java deleted file mode 100644 index b804ccb30c..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQTopic; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.jms.JMSException; -import java.io.File; -import java.util.Hashtable; - - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Lookup -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest"; - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - AMQTopic _topic = null; - AMQConnection _connection = null; - AMQConnectionFactory _connectionFactory = null; - private String _connectionURL; - - - public Lookup() - { - this(DEFAULT_PROVIDER_URL); - } - - public Lookup(String providerURL) - { - - PROVIDER_URL = providerURL; - - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - _topic = (AMQTopic) ctx.lookup("Topic"); - - _connection = (AMQConnection) ctx.lookup("Connection"); - - _connectionURL = _connection.toURL(); - - _connectionFactory = (AMQConnectionFactory) ctx.lookup("ConnectionFactory"); - //System.out.println(topic); - - // Close the context when we're done - ctx.close(); - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - } - finally - { - try - { - if (_connection != null) - { - _connection.close(); - } - } - catch (JMSException e) - { - //ignore just need to close - } - } - } - - public String connectionFactoryValue() - { - if (_connectionFactory != null) - { - return _connectionFactory.getConnectionURL().toString(); - } - return ""; - } - - public String connectionValue() - { - if (_connectionURL != null) - { - return _connectionURL; - } - return ""; - } - - public String topicValue() - { - if (_topic != null) - { - return _topic.toURL(); - } - return ""; - } - - public String getProviderURL() - { - return PROVIDER_URL; - } - - public static void main(String[] args) - { - new Lookup(); - } -} - diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java deleted file mode 100644 index 869bc55d8f..0000000000 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.jndi.referenceabletest; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; -import javax.jms.Connection; -import javax.jms.JMSException; -import java.io.File; -import java.util.Hashtable; - -/** - * Usage: To run these you need to have the sun JNDI SPI for the FileSystem. - * This can be downloaded from sun here: - * http://java.sun.com/products/jndi/downloads/index.html - * Click : Download JNDI 1.2.1 & More button - * Download: File System Service Provider, 1.2 Beta 3 - * and add the two jars in the lib dir to your class path. - * <p/> - * Also you need to create the directory /temp/qpid-jndi-test - */ -class Unbind -{ - public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis(); - public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH; - public String PROVIDER_URL = DEFAULT_PROVIDER_URL; - - boolean _unbound = false; - - public Unbind() - { - this(false, DEFAULT_PROVIDER_URL); - } - - public Unbind(Boolean output) - { - this(output, DEFAULT_PROVIDER_URL); - } - - public Unbind(String provider) - { - this(false, provider); - } - - public Unbind(boolean output, String providerURL) - { - PROVIDER_URL = providerURL; - // Set up the environment for creating the initial context - Hashtable env = new Hashtable(11); - env.put(Context.INITIAL_CONTEXT_FACTORY, - "com.sun.jndi.fscontext.RefFSContextFactory"); - env.put(Context.PROVIDER_URL, PROVIDER_URL); - - File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3)); - - if (file.exists() && !file.isDirectory()) - { - System.out.println("Couldn't make directory file already exists"); - return; - } - else - { - if (!file.exists()) - { - if (!file.mkdirs()) - { - System.out.println("Couldn't make directory"); - return; - } - } - } - - try - { - // Create the initial context - Context ctx = new InitialContext(env); - - // Remove the binding - ctx.unbind("ConnectionFactory"); - ctx.unbind("Connection"); - ctx.unbind("Topic"); - - // Check that it is gone - Object obj = null; - try - { - obj = ctx.lookup("ConnectionFactory"); - } - catch (NameNotFoundException ne) - { - if (output) - { - System.out.println("unbind ConnectionFactory successful"); - } - try - { - obj = ctx.lookup("Connection"); - try - { - ((Connection) obj).close(); - } - catch (JMSException e) - { - //ignore just need to close - } - } - catch (NameNotFoundException ne2) - { - if (output) - { - System.out.println("unbind Connection successful"); - } - - try - { - obj = ctx.lookup("Topic"); - } - catch (NameNotFoundException ne3) - { - if (output) - { - System.out.println("unbind Topic successful"); - } - _unbound = true; - } - } - } - - //System.out.println("unbind failed; object still there: " + obj); - - // Close the context when we're done - - ctx.close(); - - } - catch (NamingException e) - { - System.out.println("Operation failed: " + e); - } - } - - public boolean unbound() - { - return _unbound; - } - - public static void main(String[] args) - { - - new Unbind(true); - } -} - diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java new file mode 100644 index 0000000000..165059946c --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class DispatcherTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(DispatcherTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int _receivedCount = 0; + private int _receivedCountWhileStopped = 0; + private Connection _clientConnection, _producerConnection; + private MessageConsumer _consumer; + MessageProducer _producer; + Session _clientSession, _producerSession; + + private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); //all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); //all messages Sent Lock + + private volatile boolean _connectionStopped = false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + + } + + protected void tearDown() throws Exception + { + + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testAsynchronousRecieve() + { + + _logger.info("Test Start"); + + + assertTrue(!((AMQConnection) _clientConnection).started()); + + //Set default Message Listener + try + { + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message); + + _receivedCount++; + + if (_receivedCount == MSG_COUNT) + { + _allFirstMessagesSent.countDown(); + } + + if (_connectionStopped) + { + _logger.info("Running with Message:" + _receivedCount); + } + + if (_connectionStopped && _allFirstMessagesSent.getCount() == 0) + { + _receivedCountWhileStopped++; + } + + if (_allFirstMessagesSent.getCount() == 0) + { + if (_receivedCount == MSG_COUNT * 2) + { + _allSecondMessagesSent.countDown(); + } + } + } + }); + + assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started()); + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer1"); + } + + + try + { + _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + + try + { + assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started()); + _clientConnection.stop(); + _connectionStopped = true; + } + catch (JMSException e) + { + _logger.error("Error stopping connection"); + } + + + try + { + _logger.error("Send additional messages"); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + } + catch (JMSException e) + { + _logger.error("Unable to send additional messages", e); + } + + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + //ignore + } + + try + { + _logger.info("Restarting connection"); + + _connectionStopped = false; + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Better ML on consumer1", e); + } + + + _logger.info("Waiting upto 2 seconds for messages"); + + try + { + _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + + assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount()); + assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount()); + assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount); + assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped); + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(DispatcherTest.class); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java new file mode 100644 index 0000000000..28bb2b614b --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class ResetMessageListenerTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(ResetMessageListenerTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int receivedCount1ML1 = 0; + private int receivedCount1ML2 = 0; + private int receivedCount2 = 0; + private Connection _clientConnection, _producerConnection; + private MessageConsumer _consumer1; + private MessageConsumer _consumer2; + MessageProducer _producer; + Session _clientSession, _producerSession; + + private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); //all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); //all messages Sent Lock + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer1 = _clientSession.createConsumer(queue); + + //Create Client 2 on same session + _consumer2 = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + + } + + protected void tearDown() throws Exception + { + assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount()); + assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount()); + assertEquals("Client 1 ML1 didn't get all messages", MSG_COUNT / 2, receivedCount1ML1); + assertEquals("Client 2 didn't get all messages", MSG_COUNT, receivedCount2); + assertEquals("Client 1 ML2 didn't get all messages", MSG_COUNT / 2, receivedCount1ML2); + + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testAsynchronousRecieve() + { + + _logger.info("Test Start"); + + //Set default Message Listener + try + { + _consumer1.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML 1 Received Message(" + receivedCount1ML1 + "):" + message); + + receivedCount1ML1++; + if (receivedCount1ML1 == MSG_COUNT / 2) + { + _allFirstMessagesSent.countDown(); + } + } + }); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer1"); + } + + + try + { + _consumer2.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); + + receivedCount2++; + if (receivedCount2 == MSG_COUNT / 2) + { + _logger.info("Client 2 received all its messages1"); + _allFirstMessagesSent.countDown(); + } + + if (receivedCount2 == MSG_COUNT) + { + _logger.info("Client 2 received all its messages2"); + _allSecondMessagesSent.countDown(); + } + } + }); + + _clientConnection.start(); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer2"); + + } + + + try + { + _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); + _logger.info("Received first batch of messages"); + } + catch (InterruptedException e) + { + //do nothing + } + + try + { + _clientConnection.stop(); + } + catch (JMSException e) + { + _logger.error("Error stopping connection"); + } + + _logger.info("Reset Message Listener to better listener while connection stopped, will restart session"); + try + { + _consumer1.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 ML2 Received Message(" + receivedCount1ML1 + "):" + message); + + receivedCount1ML2++; + if (receivedCount1ML2 == MSG_COUNT / 2) + { + _allSecondMessagesSent.countDown(); + } + } + }); + + _clientConnection.start(); + } + catch (javax.jms.IllegalStateException e) + { + _logger.error("Connection not stopped while setting ML", e); + fail("Unable to change message listener:" + e.getCause()); + } + catch (JMSException e) + { + _logger.error("Error Setting Better ML on consumer1", e); + } + + try + { + _logger.error("Send additional messages"); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(_producerSession.createTextMessage("Message " + msg)); + } + } + catch (JMSException e) + { + _logger.error("Unable to send additional messages", e); + } + + _logger.info("Waiting upto 2 seconds for messages"); + + try + { + _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ResetMessageListenerTest.class); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java index f7bea1b36a..c3434164d8 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java @@ -24,7 +24,7 @@ import javax.jms.*; import java.util.Enumeration; import java.io.Serializable; -public class TestNonQpidTextMessage implements ObjectMessage { +public class NonQpidObjectMessage implements ObjectMessage { private JMSObjectMessage _realMessage; private String _contentString; @@ -34,7 +34,7 @@ public class TestNonQpidTextMessage implements ObjectMessage { * does not inherit from the Qpid message superclasses * and expand our unit testing of MessageConverter et al */ - public TestNonQpidTextMessage() + public NonQpidObjectMessage() { _realMessage = new JMSObjectMessage(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java new file mode 100644 index 0000000000..3287314a44 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQConnectionFailureException; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ConnectionStartTest extends TestCase +{ + + String _broker = "vm://:1"; + + AMQConnection _connection; + private Session _consumerSess; + private MessageConsumer _consumer; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + try + { + AMQQueue queue = new AMQQueue("ConnectionStartTest"); + + AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + + Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + MessageProducer pub = pubSess.createProducer(queue); + + pub.send(pubSess.createTextMessage("Initial Message")); + + _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + + _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + _consumer = _consumerSess.createConsumer(queue); + + pubCon.close(); + + } + catch (Exception e) + { + fail("Connection to " + _broker + " should succeed. Reason: " + e); + } + } + + protected void tearDown() throws Exception + { + _connection.close(); + TransportConnection.killVMBroker(1); + } + + public void testSimpleReceiveConnection() + { + try + { + assertTrue("Connection should not be started", !_connection.started()); + //Note that this next line will start the dispatcher in the session + // should really not be called before _connection start + assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null); + _connection.start(); + assertTrue("There should be messages waiting for the consumer", _consumer.receiveNoWait() == null); + assertTrue("Connection should be started", _connection.started()); + + } + catch (JMSException e) + { + fail("An error occured during test because:" + e); + } + + } + + public void testMessageListenerConnection() + { + final CountDownLatch _gotMessage = new CountDownLatch(1); + + try + { + assertTrue("Connection should not be started", !_connection.started()); + _consumerSess.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + try + { + assertTrue("Connection should be started", _connection.started()); + assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText()); + _gotMessage.countDown(); + } + catch (JMSException e) + { + fail("Couldn't get message text because:" + e.getCause()); + } + } + }); + + assertTrue("Connection should not be started", !_connection.started()); + _connection.start(); + assertTrue("Connection should be started", _connection.started()); + + try + { + _gotMessage.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + fail("Timed out awaiting message via onMessage"); + } + + } + catch (JMSException e) + { + fail("Failed because:" + e.getCause()); + } + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(ConnectionStartTest.class); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index c09d2504eb..2d61ceb00f 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -26,7 +26,7 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.message.TestNonQpidTextMessage; +import org.apache.qpid.client.message.NonQpidObjectMessage; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; @@ -71,7 +71,7 @@ public class JMSPropertiesTest extends TestCase MessageProducer producer = producerSession.createProducer(queue); //create a test message to send - ObjectMessage sentMsg = new TestNonQpidTextMessage(); + ObjectMessage sentMsg = new NonQpidObjectMessage(); sentMsg.setJMSCorrelationID(JMS_CORR_ID); sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE); sentMsg.setJMSType(JMS_TYPE); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java index 6a335b8627..a8a5c7d8b2 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -31,10 +31,12 @@ import javax.jms.Message; import javax.jms.Destination; import javax.jms.TextMessage; import javax.jms.MapMessage; +import javax.jms.JMSException; import java.util.HashMap; -public class MessageConverterTest extends TestCase { +public class MessageConverterTest extends TestCase +{ public static final String JMS_CORR_ID = "QPIDID_01"; public static final int JMS_DELIV_MODE = 1; @@ -50,53 +52,79 @@ public class MessageConverterTest extends TestCase { super.setUp(); testTextMessage = new JMSTextMessage(); - //Add JMSProperties - testTextMessage.setJMSCorrelationID(JMS_CORR_ID); - testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE); - testTextMessage.setJMSType(JMS_TYPE); - testTextMessage.setJMSReplyTo(JMS_REPLY_TO); + //Set Message Text testTextMessage.setText("testTextMessage text"); - - //Add non-JMS properties - testTextMessage.setStringProperty("testProp1","testValue1"); - testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE); + setMessageProperties(testTextMessage); testMapMessage = new JMSMapMessage(); - testMapMessage.setString("testMapString","testMapStringValue"); - testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE); + testMapMessage.setString("testMapString", "testMapStringValue"); + testMapMessage.setDouble("testMapDouble", Double.MAX_VALUE); } public void testSetProperties() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); - - //check JMS prop values on newMessage match - assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID()); - assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode()); - assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType()); - assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo()); - - //check non-JMS standard props ok too - assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"), - newMessage.getStringProperty("testProp1")); - assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"), - newMessage.getDoubleProperty("testProp2")); + AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage(); + mesagePropertiesTest(testTextMessage, newMessage); } public void testJMSTextMessageConversion() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); - assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText()); + AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage(); + assertEquals("Converted message text mismatch", ((JMSTextMessage) newMessage).getText(), testTextMessage.getText()); } public void testJMSMapMessageConversion() throws Exception { - AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage(); - assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"), - testMapMessage.getString("testMapString")); - assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"), - testMapMessage.getDouble("testMapDouble")); + AbstractJMSMessage newMessage = new MessageConverter((MapMessage) testMapMessage).getConvertedMessage(); + assertEquals("Converted map message String mismatch", ((JMSMapMessage) newMessage).getString("testMapString"), + testMapMessage.getString("testMapString")); + assertEquals("Converted map message Double mismatch", ((JMSMapMessage) newMessage).getDouble("testMapDouble"), + testMapMessage.getDouble("testMapDouble")); + + } + + public void testMessageConversion() throws Exception + { + Message newMessage = new NonQpidMessage(); + setMessageProperties(newMessage); + mesagePropertiesTest(testTextMessage, newMessage); + } + + private void setMessageProperties(Message message) throws JMSException + { + message.setJMSCorrelationID(JMS_CORR_ID); + message.setJMSDeliveryMode(JMS_DELIV_MODE); + message.setJMSType(JMS_TYPE); + message.setJMSReplyTo(JMS_REPLY_TO); + //Add non-JMS properties + message.setStringProperty("testProp1", "testValue1"); + message.setDoubleProperty("testProp2", Double.MIN_VALUE); + } + + + private void mesagePropertiesTest(Message expectedMessage, Message actualMessage) + { + try + { + //check JMS prop values on newMessage match + assertEquals("JMS Correlation ID mismatch", expectedMessage.getJMSCorrelationID(), actualMessage.getJMSCorrelationID()); + assertEquals("JMS Delivery mode mismatch", expectedMessage.getJMSDeliveryMode(), actualMessage.getJMSDeliveryMode()); + assertEquals("JMS Type mismatch", expectedMessage.getJMSType(), actualMessage.getJMSType()); + assertEquals("JMS Reply To mismatch", expectedMessage.getJMSReplyTo(), actualMessage.getJMSReplyTo()); + + //check non-JMS standard props ok too + assertEquals("Test String prop value mismatch", expectedMessage.getStringProperty("testProp1"), + actualMessage.getStringProperty("testProp1")); + + assertEquals("Test Double prop value mismatch", expectedMessage.getDoubleProperty("testProp2"), + actualMessage.getDoubleProperty("testProp2")); + } + catch (JMSException e) + { + fail("An error occured testing the property values" + e.getCause()); + e.printStackTrace(); + } } protected void tearDown() throws Exception diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java new file mode 100644 index 0000000000..e992290513 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.message; + +import javax.jms.Message; +import javax.jms.JMSException; +import javax.jms.Destination; +import java.util.Enumeration; +import java.util.Hashtable; + +public class NonQpidMessage implements Message +{ + private String _JMSMessageID; + private long _JMSTimestamp; + private byte[] _JMSCorrelationIDAsBytes; + private String _JMSCorrelationID; + private Destination _JMSReplyTo; + private Destination _JMSDestination; + private int _JMSDeliveryMode; + private boolean _JMSRedelivered; + private String _JMSType; + private long _JMSExpiration; + private int _JMSPriority; + private Hashtable _properties; + + public NonQpidMessage() + { + _properties = new Hashtable(); + _JMSPriority = javax.jms.Message.DEFAULT_PRIORITY; + _JMSDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; + } + + public String getJMSMessageID() throws JMSException + { + return _JMSMessageID; + } + + public void setJMSMessageID(String string) throws JMSException + { + _JMSMessageID = string; + } + + public long getJMSTimestamp() throws JMSException + { + return _JMSTimestamp; + } + + public void setJMSTimestamp(long l) throws JMSException + { + _JMSTimestamp = l; + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return _JMSCorrelationIDAsBytes; + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + _JMSCorrelationIDAsBytes = bytes; + } + + public void setJMSCorrelationID(String string) throws JMSException + { + _JMSCorrelationID = string; + } + + public String getJMSCorrelationID() throws JMSException + { + return _JMSCorrelationID; + } + + public Destination getJMSReplyTo() throws JMSException + { + return _JMSReplyTo; + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + _JMSReplyTo = destination; + } + + public Destination getJMSDestination() throws JMSException + { + return _JMSDestination; + } + + public void setJMSDestination(Destination destination) throws JMSException + { + _JMSDestination = destination; + } + + public int getJMSDeliveryMode() throws JMSException + { + return _JMSDeliveryMode; + } + + public void setJMSDeliveryMode(int i) throws JMSException + { + _JMSDeliveryMode = i; + } + + public boolean getJMSRedelivered() throws JMSException + { + return _JMSRedelivered; + } + + public void setJMSRedelivered(boolean b) throws JMSException + { + _JMSRedelivered = b; + } + + public String getJMSType() throws JMSException + { + return _JMSType; + } + + public void setJMSType(String string) throws JMSException + { + _JMSType = string; + } + + public long getJMSExpiration() throws JMSException + { + return _JMSExpiration; + } + + public void setJMSExpiration(long l) throws JMSException + { + _JMSExpiration = l; + } + + public int getJMSPriority() throws JMSException + { + return _JMSPriority; + } + + public void setJMSPriority(int i) throws JMSException + { + _JMSPriority = i; + } + + public void clearProperties() throws JMSException + { + _properties.clear(); + } + + public boolean propertyExists(String string) throws JMSException + { + return _properties.containsKey(string); + } + + public boolean getBooleanProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Boolean) + { + return (Boolean) o; + } + else + { + return Boolean.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public byte getByteProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Byte) + { + return (Byte) o; + } + else + { + return Byte.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public short getShortProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Short) + { + return (Short) o; + } + else + { + return Short.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public int getIntProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Integer) + { + return (Integer) o; + } + else + { + return Integer.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public long getLongProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Long) + { + return (Long) o; + } + else + { + return Long.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public float getFloatProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Float) + { + return (Float) o; + } + else + { + return Float.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public double getDoubleProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Double) + { + return (Double) o; + } + else + { + return Double.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public String getStringProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof String) + { + return (String) o; + } + else + { + return null; + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public Object getObjectProperty(String string) throws JMSException + { + if (propertyExists(string)) + { + Object o = _properties.get(string); + if (o instanceof Boolean) + { + return (Boolean) o; + } + else + { + return Boolean.valueOf(null); + } + } + else + { + throw new JMSException("property does not exist: " + string); + } + } + + public Enumeration getPropertyNames() throws JMSException + { + return _properties.keys(); + } + + public void setBooleanProperty(String string, boolean b) throws JMSException + { + _properties.put(string, b); + } + + public void setByteProperty(String string, byte b) throws JMSException + { + _properties.put(string, b); + } + + public void setShortProperty(String string, short i) throws JMSException + { + _properties.put(string, i); + } + + public void setIntProperty(String string, int i) throws JMSException + { + _properties.put(string, i); + } + + public void setLongProperty(String string, long l) throws JMSException + { + _properties.put(string, l); + } + + public void setFloatProperty(String string, float v) throws JMSException + { + _properties.put(string, v); + } + + public void setDoubleProperty(String string, double v) throws JMSException + { + _properties.put(string, v); + } + + public void setStringProperty(String string, String string1) throws JMSException + { + _properties.put(string, string1); + } + + public void setObjectProperty(String string, Object object) throws JMSException + { + _properties.put(string, object); + } + + public void acknowledge() throws JMSException + { + + } + + public void clearBody() throws JMSException + { + + } +} |
