diff options
Diffstat (limited to 'qpid/java/client')
38 files changed, 1075 insertions, 511 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java index b199d41432..6a7626c51d 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,33 +7,35 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. + * */ package org.apache.qpid.example.publisher; -import org.apache.log4j.Logger; - import java.io.File; +import javax.jms.JMSException; + +import org.apache.log4j.Logger; + import org.apache.qpid.example.shared.FileUtils; import org.apache.qpid.example.shared.Statics; -import javax.jms.JMSException; - /** * Class that sends message files to the Publisher to distribute * using files as input * Must set properties for host in properties file or uses in vm broker */ -public class FileMessageDispatcher { +public class FileMessageDispatcher +{ protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); @@ -48,30 +51,30 @@ public class FileMessageDispatcher { public static void main(String[] args) { - //Check command line args ok - must provide a path or file for us to dispatch + // Check command line args ok - must provide a path or file for us to dispatch if (args.length == 0) { - System.err.println("Usage: FileMessageDispatcher <filesToDispatch>" + ""); + System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + ""); } else { try { - //publish message(s) from file(s) to configured queue + // publish message(s) from file(s) to configured queue publish(args[0]); - //Move payload file(s) to archive location as no error + // Move payload file(s) to archive location as no error FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); } - catch(Exception e) + catch (Exception e) { - //log error and exit + // log error and exit _logger.error("Error trying to dispatch message: " + e); System.exit(1); } finally { - //clean up before exiting + // clean up before exiting if (getPublisher() != null) { getPublisher().cleanup(); @@ -98,10 +101,10 @@ public class FileMessageDispatcher { File tempFile = new File(path); if (tempFile.isDirectory()) { - //while more files in dir publish them + // while more files in dir publish them File[] files = tempFile.listFiles(); - if (files == null || files.length == 0) + if ((files == null) || (files.length == 0)) { _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile); } @@ -109,10 +112,10 @@ public class FileMessageDispatcher { { for (File file : files) { - //Create message factory passing in payload path + // Create message factory passing in payload path FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); - //Send the message generated from the payload using the _publisher + // Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); } @@ -120,11 +123,11 @@ public class FileMessageDispatcher { } else { - //handle a single file - //Create message factory passing in payload path - FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); + // handle a single file + // Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), tempFile.toString()); - //Send the message generated from the payload using the _publisher + // Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); } } @@ -145,15 +148,15 @@ public class FileMessageDispatcher { */ private static Publisher getPublisher() { - if (_publisher != null) - { - return _publisher; - } + if (_publisher != null) + { + return _publisher; + } - //Create a _publisher - _publisher = new Publisher(); + // Create a _publisher + _publisher = new Publisher(); - return _publisher; + return _publisher; } } diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java index 88bcbbbccb..f3b21e3c64 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -47,7 +47,9 @@ public class FileMessageFactory } catch (IOException e) { - throw new MessageFactoryException(e.toString()); + MessageFactoryException mfe = new MessageFactoryException(e.toString()); + mfe.initCause(e); + throw mfe; } } diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java index 8505d1d457..98a2c0d497 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java @@ -59,11 +59,11 @@ public class InitialContextHelper } catch (IOException e) { - throw new ContextException(e.toString()); + throw new ContextException(e.toString(), e); } catch (NamingException n) { - throw new ContextException(n.toString()); + throw new ContextException(n.toString(), n); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 413524b6d8..0e3d99eeba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,29 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.log4j.Logger; + import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -44,28 +67,6 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -95,7 +96,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap + private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap private String _clientName; @@ -125,15 +126,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /* * _Connected should be refactored with a suitable wait object. - */ + */ private boolean _connected; /* * The last error code that occured on the connection. Used to return the correct exception to the client - */ + */ private AMQException _lastAMQException = null; - /* * The connection meta data */ @@ -149,6 +149,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); + private static final long DEFAULT_TIMEOUT = 1000 * 30; /** * @param broker brokerdetails @@ -160,13 +161,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @throws AMQException * @throws URLSyntaxException */ - public AMQConnection(String broker, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null); + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), null); } /** @@ -179,44 +180,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @throws AMQException * @throws URLSyntaxException */ - public AMQConnection(String broker, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } - - public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, null); } - public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, sslConfig); } - - public AMQConnection(String host, int port, boolean useSSL, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, + String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(useSSL ? - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='true'" : - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='false'" - ), sslConfig); + this(new AMQConnectionURL( + useSSL + ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") + : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException @@ -229,13 +224,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL(connection), sslConfig); } - public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } + _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -249,7 +244,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _password = connectionURL.getPassword(); setVirtualHost(connectionURL.getVirtualHost()); - if (connectionURL.getDefaultQueueExchangeName() != null) { _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); @@ -270,7 +264,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); } - _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); @@ -278,7 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // We are not currently connected _connected = false; - Exception lastException = new Exception(); lastException.initCause(new ConnectException()); @@ -296,7 +288,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_logger.isInfoEnabled()) { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), + e.getCause()); } } } @@ -322,7 +315,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - if (message == null || message.equals("")) + if ((message == null) || message.equals("")) { message = "Unable to Connect"; } @@ -335,11 +328,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString()); } + e.initCause(lastException); } throw e; } + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -369,6 +364,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { virtualHost = virtualHost.substring(1); } + _virtualHost = virtualHost; } @@ -382,7 +378,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.attainState(AMQState.CONNECTION_OPEN); _failoverPolicy.attainedConnection(); - //Again this should be changed to a suitable notify + // Again this should be changed to a suitable notify _connected = true; } catch (AMQException e) @@ -401,6 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(bd); + return true; } catch (Exception e) @@ -409,8 +406,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Unable to connect to broker at " + bd); } + attemptReconnection(); } + return false; } @@ -421,6 +420,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); + return true; } catch (Exception e) @@ -436,13 +436,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (_logger.isInfoEnabled()) { - _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + _logger.info(e.getMessage() + ":Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails()); } } } } - //connection unsuccessful + // connection unsuccessful return false; } @@ -474,14 +475,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); } - public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetch) throws JMSException + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) + throws JMSException { return createSession(transacted, acknowledgeMode, prefetch, prefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetchHigh, final int prefetchLow) throws JMSException + final int prefetchHigh, final int prefetchLow) throws JMSException { checkNotClosed(); if (channelLimitReached()) @@ -491,85 +492,81 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect else { return (org.apache.qpid.jms.Session) new FailoverSupport() - { - public Object operation() throws JMSException - { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, - prefetchHigh, prefetchLow); - _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - _protocolHandler.removeSessionByChannel(channelId); - deregisterSession(channelId); - } - } - - if (_started) { - try - { - session.start(); - } - catch (AMQException e) + public Object operation() throws JMSException { - throw new JMSAMQException(e); + int channelId = _idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + _protocolHandler.removeSessionByChannel(channelId); + deregisterSession(channelId); + } + } + + if (_started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; } - } - return session; - } - }.execute(this); + }.execute(this); } } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException + throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); - - //todo send low water mark when protocol allows. - //todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); + _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize + BasicQosOkBody.class); if (transacted) { @@ -579,10 +576,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), - TxSelectOkBody.class); + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class); } } @@ -596,11 +591,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _protocolHandler.removeSessionByChannel(channelId); deregisterSession(channelId); - throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); + throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e); } } - public void setFailoverPolicy(FailoverPolicy policy) { _failoverPolicy = policy; @@ -645,12 +639,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private boolean channelLimitReached() { - return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount; + return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); } public String getClientID() throws JMSException { checkNotClosed(); + return _clientName; } @@ -666,6 +661,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); + return _connectionMetaData; } @@ -673,6 +669,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); + return _exceptionListener; } @@ -706,6 +703,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new JMSAMQException(e); } } + _started = true; } } @@ -726,13 +724,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new JMSAMQException(e); } } + _started = false; } } public void close() throws JMSException { - close(-1); + close(DEFAULT_TIMEOUT); } public void close(long timeout) throws JMSException @@ -752,7 +751,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - //adjust timeout + // adjust timeout long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); @@ -763,7 +762,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - //adjust timeout + // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); _protocolHandler.closeConnection(timeout); @@ -771,7 +770,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (AMQException e) { - throw new JMSException("Error closing connection: " + e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + throw jmse; } } } @@ -785,6 +786,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { timeout = 0; } + return timeout; } @@ -803,6 +805,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect session.markClosed(); } + _sessions.clear(); } @@ -842,6 +845,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } } + _sessions.clear(); if (sessionException != null) { @@ -850,42 +854,42 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, - int maxMessages) - throws JMSException + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws JMSException { // TODO Auto-generated method stub checkNotClosed(); + return null; } public long getMaximumChannelCount() throws JMSException { checkNotClosed(); + return _maximumChannelCount; } @@ -974,6 +978,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { proceed = _connectionListener.preFailover(redirect); } + return proceed; } @@ -994,6 +999,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { markAllSessionsClosed(); } + return resubscribe; } else @@ -1057,12 +1063,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause); + je = + new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), + "Exception thrown against " + toString() + ": " + cause); } else { je = new JMSException("Exception thrown against " + toString() + ": " + cause); } + if (cause instanceof Exception) { je.setLinkedException((Exception) cause); @@ -1090,6 +1099,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Closing AMQConnection due to :" + cause.getMessage()); } + _closed.set(true); closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } @@ -1145,9 +1155,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect buf.append("Host: ").append(String.valueOf(bd.getHost())); buf.append("\nPort: ").append(String.valueOf(bd.getPort())); } + buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); buf.append("\nClient ID: ").append(String.valueOf(_clientName)); - buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size()); + buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size()); + return buf.toString(); } @@ -1158,11 +1170,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public Reference getReference() throws NamingException { - return new Reference( - AMQConnection.class.getName(), - new StringRefAddr(AMQConnection.class.getName(), toURL()), - AMQConnectionFactory.class.getName(), - null); // factory location + return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()), + AMQConnectionFactory.class.getName(), null); // factory location } public SSLConfiguration getSSLConfiguration() @@ -1175,19 +1184,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _defaultTopicExchangeName; } - public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) { _defaultTopicExchangeName = defaultTopicExchangeName; } - public AMQShortString getDefaultQueueExchangeName() { return _defaultQueueExchangeName; } - public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) { _defaultQueueExchangeName = defaultQueueExchangeName; @@ -1200,10 +1206,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQShortString getTemporaryQueueExchangeName() { - return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates. + return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. } - public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) { _temporaryTopicExchangeName = temporaryTopicExchangeName; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index 0dcc544ea8..b3fbd1f510 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,12 +20,6 @@ */ package org.apache.qpid.client; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; @@ -33,6 +27,12 @@ import java.util.LinkedList; import java.util.List; import java.util.StringTokenizer; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + public class AMQConnectionURL implements ConnectionURL { private String _url; @@ -49,7 +49,6 @@ public class AMQConnectionURL implements ConnectionURL private AMQShortString _temporaryTopicExchangeName; private AMQShortString _temporaryQueueExchangeName; - public AMQConnectionURL(String fullURL) throws URLSyntaxException { _url = fullURL; @@ -58,18 +57,18 @@ public class AMQConnectionURL implements ConnectionURL _failoverOptions = new HashMap<String, String>(); // Connection URL format - //amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''" + // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''" // Options are of course optional except for requiring a single broker in the broker list. try { URI connection = new URI(fullURL); - if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL))) + if ((connection.getScheme() == null) || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL))) { throw new URISyntaxException(fullURL, "Not an AMQP URL"); } - if (connection.getHost() == null || connection.getHost().equals("")) + if ((connection.getHost() == null) || connection.getHost().equals("")) { String uid = AMQConnectionFactory.getUniqueClientID(); if (uid == null) @@ -91,7 +90,7 @@ public class AMQConnectionURL implements ConnectionURL if (userInfo == null) { - //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs + // Fix for Java 1.5 which doesn't parse UserInfo for non http URIs userInfo = connection.getAuthority(); if (userInfo != null) @@ -112,16 +111,16 @@ public class AMQConnectionURL implements ConnectionURL if (userInfo == null) { - throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, - "User information not found on url", fullURL); + throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, "User information not found on url", fullURL); } else { parseUserInfo(userInfo); } + String virtualHost = connection.getPath(); - if (virtualHost != null && (!virtualHost.equals(""))) + if ((virtualHost != null) && (!virtualHost.equals(""))) { setVirtualHost(virtualHost); } @@ -130,7 +129,7 @@ public class AMQConnectionURL implements ConnectionURL int authLength = connection.getAuthority().length(); int start = AMQ_PROTOCOL.length() + 3; int testIndex = start + authLength; - if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?') + if ((testIndex < fullURL.length()) && (fullURL.charAt(testIndex) == '?')) { throw URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); } @@ -141,14 +140,9 @@ public class AMQConnectionURL implements ConnectionURL } - URLHelper.parseOptions(_options, connection.getQuery()); processOptions(); - - //Fragment is #string (not used) - //System.out.println(connection.getFragment()); - } catch (URISyntaxException uris) { @@ -165,11 +159,10 @@ public class AMQConnectionURL implements ConnectionURL } else { - if (slash != 0 && fullURL.charAt(slash - 1) == ':') + if ((slash != 0) && (fullURL.charAt(slash - 1) == ':')) { throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, - "Virtual host looks like a windows path, forward slash not allowed in URL", - fullURL); + "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); } else { @@ -182,14 +175,14 @@ public class AMQConnectionURL implements ConnectionURL private void parseUserInfo(String userinfo) throws URLSyntaxException { - //user info = user:pass + // user info = user:pass int colonIndex = userinfo.indexOf(':'); if (colonIndex == -1) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), - "Null password in user information not allowed.", _url); + "Null password in user information not allowed.", _url); } else { @@ -205,7 +198,7 @@ public class AMQConnectionURL implements ConnectionURL { String brokerlist = _options.get(OPTIONS_BROKERLIST); - //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' + // brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR); while (st.hasMoreTokens()) @@ -244,19 +237,16 @@ public class AMQConnectionURL implements ConnectionURL _defaultTopicExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_TOPIC_EXCHANGE)); } - if (_options.containsKey(OPTIONS_DEFAULT_QUEUE_EXCHANGE)) { _defaultQueueExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_QUEUE_EXCHANGE)); } - if (_options.containsKey(OPTIONS_TEMPORARY_QUEUE_EXCHANGE)) { _temporaryQueueExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_QUEUE_EXCHANGE)); } - if (_options.containsKey(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)) { _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)); @@ -439,12 +429,11 @@ public class AMQConnectionURL implements ConnectionURL return sb.toString(); } - public static void main(String[] args) throws URLSyntaxException { - - String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; - //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; + String url2 = + "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; + // "amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; ConnectionURL connectionurl2 = new AMQConnectionURL(url2); @@ -452,5 +441,4 @@ public class AMQConnectionURL implements ConnectionURL System.out.println(connectionurl2); } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 661372845a..585991d905 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -53,6 +53,8 @@ public abstract class AMQDestination implements Destination, Referenceable private String _url; private AMQShortString _urlAsShortString; + private boolean _validated; + private byte[] _byteEncoding; private static final int IS_DURABLE_MASK = 0x1; private static final int IS_EXCLUSIVE_MASK = 0x2; @@ -198,12 +200,16 @@ public abstract class AMQDestination implements Destination, Referenceable { return toURL(); - /* - return "Destination: " + _destinationName + ", " + - "Queue Name: " + _queueName + ", Exchange: " + _exchangeName + - ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive + - ", AutoDelete: " + _isAutoDelete + ", Routing Key: " + getRoutingKey(); - */ + } + + public boolean isValidated() + { + return _validated; + } + + public void setValidated(boolean validated) + { + _validated = validated; } public String toURL() @@ -348,15 +354,7 @@ public abstract class AMQDestination implements Destination, Referenceable { return false; } - /* if (_isExclusive != that._isExclusive) - { - return false; - } - if (_isAutoDelete != that._isAutoDelete) - { - return false; - } - */ + return true; } @@ -370,8 +368,7 @@ public abstract class AMQDestination implements Destination, Referenceable { result = 29 * result + _queueName.hashCode(); } -// result = result * (_isExclusive ? 13 : 7); -// result = result * (_isAutoDelete ? 13 : 7); + return result; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 82f9a036d2..8bb5b622f7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -202,6 +202,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + private AtomicBoolean _firstDispatcher = new AtomicBoolean(true); private class Dispatcher extends Thread { @@ -327,8 +328,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi ") is closed rejecting(requeue)..."); } } - - rejectMessage(message, true); + // Don't reject if we're already closing + if (!_closed.get()) + { + rejectMessage(message, true); + } } else { @@ -995,42 +999,42 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new java.lang.UnsupportedOperationException(); } - public MessageProducer createProducer(Destination destination, boolean mandatory, - boolean immediate, boolean waitUntilSent) + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, + boolean immediate, boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); } - public MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate); } - public MessageProducer createProducer(Destination destination, boolean immediate) + public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - public MessageProducer createProducer(Destination destination) throws JMSException + public BasicMessageProducer createProducer(Destination destination) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory, - boolean immediate) + private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, + boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } - private org.apache.qpid.jms.MessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) + private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent) throws JMSException { - return (org.apache.qpid.jms.MessageProducer) new FailoverSupport() + return (BasicMessageProducer) new FailoverSupport() { public Object operation() throws JMSException { @@ -1248,8 +1252,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { JMSException ex = new JMSException("Error registering consumer: " + e); - //todo remove - e.printStackTrace(); + if (_logger.isDebugEnabled()) + { + e.printStackTrace(); + } ex.setLinkedException(e); throw ex; } @@ -1926,6 +1932,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { + if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + { +// if (!connectionStopped) + { + if (isSuspended() && _firstDispatcher.getAndSet(false)) + { + try + { + suspendChannel(false); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } + } + } + } + startDistpatcherIfNecessary(false); } @@ -1974,6 +1998,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + // The dispatcher will be null if we have just created this session + // so suspend the channel before we register our consumer so that we don't + // start prefetching until a receive/mListener is set. + if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + { + if (_dispatcher == null) + { + if (!isSuspended()) + { + try + { + suspendChannel(true); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } + } + } + } + try { consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); @@ -2089,7 +2134,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Remove the consumer from the map BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if (consumer != null) - { + { // fixme this isn't right.. needs to check if _queue contains data for this consumer if (consumer.isAutoClose())// && _queue.isEmpty()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 73010ce517..1c3cdbcb65 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.client; +import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -34,6 +34,7 @@ import javax.jms.Message; import javax.jms.MessageListener; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; @@ -138,10 +139,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean _noConsume; private List<StackTraceElement> _closedStack = null; - protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, - boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, - int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -160,7 +161,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _autoClose = autoClose; _noConsume = noConsume; - //Force queue browsers not to use acknowledge modes. + // Force queue browsers not to use acknowledge modes. if (_noConsume) { _acknowledgeMode = Session.NO_ACKNOWLEDGE; @@ -175,12 +176,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { checkPreConditions(); + return _messageSelector; } public MessageListener getMessageListener() throws JMSException { checkPreConditions(); + return _messageListener.get(); } @@ -198,14 +201,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - //if the current listener is non-null and the session is not stopped, then - //it is an error to call this method. + // if the current listener is non-null and the session is not stopped, then + // it is an error to call this method. - //i.e. it is only valid to call this method if + // i.e. it is only valid to call this method if // - // (a) the connection is stopped, in which case the dispatcher is not running - // OR - // (b) the listener is null AND we are not receiving synchronously at present + // (a) the connection is stopped, in which case the dispatcher is not running + // OR + // (b) the listener is null AND we are not receiving synchronously at present // if (!_session.getAMQConnection().started()) @@ -215,7 +218,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { - _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination); + _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + + _destination); } } else @@ -224,6 +228,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); } + if (!_messageListener.compareAndSet(null, messageListener)) { throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); @@ -233,7 +238,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (messageListener != null) { - //handle case where connection has already been started, and the dispatcher has alreaded started + // handle case where connection has already been started, and the dispatcher has alreaded started // putting values on the _synchronousQueue synchronized (_session) @@ -263,10 +268,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Another thread is already receiving."); } + if (isMessageListenerSet()) { throw new javax.jms.IllegalStateException("A listener has already been set."); } + _receivingThread = Thread.currentThread(); } @@ -331,6 +338,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { return null; } + Object o = null; if (l > 0) { @@ -340,6 +348,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { o = _synchronousQueue.take(); } + final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { @@ -352,6 +361,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer catch (InterruptedException e) { _logger.warn("Interrupted: " + e); + return null; } finally @@ -365,6 +375,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) { close(false); + return true; } else @@ -387,6 +398,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { return null; } + Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -414,8 +426,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a * JMSException is created with the linked exception set appropriately */ - private AbstractJMSMessage returnMessageOrThrow(Object o) - throws JMSException + private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException { // errors are passed via the queue too since there is no way of interrupting the poll() via the API. if (o instanceof Throwable) @@ -425,6 +436,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { e.setLinkedException((Exception) o); } + throw e; } else @@ -433,7 +445,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void close() throws JMSException { close(true); @@ -441,7 +452,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - //synchronized (_closed) + // synchronized (_closed) if (_logger.isInfoEnabled()) { @@ -456,7 +467,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_closedStack != null) { - _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.trace(_consumerTag + " close():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -464,14 +476,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6); } } + if (sendClose) { // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - _consumerTag, // consumerTag - false); // nowait + final AMQFrame cancelFrame = + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -485,25 +497,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (AMQException e) { - _logger.error("Error closing consumer: " + e, e); - throw new JMSException("Error closing consumer: " + e); + // _logger.error("Error closing consumer: " + e, e); + JMSException jmse = new JMSException("Error closing consumer: " + e); + jmse.setLinkedException(e); + throw jmse; } } else { -// //fixme this probably is not right -// if (!isNoConsume()) - { //done in BasicCancelOK Handler but not sending one so just deregister. + // //fixme this probably is not right + // if (!isNoConsume()) + { // done in BasicCancelOK Handler but not sending one so just deregister. deregisterConsumer(); } } - if (_messageListener != null && _receiving.get()) + if ((_messageListener != null) && _receiving.get()) { if (_logger.isInfoEnabled()) { _logger.info("Interrupting thread: " + _receivingThread); } + _receivingThread.interrupt(); } } @@ -516,7 +531,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ void markClosed() { -// synchronized (_closed) + // synchronized (_closed) { _closed.set(true); @@ -524,7 +539,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " markClosed():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -533,6 +549,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } } + deregisterConsumer(); } @@ -551,22 +568,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag); } + try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, - messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, - messageFrame.getContentHeader(), - messageFrame.getBodies()); + AbstractJMSMessage jmsMessage = + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } -// synchronized (_closed) + // synchronized (_closed) + { -// if (!_closed.get()) + // if (!_closed.get()) { jmsMessage.setConsumer(this); @@ -575,12 +592,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer notifyMessage(jmsMessage, channelId); } -// else -// { -// _logger.error("MESSAGE REJECTING!"); -// _session.rejectMessage(jmsMessage, true); -// //_logger.error("MESSAGE JUST DROPPED!"); -// } + // else + // { + // _logger.error("MESSAGE REJECTING!"); + // _session.rejectMessage(jmsMessage, true); + // //_logger.error("MESSAGE JUST DROPPED!"); + // } } } catch (Exception e) @@ -606,11 +623,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (isMessageListenerSet()) { - //we do not need a lock around the test above, and the dispatch below as it is invalid - //for an application to alter an installed listener while the session is started -// synchronized (_closed) + // we do not need a lock around the test above, and the dispatch below as it is invalid + // for an application to alter an installed listener while the session is started + // synchronized (_closed) { -// if (!_closed.get()) + // if (!_closed.get()) { preApplicationProcessing(jmsMessage); @@ -641,9 +658,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { switch (_acknowledgeMode) { + case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; + case Session.CLIENT_ACKNOWLEDGE: // we set the session so that when the user calls acknowledge() it can call the method on session // to send out the appropriate frame @@ -657,17 +676,21 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer msg.setJMSDestination(_destination); switch (_acknowledgeMode) { + case Session.CLIENT_ACKNOWLEDGE: if (isNoConsume()) { _session.acknowledgeMessage(msg.getDeliveryTag(), false); } + break; + case Session.DUPS_OK_ACKNOWLEDGE: if (++_outstanding >= _prefetchHigh) { _dups_ok_acknowledge_send = true; } + if (_outstanding <= _prefetchLow) { _dups_ok_acknowledge_send = false; @@ -680,14 +703,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session.acknowledgeMessage(msg.getDeliveryTag(), true); } } + break; + case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) { _session.acknowledgeMessage(msg.getDeliveryTag(), false); } + break; + case Session.SESSION_TRANSACTED: if (isNoConsume()) { @@ -697,6 +724,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _receivedDeliveryTags.add(msg.getDeliveryTag()); } + break; } } @@ -721,14 +749,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer void notifyError(Throwable cause) { -// synchronized (_closed) + // synchronized (_closed) { _closed.set(true); if (_logger.isTraceEnabled()) { if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " notifyError():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -737,7 +766,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } } - //QPID-293 can "request redelivery of this error through dispatcher" + // QPID-293 can "request redelivery of this error through dispatcher" // we have no way of propagating the exception to a message listener - a JMS limitation - so we // deal with the case where we have a synchronous receive() waiting for a message to arrive @@ -749,10 +778,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _logger.debug("Passed exception to synchronous queue for propagation to receive()"); } } + deregisterConsumer(); } - /** * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in * the case of an error occurring. @@ -782,7 +811,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer this.checkNotClosed(); - if (_session == null || _session.isClosed()) + if ((_session == null) || _session.isClosed()) { throw new javax.jms.IllegalStateException("Invalid Session"); } @@ -817,7 +846,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _autoClose; } - public boolean isNoConsume() { return _noConsume; @@ -827,10 +855,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closeWhenNoMessages = b; - if (_closeWhenNoMessages - && _synchronousQueue.isEmpty() - && _receiving.get() - && _messageListener != null) + if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null)) { _receivingThread.interrupt(); } @@ -846,13 +871,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); } - //rollback received but not committed messages + // rollback received but not committed messages while (!_receivedDeliveryTags.isEmpty()) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + - "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -876,14 +901,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - //rollback pending messages + // rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + - "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + + "for consumer with tag:" + _consumerTag); } + Iterator iterator = _synchronousQueue.iterator(); while (iterator.hasNext()) @@ -898,13 +924,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } + iterator.remove(); } else { - _logger.error("Queue contained a :" + o.getClass() + - " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + _logger.error("Queue contained a :" + o.getClass() + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } @@ -919,7 +946,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public String debugIdentity() { return String.valueOf(_consumerTag); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index b01e087ce1..bd7cc94582 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.io.UnsupportedEncodingException; +import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; @@ -118,6 +119,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _mandatory; private final boolean _waitUntilSent; + + private boolean _disableMessageId; + private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, @@ -172,15 +176,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { checkPreConditions(); checkNotClosed(); - // IGNORED + _disableMessageId = b; } public boolean getDisableMessageID() throws JMSException { checkNotClosed(); - // Always false for AMQP - return false; + return _disableMessageId; } public void setDisableMessageTimestamp(boolean b) throws JMSException @@ -450,6 +453,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j origMessage.setJMSDestination(destination); AbstractJMSMessage message = convertToNativeMessage(origMessage); + + if(_disableMessageId) + { + message.setJMSMessageID(null); + } + else + { + if (message.getJMSMessageID() == null) + { + message.setJMSMessageID(UUID.randomUUID().toString()); + } + } int type; if (destination instanceof Topic) @@ -667,4 +682,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { return _session; } + + public boolean isBound(AMQDestination destination) throws JMSException + { + return _session.isQueueBound(destination.getExchangeName(),null,destination.getRoutingKey()); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java index d2ab6bd2c2..d1237cff49 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -1,5 +1,25 @@ /* * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,10 +42,35 @@ import javax.jms.JMSException; import org.apache.qpid.AMQException; /** + * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old + * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions + * as well as error messages, through its constructor, but is a JMSException. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept wrapped exceptions as a JMSException. + * </table> + * * @author Apache Software Foundation */ public class JMSAMQException extends JMSException { + /** + * Creates a JMSException, wrapping another exception class. + * + * @param message The error message. + * @param cause The underlying exception that caused this one. May be null if none is to be set. + */ + public JMSAMQException(String message, Exception cause) + { + super(message); + + if (cause != null) + { + setLinkedException(cause); + } + } + public JMSAMQException(AMQException s) { super(s.getMessage(), String.valueOf(s.getErrorCode())); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index c9d29d8077..e0c4b61333 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -7,14 +7,15 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueSender; +import javax.jms.InvalidDestinationException; public class QueueSenderAdapter implements QueueSender { - private MessageProducer _delegate; + private BasicMessageProducer _delegate; private Queue _queue; private boolean closed = false; - public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){ + public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){ _delegate = msgProducer; _queue = queue; } @@ -122,12 +123,13 @@ public class QueueSenderAdapter implements QueueSender { _delegate.setTimeToLive(timeToLive); } - private void checkPreConditions() throws IllegalStateException, IllegalStateException + private void checkPreConditions() throws JMSException { checkPreConditions(_queue); } - private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException { + private void checkPreConditions(Queue queue) throws JMSException + { if (closed){ throw new javax.jms.IllegalStateException("Publisher is closed"); } @@ -137,5 +139,28 @@ public class QueueSenderAdapter implements QueueSender { if(session == null || session.isClosed()){ throw new javax.jms.IllegalStateException("Invalid Session"); } - } + + if(!(queue instanceof AMQDestination)) + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); + } + AMQDestination destination = (AMQDestination) queue; + if(!destination.isValidated() && checkQueueBeforePublish()) + { + + if (_delegate.isBound(destination)) + { + destination.setValidated(true); + } + else + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server"); + } + } + } + + private boolean checkQueueBeforePublish() + { + return "true".equalsIgnoreCase(System.getProperty("org.apache.qpid.client.verifyQueueBindingBeforePublish", "true")); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java index f67b984658..02a408465b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java @@ -175,5 +175,10 @@ public class TopicPublisherAdapter implements TopicPublisher { throw new InvalidDestinationException("Destination " + topic + " is not a topic"); } + if(!(topic instanceof AMQDestination)) + { + throw new InvalidDestinationException("Destination " + topic + " is not a Qpid topic"); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index e2b101ab79..f62baf2c3a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -94,6 +94,8 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener } } + //fixme why is this only done when the close is expected... + // should the above forced closes not also cause a close? protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 8938130417..af254fbbaf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -27,6 +27,7 @@ import javax.jms.JMSException; import javax.jms.MessageEOFException; import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -72,7 +73,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + AMQShortString routingKey, ByteBuffer data) throws AMQException { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); @@ -93,7 +94,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } catch (IOException e) { - throw new JMSException(e.toString()); + JMSException jmse = new JMSException(e.toString()); + jmse.setLinkedException(e); + throw jmse; } } @@ -112,6 +115,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { return null; } + int pos = _data.position(); _data.rewind(); // one byte left is for the end of frame marker @@ -119,12 +123,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { // this is really redundant since pos must be zero _data.position(pos); + return null; } else { String data = _data.getString(Charset.forName("UTF8").newDecoder()); _data.position(pos); + return data; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 66524edce3..f87b4027f6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.message; import java.util.Collections; import java.util.Enumeration; import java.util.Map; +import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; @@ -32,12 +33,7 @@ import javax.jms.MessageNotWriteableException; import org.apache.commons.collections.map.ReferenceMap; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQUndefinedDestination; -import org.apache.qpid.client.BasicMessageConsumer; -import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -123,7 +119,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { if (getContentHeaderProperties().getMessageIdAsString() == null) { - getContentHeaderProperties().setMessageId("ID:" + _deliveryTag); + getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID()); } return getContentHeaderProperties().getMessageIdAsString(); @@ -183,7 +179,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } catch (URLSyntaxException e) { - throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding); + throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); } _destinationCache.put(replyToEncoding, dest); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 6352f7029f..348a0bd152 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -384,7 +384,9 @@ public final class JMSHeaderAdapter }
catch (AMQPInvalidClassException aice)
{
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index df1400b167..caf8741280 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -33,6 +33,7 @@ import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -61,14 +62,15 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } + getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + ByteBuffer data) throws AMQException { super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); } @@ -79,6 +81,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { _data.release(); } + _data = null; } @@ -116,11 +119,13 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } catch (IOException e) { - throw new MessageFormatException("Message not serializable: " + e); + MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); + mfe.setLinkedException(e); + throw mfe; } } - + public Serializable getObject() throws JMSException { ObjectInputStream in = null; @@ -133,17 +138,20 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { _data.rewind(); in = new ObjectInputStream(_data.asInputStream()); + return (Serializable) in.readObject(); } catch (IOException e) { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; } catch (ClassNotFoundException e) { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; } finally { @@ -162,8 +170,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } } catch (IOException ignore) - { - } + { } } private static String toString(ByteBuffer data) @@ -172,6 +179,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { return null; } + int pos = data.position(); try { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d0cc52271a..5bc1555df7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -34,6 +34,7 @@ import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.SSLConfiguration; @@ -248,6 +249,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter sessionClosed(session); } + + //FIXME Need to correctly handle other exceptions. Things like ... +// if (cause instanceof AMQChannelClosedException) + // which will cause the JMSSession to end due to a channel close and so that Session needs + // to be removed from the map so we can correctly still call close without an exception when trying to close + // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception } // we reach this point if failover was attempted and failed therefore we need to let the calling app // know since we cannot recover the situation @@ -510,11 +517,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.closeSession(session); } - public void closeConnection() throws AMQException - { - closeConnection(-1); - } - public void closeConnection(long timeout) throws AMQException { getStateManager().changeState(AMQState.CONNECTION_CLOSING); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties index 50e6f1efaa..89ee8337f8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties @@ -16,5 +16,6 @@ # specific language governing permissions and limitations # under the License. # +CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java index f8ee22a5d9..04db8044de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java @@ -20,10 +20,6 @@ */ package org.apache.qpid.client.security; -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.security.Security; @@ -34,6 +30,7 @@ import java.util.TreeMap; import javax.security.sasl.SaslClientFactory; + import org.apache.log4j.Logger; import org.apache.qpid.util.FileUtils; @@ -50,14 +47,11 @@ import org.apache.qpid.util.FileUtils; * mechanism=fully.qualified.class.name * </pre> * - * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a - * class that implements javax.security.sasl.SaslClientFactory and provides the specified mechanism. + * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a class that + * implements javax.security.sasl.SaslClientFactory and provides the specified mechanism. * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Parse SASL mechanism properties. - * <tr><td> Create and register security provider for SASL mechanisms. - * </table> + * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Parse SASL + * mechanism properties. <tr><td> Create and register security provider for SASL mechanisms. </table> */ public class DynamicSaslRegistrar { @@ -69,10 +63,7 @@ public class DynamicSaslRegistrar /** The default name of the SASL properties file resource. */ public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/DynamicSaslRegistrar.properties"; - /** - * Reads the properties file, and creates a dynamic security provider to register the SASL implementations - * with. - */ + /** Reads the properties file, and creates a dynamic security provider to register the SASL implementations with. */ public static void registerSaslProviders() { _logger.debug("public static void registerSaslProviders(): called"); @@ -80,8 +71,8 @@ public class DynamicSaslRegistrar // Open the SASL properties file, using the default name is one is not specified. String filename = System.getProperty(FILE_PROPERTY); InputStream is = - FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME, - DynamicSaslRegistrar.class.getClassLoader()); + FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME, + DynamicSaslRegistrar.class.getClassLoader()); try { @@ -94,7 +85,7 @@ public class DynamicSaslRegistrar if (factories.size() > 0) { - Security.addProvider(new JCAProvider(factories)); + Security.insertProviderAt(new JCAProvider(factories), 0); _logger.debug("Dynamic SASL provider added as a security provider"); } } @@ -170,15 +161,15 @@ public class DynamicSaslRegistrar * @return A map from SASL mechanism names to implementing client factory classes. * * @todo Why tree map here? Do really want mechanisms in alphabetical order? Seems more likely that the declared - * order of the mechanisms is intended to be preserved, so that they are registered in the declared order - * of preference. Consider LinkedHashMap instead. + * order of the mechanisms is intended to be preserved, so that they are registered in the declared order of + * preference. Consider LinkedHashMap instead. */ private static Map<String, Class<? extends SaslClientFactory>> parseProperties(Properties props) { Enumeration e = props.propertyNames(); TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = - new TreeMap<String, Class<? extends SaslClientFactory>>(); + new TreeMap<String, Class<? extends SaslClientFactory>>(); while (e.hasMoreElements()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties index c2a7d7928c..1bff43142b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties @@ -17,3 +17,4 @@ # under the License. # AMQPLAIN=org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory +CRAM-MD5-HASHED=org.apache.qpid.client.security.crammd5hashed.CRAMMD5HashedSaslClientFactory diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java index 2fa8dcddde..5bf120454e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java @@ -52,7 +52,7 @@ public class JCAProvider extends Provider super("AMQSASLProvider", 1.0, "A JCA provider that registers all " + "AMQ SASL providers that want to be registered"); register(providerMap); - Security.addProvider(this); +// Security.addProvider(this); } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java new file mode 100644 index 0000000000..46323e8c09 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.security; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; + +import com.sun.crypto.provider.HmacMD5; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.protocol.AMQProtocolSession; + +public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler +{ + private static final Logger _logger = Logger.getLogger(UsernameHashedPasswordCallbackHandler.class); + + private AMQProtocolSession _protocolSession; + + public void initialise(AMQProtocolSession protocolSession) + { + _protocolSession = protocolSession; + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback) cb).setName(_protocolSession.getUsername()); + } + else if (cb instanceof PasswordCallback) + { + try + { + ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword())); + } + catch (NoSuchAlgorithmException e) + { + UnsupportedCallbackException uce = new UnsupportedCallbackException(cb); + uce.initCause(e); + throw uce; + } + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + } + + private char[] getHash(String text) throws NoSuchAlgorithmException, UnsupportedEncodingException + { + + byte[] data = text.getBytes("utf-8"); + + MessageDigest md = MessageDigest.getInstance("MD5"); + + for (byte b : data) + { + md.update(b); + } + + byte[] digest = md.digest(); + + char[] hash = new char[digest.length ]; + + int index = 0; + for (byte b : digest) + { + hash[index++] = (char) b; + } + + return hash; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java new file mode 100644 index 0000000000..22bb1ac156 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client.security.crammd5hashed; + +import org.apache.qpid.client.security.amqplain.AmqPlainSaslClient; + +import javax.security.sasl.SaslClientFactory; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.Sasl; +import javax.security.auth.callback.CallbackHandler; +import java.util.Map; +import java.security.Security; + +public class CRAMMD5HashedSaslClientFactory implements SaslClientFactory +{ + /** The name of this mechanism */ + public static final String MECHANISM = "CRAM-MD5-HASHED"; + + + public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) throws SaslException + { + for (int i = 0; i < mechanisms.length; i++) + { + if (mechanisms[i].equals(MECHANISM)) + { + if (cbh == null) + { + throw new SaslException("CallbackHandler must not be null"); + } + + String[] mechs = {"CRAM-MD5"}; + return Sasl.createSaslClient(mechs, authorizationId, protocol, serverName, props, cbh); + } + } + return null; + } + + public String[] getMechanismNames(Map props) + { + if (props != null) + { + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE)) + { + // returned array must be non null according to interface documentation + return new String[0]; + } + } + + return new String[]{MECHANISM}; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java index 104c5bfc44..1ec3adc2eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,7 +33,7 @@ public class AMQNoTransportForProtocolException extends AMQTransportConnectionEx public AMQNoTransportForProtocolException(BrokerDetails details, String message) { - super(message); + super(null, message, null); _details = details; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java index 4b17661bc3..fec7ff693c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,12 +21,12 @@ package org.apache.qpid.client.transport; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class AMQTransportConnectionException extends AMQException { - public AMQTransportConnectionException(String message) + public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause) { - super(message); - + super(errorCode, message, cause); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 8368eee125..0bc83e9804 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,12 +26,14 @@ import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; + import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; + import org.apache.qpid.client.AMQBrokerDetails; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; @@ -64,13 +66,11 @@ public class TransportConnection int transport = getTransport(details.getTransport()); if (transport == -1) - { throw new AMQNoTransportForProtocolException(details); } if (transport == _currentInstance) - { if (transport == VM) { @@ -88,40 +88,42 @@ public class TransportConnection _currentInstance = transport; switch (transport) - { - case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() + + case TCP: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { - SocketConnector result; - //FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) + public IoConnector newSocketConnector() { - _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set."); -// result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (Boolean.getBoolean("qpidnio")) + { + _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set."); + // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector + } + // else + + { + _logger.info("Using Mina NIO"); + result = new SocketConnector(); // non-blocking connector + } + + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + + return result; } -// else - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector - } - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); + }); + break; - return result; - } - }); - break; - case VM: - { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; - } + case VM: + { + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + break; + } } return _instance; @@ -142,7 +144,8 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException + private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) + throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -154,14 +157,14 @@ public class TransportConnection } else { - throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. Auto create disabled.", null); } } return new VmPipeTransportConnection(port); } - public static void createVMBroker(int port) throws AMQVMBrokerCreationException { if (_acceptor == null) @@ -192,7 +195,7 @@ public class TransportConnection { _logger.error(e); - //Try and unbind provider + // Try and unbind provider try { VmPipeAddress pipe = new VmPipeAddress(port); @@ -203,7 +206,7 @@ public class TransportConnection } catch (Exception ignore) { - //ignore + // ignore } if (provider == null) @@ -227,7 +230,7 @@ public class TransportConnection because = e.getCause().toString(); } - throw new AMQVMBrokerCreationException(port, because + " Stopped binding of InVM Qpid.AMQP"); + throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); } } } @@ -246,14 +249,14 @@ public class TransportConnection // can't use introspection to get Provider as it is a server class. // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. - //get right constructor and pass in instancec ID - "port" + // get right constructor and pass in instancec ID - "port" IoHandlerAdapter provider; try { - Class[] cnstr = {Integer.class}; - Object[] params = {port}; + Class[] cnstr = { Integer.class }; + Object[] params = { port }; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); - //Give the broker a second to create + // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } catch (Exception e) @@ -270,8 +273,10 @@ public class TransportConnection because = e.getCause().toString(); } - - throw new AMQVMBrokerCreationException(port, because + " Stopped InVM Qpid.AMQP creation"); + AMQVMBrokerCreationException amqbce = + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); + amqbce.initCause(e); + throw amqbce; } return provider; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java index 607ddcc26a..4b2982fe9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,19 +21,25 @@ package org.apache.qpid.client.vmbroker; import org.apache.qpid.client.transport.AMQTransportConnectionException; +import org.apache.qpid.protocol.AMQConstant; public class AMQVMBrokerCreationException extends AMQTransportConnectionException { private int _port; + /** + * @param port + * + * @deprecated + */ public AMQVMBrokerCreationException(int port) { - this(port, "Unable to create vm broker"); + this(null, port, "Unable to create vm broker", null); } - public AMQVMBrokerCreationException(int port, String message) + public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) { - super(message); + super(errorCode, message, cause); _port = port; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 9adf04e182..6ad3fb4bae 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -101,7 +101,7 @@ public class FailoverPolicy } catch (Exception cnfe) { - throw new IllegalArgumentException("Unknown failover method:" + failoverMethod); + throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe); } } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index a406f9f86e..794fd5c8c1 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -65,6 +65,7 @@ public class MessageListenerMultiConsumerTest extends TestCase private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock + protected void setUp() throws Exception { super.setUp(); @@ -122,30 +123,39 @@ public class MessageListenerMultiConsumerTest extends TestCase TransportConnection.killAllVMBrokers(); } +// public void testRecieveC1thenC2() throws Exception +// { +// +// for (int msg = 0; msg < MSG_COUNT / 2; msg++) +// { +// +// assertTrue(_consumer1.receive() != null); +// } +// +// for (int msg = 0; msg < MSG_COUNT / 2; msg++) +// { +// assertTrue(_consumer2.receive() != null); +// } +// } - public void testRecieveC1thenC2() throws Exception + public void testRecieveInterleaved() throws Exception { - - for (int msg = 0; msg < MSG_COUNT / 2; msg++) + int msg = 0; + int MAX_LOOPS = MSG_COUNT * 2; + for (int loops = 0; msg < MSG_COUNT || loops < MAX_LOOPS; loops++) { - assertTrue(_consumer1.receive() != null); - } - - for (int msg = 0; msg < MSG_COUNT / 2; msg++) - { - assertTrue(_consumer2.receive() != null); + if (_consumer1.receive(100) != null) + { + msg++; + } + if (_consumer2.receive(100) != null) + { + msg++; + } } - } - - public void testRecieveInterleaved() throws Exception - { - for (int msg = 0; msg < MSG_COUNT / 2; msg++) - { - assertTrue(_consumer1.receive() != null); - assertTrue(_consumer2.receive() != null); - } + assertEquals("Not all messages received.", MSG_COUNT, msg); } @@ -161,7 +171,7 @@ public class MessageListenerMultiConsumerTest extends TestCase if (receivedCount1 == MSG_COUNT / 2) { - _allMessagesSent.countDown(); + _allMessagesSent.countDown(); } } @@ -196,6 +206,18 @@ public class MessageListenerMultiConsumerTest extends TestCase assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); } + public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception + { + if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + { + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg, + _consumer2.receive(1000) != null); + } + } + } + public static junit.framework.Test suite() { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 5fb77af4db..7b5957ac8c 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -144,6 +144,36 @@ public class MessageListenerTest extends TestCase implements MessageListener } + public void testRecieveTheUseMessageListener() throws Exception + { + + _logger.error("Test disabled as initial receive is not called first"); + // Perform initial receive to start connection +// assertTrue(_consumer.receive(2000) != null); +// receivedCount++; + + // Sleep to ensure remaining 4 msgs end up on _synchronousQueue +// Thread.sleep(1000); + + // Set the message listener and wait for the messages to come in. + _consumer.setMessageListener(this); + + _logger.info("Waiting 3 seconds for messages"); + + try + { + _awaitMessages.await(3000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + //Should have recieved all async messages + assertEquals(MSG_COUNT, receivedCount); + + } + + public void onMessage(Message message) { _logger.info("Received Message(" + receivedCount + "):" + message); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java index 10bf1a8d6d..42594fff8e 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -83,7 +83,7 @@ public class ResetMessageListenerTest extends TestCase Hashtable<String, String> env = new Hashtable<String, String>(); env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'"); - env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + env.put("queue.queue", "direct://amq.direct//ResetMessageListenerTest"); _context = factory.getInitialContext(env); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java new file mode 100644 index 0000000000..1b5da2631d --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java @@ -0,0 +1,109 @@ +package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import junit.framework.TestCase;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends TestCase
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private MessageConsumer _consumer;
+
+ private static final String VM_BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ createVMBroker();
+ _connection = new AMQConnection(VM_BROKER, "guest", "guest", "ReceiveTestClient", "test");
+ }
+
+ public void createVMBroker()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ MessageConsumer consumer = queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 7762cb3fe9..62234ad21f 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -330,7 +330,7 @@ public class MessageRequeueTest extends TestCase public void testRequeue() throws JMSException, AMQException, URLSyntaxException { int run = 0; - while (run < 10) +// while (run < 10) { run++; @@ -350,17 +350,10 @@ public class MessageRequeueTest extends TestCase _logger.debug("Create Consumer"); MessageConsumer consumer = session.createConsumer(q); - try - { - Thread.sleep(2000); - } - catch (InterruptedException e) - { - // - } + conn.start(); _logger.debug("Receiving msg"); - Message msg = consumer.receive(1000); + Message msg = consumer.receive(2000); assertNotNull("Message should not be null", msg); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 0828ab398c..190b3861f0 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -100,7 +100,9 @@ public class DurableSubscriptionTest extends TestCase AMQTopic topic = new AMQTopic(con,"MyTopic"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); - MessageProducer producer = session1.createProducer(topic); + + Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + MessageProducer producer = sessionProd.createProducer(topic); Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); @@ -112,12 +114,12 @@ public class DurableSubscriptionTest extends TestCase Message msg; msg = consumer1.receive(); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer1.receive(1000); + msg = consumer1.receive(100); assertEquals(null, msg); msg = consumer2.receive(); assertEquals("A", ((TextMessage) msg).getText()); - msg = consumer2.receive(1000); + msg = consumer2.receive(100); assertEquals(null, msg); consumer2.close(); @@ -127,14 +129,14 @@ public class DurableSubscriptionTest extends TestCase producer.send(session1.createTextMessage("B")); - msg = consumer1.receive(); + msg = consumer1.receive(100); assertEquals("B", ((TextMessage) msg).getText()); - msg = consumer1.receive(1000); + msg = consumer1.receive(100); assertEquals(null, msg); - msg = consumer3.receive(); + msg = consumer3.receive(100); assertEquals("B", ((TextMessage) msg).getText()); - msg = consumer3.receive(1000); + msg = consumer3.receive(100); assertEquals(null, msg); con.close(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 2abc139ced..685fe20048 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -53,12 +53,15 @@ public class CommitRollbackTest extends TestCase Queue _jmsQueue; private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class); + private static final String BROKER = "vm://:1"; protected void setUp() throws Exception { super.setUp(); - TransportConnection.createVMBroker(1); - + if (BROKER.startsWith("vm")) + { + TransportConnection.createVMBroker(1); + } testMethod++; queue += testMethod; @@ -68,7 +71,7 @@ public class CommitRollbackTest extends TestCase private void newConnection() throws AMQException, URLSyntaxException, JMSException { - conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"); + conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='" + BROKER + "'"); _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); @@ -87,7 +90,10 @@ public class CommitRollbackTest extends TestCase super.tearDown(); conn.close(); - TransportConnection.killVMBroker(1); + if (BROKER.startsWith("vm")) + { + TransportConnection.killVMBroker(1); + } } /** @@ -261,7 +267,7 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _pubSession.getTransacted()); _logger.info("sending test message"); - String MESSAGE_TEXT = "testGetThenDisconnect"; + String MESSAGE_TEXT = "testGetThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); _pubSession.commit(); @@ -394,16 +400,60 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); result = _consumer.receive(1000); assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - - result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - + if (result.getJMSRedelivered()) + { + assertEquals("1", ((TextMessage) result).getText()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + } + else + { + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + } result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); + + } + + + public void testPutThenRollbackThenGet() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenRollbackThenGet"; + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + _pubSession.commit(); + + assertNotNull(_consumer.receive(100)); + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("rolling back"); + _pubSession.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + assertNull("test message was put and rolled back, but is still present", result); + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + assertNotNull(_consumer.receive(100)); + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java index 8109d20a33..b777cf93b6 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java @@ -172,7 +172,7 @@ public class Config } catch(NumberFormatException e) { - throw new RuntimeException("Bad port number: " + value); + throw new RuntimeException("Bad port number: " + value, e); } } else if("-name".equalsIgnoreCase(key)) diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index f2afa472ab..195ed79dab 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -3,6 +3,7 @@ package org.apache.qpid.testutil; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; import org.apache.log4j.Logger; @@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener } catch (URLSyntaxException e) { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); } } } |
