summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-04-09 15:26:04 +0000
committerRobert Greig <rgreig@apache.org>2007-04-09 15:26:04 +0000
commitf966c066fc10c1510c244c41958e343c682dd1a1 (patch)
treeff0d2818fe3729bb6ebdfcb8d654d17b8599538f /java/client
parentb8b2e032a4a6a6ad796ce6247c581a0498b5c264 (diff)
downloadqpid-python-f966c066fc10c1510c244c41958e343c682dd1a1.tar.gz
Stopped throwing away exception causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@526776 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java4
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java352
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java254
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java95
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/Config.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java3
17 files changed, 480 insertions, 378 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
index 88bcbbbccb..f3b21e3c64 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
@@ -47,7 +47,9 @@ public class FileMessageFactory
}
catch (IOException e)
{
- throw new MessageFactoryException(e.toString());
+ MessageFactoryException mfe = new MessageFactoryException(e.toString());
+ mfe.initCause(e);
+ throw mfe;
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
index 8505d1d457..98a2c0d497 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
@@ -59,11 +59,11 @@ public class InitialContextHelper
}
catch (IOException e)
{
- throw new ContextException(e.toString());
+ throw new ContextException(e.toString(), e);
}
catch (NamingException n)
{
- throw new ContextException(n.toString());
+ throw new ContextException(n.toString(), n);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 2274f2964f..0e3d99eeba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,29 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -44,28 +67,6 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -95,7 +96,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
@@ -125,15 +126,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/*
* _Connected should be refactored with a suitable wait object.
- */
+ */
private boolean _connected;
/*
* The last error code that occured on the connection. Used to return the correct exception to the client
- */
+ */
private AMQException _lastAMQException = null;
-
/*
* The connection meta data
*/
@@ -161,13 +161,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @throws AMQException
* @throws URLSyntaxException
*/
- public AMQConnection(String broker, String username, String password,
- String clientName, String virtualHost) throws AMQException, URLSyntaxException
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null);
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
/**
@@ -180,44 +180,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @throws AMQException
* @throws URLSyntaxException
*/
- public AMQConnection(String broker, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
-
- public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, null);
}
- public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, sslConfig);
}
-
- public AMQConnection(String host, int port, boolean useSSL, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
+ String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(useSSL ?
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='false'"
- ), sslConfig);
+ this(new AMQConnectionURL(
+ useSSL
+ ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
+ : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
@@ -230,13 +224,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(connection), sslConfig);
}
-
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
}
+
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -250,7 +244,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_password = connectionURL.getPassword();
setVirtualHost(connectionURL.getVirtualHost());
-
if (connectionURL.getDefaultQueueExchangeName() != null)
{
_defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
@@ -271,7 +264,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
-
_failoverPolicy = new FailoverPolicy(connectionURL);
_protocolHandler = new AMQProtocolHandler(this);
@@ -279,7 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// We are not currently connected
_connected = false;
-
Exception lastException = new Exception();
lastException.initCause(new ConnectException());
@@ -297,7 +288,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (_logger.isInfoEnabled())
{
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
+ e.getCause());
}
}
}
@@ -323,7 +315,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- if (message == null || message.equals(""))
+ if ((message == null) || message.equals(""))
{
message = "Unable to Connect";
}
@@ -336,11 +328,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString());
}
+
e.initCause(lastException);
}
throw e;
}
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -370,6 +364,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
virtualHost = virtualHost.substring(1);
}
+
_virtualHost = virtualHost;
}
@@ -383,7 +378,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
_failoverPolicy.attainedConnection();
- //Again this should be changed to a suitable notify
+ // Again this should be changed to a suitable notify
_connected = true;
}
catch (AMQException e)
@@ -402,6 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(bd);
+
return true;
}
catch (Exception e)
@@ -410,8 +406,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Unable to connect to broker at " + bd);
}
+
attemptReconnection();
}
+
return false;
}
@@ -422,6 +420,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+
return true;
}
catch (Exception e)
@@ -437,13 +436,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (_logger.isInfoEnabled())
{
- _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+ _logger.info(e.getMessage() + ":Unable to connect to broker at "
+ + _failoverPolicy.getCurrentBrokerDetails());
}
}
}
}
- //connection unsuccessful
+ // connection unsuccessful
return false;
}
@@ -475,14 +475,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
}
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetch) throws JMSException
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
+ throws JMSException
{
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetchHigh, final int prefetchLow) throws JMSException
+ final int prefetchHigh, final int prefetchLow) throws JMSException
{
checkNotClosed();
if (channelLimitReached())
@@ -492,85 +492,81 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
else
{
return (org.apache.qpid.jms.Session) new FailoverSupport()
- {
- public Object operation() throws JMSException
- {
- int channelId = _idFactory.incrementAndGet();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode,
- prefetchHigh, prefetchLow);
- _protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
-
- boolean success = false;
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
- {
- _protocolHandler.removeSessionByChannel(channelId);
- deregisterSession(channelId);
- }
- }
-
- if (_started)
{
- try
- {
- session.start();
- }
- catch (AMQException e)
+ public Object operation() throws JMSException
{
- throw new JMSAMQException(e);
+ int channelId = _idFactory.incrementAndGet();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Write channel open frame for channel id " + channelId);
+ }
+
+ // We must create the session and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AMQSession session =
+ new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
+ _protocolHandler.addSessionByChannel(channelId, session);
+ registerSession(channelId, session);
+
+ boolean success = false;
+ try
+ {
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ success = true;
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error creating session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ finally
+ {
+ if (!success)
+ {
+ _protocolHandler.removeSessionByChannel(channelId);
+ deregisterSession(channelId);
+ }
+ }
+
+ if (_started)
+ {
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ return session;
}
- }
- return session;
- }
- }.execute(this);
+ }.execute(this);
}
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException
+ throws AMQException
{
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
-
- //todo send low water mark when protocol allows.
- //todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenOkBody.class);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
+ BasicQosOkBody.class);
if (transacted)
{
@@ -580,10 +576,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()),
- TxSelectOkBody.class);
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
}
}
@@ -597,11 +591,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
- throw new AMQException("Error reopening channel " + channelId + " after failover: " + e);
+ throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e);
}
}
-
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
@@ -646,12 +639,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private boolean channelLimitReached()
{
- return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount;
+ return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
}
public String getClientID() throws JMSException
{
checkNotClosed();
+
return _clientName;
}
@@ -667,6 +661,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
+
return _connectionMetaData;
}
@@ -674,6 +669,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
+
return _exceptionListener;
}
@@ -707,6 +703,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new JMSAMQException(e);
}
}
+
_started = true;
}
}
@@ -727,6 +724,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new JMSAMQException(e);
}
}
+
_started = false;
}
}
@@ -753,7 +751,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- //adjust timeout
+ // adjust timeout
long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
_taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
@@ -764,7 +762,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- //adjust timeout
+ // adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
_protocolHandler.closeConnection(timeout);
@@ -772,7 +770,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (AMQException e)
{
- throw new JMSException("Error closing connection: " + e);
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
}
@@ -786,6 +786,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
timeout = 0;
}
+
return timeout;
}
@@ -804,6 +805,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
session.markClosed();
}
+
_sessions.clear();
}
@@ -843,6 +845,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
+
_sessions.clear();
if (sessionException != null)
{
@@ -851,42 +854,42 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
- String messageSelector, ServerSessionPool sessionPool,
- int maxMessages)
- throws JMSException
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
// TODO Auto-generated method stub
checkNotClosed();
+
return null;
}
public long getMaximumChannelCount() throws JMSException
{
checkNotClosed();
+
return _maximumChannelCount;
}
@@ -975,6 +978,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
proceed = _connectionListener.preFailover(redirect);
}
+
return proceed;
}
@@ -995,6 +999,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
markAllSessionsClosed();
}
+
return resubscribe;
}
else
@@ -1058,12 +1063,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (cause instanceof AMQException)
{
- je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause);
+ je =
+ new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()),
+ "Exception thrown against " + toString() + ": " + cause);
}
else
{
je = new JMSException("Exception thrown against " + toString() + ": " + cause);
}
+
if (cause instanceof Exception)
{
je.setLinkedException((Exception) cause);
@@ -1091,6 +1099,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Closing AMQConnection due to :" + cause.getMessage());
}
+
_closed.set(true);
closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
@@ -1146,9 +1155,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
buf.append("Host: ").append(String.valueOf(bd.getHost()));
buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
}
+
buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
buf.append("\nClient ID: ").append(String.valueOf(_clientName));
- buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size());
+ buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
+
return buf.toString();
}
@@ -1159,11 +1170,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public Reference getReference() throws NamingException
{
- return new Reference(
- AMQConnection.class.getName(),
- new StringRefAddr(AMQConnection.class.getName(), toURL()),
- AMQConnectionFactory.class.getName(),
- null); // factory location
+ return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()),
+ AMQConnectionFactory.class.getName(), null); // factory location
}
public SSLConfiguration getSSLConfiguration()
@@ -1176,19 +1184,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _defaultTopicExchangeName;
}
-
public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
{
_defaultTopicExchangeName = defaultTopicExchangeName;
}
-
public AMQShortString getDefaultQueueExchangeName()
{
return _defaultQueueExchangeName;
}
-
public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
{
_defaultQueueExchangeName = defaultQueueExchangeName;
@@ -1201,10 +1206,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQShortString getTemporaryQueueExchangeName()
{
- return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates.
+ return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
}
-
public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
{
_temporaryTopicExchangeName = temporaryTopicExchangeName;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 73010ce517..38c1cd8205 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.client;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -34,6 +34,7 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
@@ -138,10 +139,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
- protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -160,7 +161,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_autoClose = autoClose;
_noConsume = noConsume;
- //Force queue browsers not to use acknowledge modes.
+ // Force queue browsers not to use acknowledge modes.
if (_noConsume)
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
@@ -175,12 +176,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public String getMessageSelector() throws JMSException
{
checkPreConditions();
+
return _messageSelector;
}
public MessageListener getMessageListener() throws JMSException
{
checkPreConditions();
+
return _messageListener.get();
}
@@ -198,14 +201,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
checkPreConditions();
- //if the current listener is non-null and the session is not stopped, then
- //it is an error to call this method.
+ // if the current listener is non-null and the session is not stopped, then
+ // it is an error to call this method.
- //i.e. it is only valid to call this method if
+ // i.e. it is only valid to call this method if
//
- // (a) the connection is stopped, in which case the dispatcher is not running
- // OR
- // (b) the listener is null AND we are not receiving synchronously at present
+ // (a) the connection is stopped, in which case the dispatcher is not running
+ // OR
+ // (b) the listener is null AND we are not receiving synchronously at present
//
if (!_session.getAMQConnection().started())
@@ -215,7 +218,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
- _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+ _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
+ + _destination);
}
}
else
@@ -224,6 +228,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
}
+
if (!_messageListener.compareAndSet(null, messageListener))
{
throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
@@ -233,7 +238,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (messageListener != null)
{
- //handle case where connection has already been started, and the dispatcher has alreaded started
+ // handle case where connection has already been started, and the dispatcher has alreaded started
// putting values on the _synchronousQueue
synchronized (_session)
@@ -263,10 +268,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("Another thread is already receiving.");
}
+
if (isMessageListenerSet())
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
+
_receivingThread = Thread.currentThread();
}
@@ -331,6 +338,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
return null;
}
+
Object o = null;
if (l > 0)
{
@@ -340,6 +348,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
o = _synchronousQueue.take();
}
+
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
@@ -352,6 +361,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
catch (InterruptedException e)
{
_logger.warn("Interrupted: " + e);
+
return null;
}
finally
@@ -365,6 +375,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
{
close(false);
+
return true;
}
else
@@ -387,6 +398,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
return null;
}
+
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -414,8 +426,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
* JMSException is created with the linked exception set appropriately
*/
- private AbstractJMSMessage returnMessageOrThrow(Object o)
- throws JMSException
+ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException
{
// errors are passed via the queue too since there is no way of interrupting the poll() via the API.
if (o instanceof Throwable)
@@ -425,6 +436,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
e.setLinkedException((Exception) o);
}
+
throw e;
}
else
@@ -433,7 +445,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
-
public void close() throws JMSException
{
close(true);
@@ -441,7 +452,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- //synchronized (_closed)
+ // synchronized (_closed)
if (_logger.isInfoEnabled())
{
@@ -456,7 +467,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.trace(_consumerTag + " close():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -464,14 +476,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
}
}
+
if (sendClose)
{
// TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- _consumerTag, // consumerTag
- false); // nowait
+ final AMQFrame cancelFrame =
+ BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -485,25 +497,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (AMQException e)
{
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ // _logger.error("Error closing consumer: " + e, e);
+ JMSException jmse = new JMSException("Error closing consumer: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
else
{
-// //fixme this probably is not right
-// if (!isNoConsume())
- { //done in BasicCancelOK Handler but not sending one so just deregister.
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
deregisterConsumer();
}
}
- if (_messageListener != null && _receiving.get())
+ if ((_messageListener != null) && _receiving.get())
{
if (_logger.isInfoEnabled())
{
_logger.info("Interrupting thread: " + _receivingThread);
}
+
_receivingThread.interrupt();
}
}
@@ -516,7 +531,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void markClosed()
{
-// synchronized (_closed)
+ // synchronized (_closed)
{
_closed.set(true);
@@ -524,7 +539,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " markClosed():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -533,6 +549,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
}
+
deregisterConsumer();
}
@@ -551,22 +568,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
}
+
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered,
- messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey,
- messageFrame.getContentHeader(),
- messageFrame.getBodies());
+ AbstractJMSMessage jmsMessage =
+ _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+ messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
+ messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
if (debug)
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
-// synchronized (_closed)
+ // synchronized (_closed)
+
{
-// if (!_closed.get())
+ // if (!_closed.get())
{
jmsMessage.setConsumer(this);
@@ -575,12 +592,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
notifyMessage(jmsMessage, channelId);
}
-// else
-// {
-// _logger.error("MESSAGE REJECTING!");
-// _session.rejectMessage(jmsMessage, true);
-// //_logger.error("MESSAGE JUST DROPPED!");
-// }
+ // else
+ // {
+ // _logger.error("MESSAGE REJECTING!");
+ // _session.rejectMessage(jmsMessage, true);
+ // //_logger.error("MESSAGE JUST DROPPED!");
+ // }
}
}
catch (Exception e)
@@ -606,11 +623,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (isMessageListenerSet())
{
- //we do not need a lock around the test above, and the dispatch below as it is invalid
- //for an application to alter an installed listener while the session is started
-// synchronized (_closed)
+ // we do not need a lock around the test above, and the dispatch below as it is invalid
+ // for an application to alter an installed listener while the session is started
+ // synchronized (_closed)
{
-// if (!_closed.get())
+ // if (!_closed.get())
{
preApplicationProcessing(jmsMessage);
@@ -641,14 +658,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
switch (_acknowledgeMode)
{
- case Session.PRE_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- break;
- case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- break;
+
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
+
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
}
}
@@ -657,47 +676,56 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- break;
- case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
- if (_outstanding <= _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
- }
- }
- break;
- case Session.AUTO_ACKNOWLEDGE:
- // we do not auto ack a message if the application code called recover()
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+
+ break;
+
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
+ {
if (!_session.isInRecovery())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
}
- break;
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- }
- break;
+ }
+
+ break;
+
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+
+ break;
+
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ }
+
+ break;
}
}
@@ -721,14 +749,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
void notifyError(Throwable cause)
{
-// synchronized (_closed)
+ // synchronized (_closed)
{
_closed.set(true);
if (_logger.isTraceEnabled())
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " notifyError():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -737,7 +766,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
}
- //QPID-293 can "request redelivery of this error through dispatcher"
+ // QPID-293 can "request redelivery of this error through dispatcher"
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
// deal with the case where we have a synchronous receive() waiting for a message to arrive
@@ -749,10 +778,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_logger.debug("Passed exception to synchronous queue for propagation to receive()");
}
}
+
deregisterConsumer();
}
-
/**
* Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in
* the case of an error occurring.
@@ -782,7 +811,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
this.checkNotClosed();
- if (_session == null || _session.isClosed())
+ if ((_session == null) || _session.isClosed())
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
@@ -817,7 +846,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _autoClose;
}
-
public boolean isNoConsume()
{
return _noConsume;
@@ -827,10 +855,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closeWhenNoMessages = b;
- if (_closeWhenNoMessages
- && _synchronousQueue.isEmpty()
- && _receiving.get()
- && _messageListener != null)
+ if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
{
_receivingThread.interrupt();
}
@@ -846,13 +871,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_logger.debug("Rejecting received messages in _receivedDTs (RQ)");
}
- //rollback received but not committed messages
+ // rollback received but not committed messages
while (!_receivedDeliveryTags.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" +
- "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
+ + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -876,14 +901,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- //rollback pending messages
+ // rollback pending messages
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" +
- "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
+ + "for consumer with tag:" + _consumerTag);
}
+
Iterator iterator = _synchronousQueue.iterator();
while (iterator.hasNext())
@@ -898,13 +924,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
+
iterator.remove();
}
else
{
- _logger.error("Queue contained a :" + o.getClass() +
- " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ _logger.error("Queue contained a :" + o.getClass()
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}
@@ -919,7 +946,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
-
public String debugIdentity()
{
return String.valueOf(_consumerTag);
diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
index d2ab6bd2c2..d1237cff49 100644
--- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,10 +42,35 @@ import javax.jms.JMSException;
import org.apache.qpid.AMQException;
/**
+ * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old
+ * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions
+ * as well as error messages, through its constructor, but is a JMSException.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept wrapped exceptions as a JMSException.
+ * </table>
+ *
* @author Apache Software Foundation
*/
public class JMSAMQException extends JMSException
{
+ /**
+ * Creates a JMSException, wrapping another exception class.
+ *
+ * @param message The error message.
+ * @param cause The underlying exception that caused this one. May be null if none is to be set.
+ */
+ public JMSAMQException(String message, Exception cause)
+ {
+ super(message);
+
+ if (cause != null)
+ {
+ setLinkedException(cause);
+ }
+ }
+
public JMSAMQException(AMQException s)
{
super(s.getMessage(), String.valueOf(s.getErrorCode()));
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 8938130417..af254fbbaf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -27,6 +27,7 @@ import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -72,7 +73,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
}
AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
@@ -93,7 +94,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
}
catch (IOException e)
{
- throw new JMSException(e.toString());
+ JMSException jmse = new JMSException(e.toString());
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
@@ -112,6 +115,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
return null;
}
+
int pos = _data.position();
_data.rewind();
// one byte left is for the end of frame marker
@@ -119,12 +123,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
// this is really redundant since pos must be zero
_data.position(pos);
+
return null;
}
else
{
String data = _data.getString(Charset.forName("UTF8").newDecoder());
_data.position(pos);
+
return data;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 41a143c544..f87b4027f6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -33,12 +33,7 @@ import javax.jms.MessageNotWriteableException;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -184,7 +179,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
catch (URLSyntaxException e)
{
- throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding);
+ throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
}
_destinationCache.put(replyToEncoding, dest);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 6352f7029f..348a0bd152 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -384,7 +384,9 @@ public final class JMSHeaderAdapter
}
catch (AMQPInvalidClassException aice)
{
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index df1400b167..caf8741280 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -33,6 +33,7 @@ import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -61,14 +62,15 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
+
getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ ByteBuffer data) throws AMQException
{
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
}
@@ -79,6 +81,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
_data.release();
}
+
_data = null;
}
@@ -116,11 +119,13 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
catch (IOException e)
{
- throw new MessageFormatException("Message not serializable: " + e);
+ MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
}
-
+
public Serializable getObject() throws JMSException
{
ObjectInputStream in = null;
@@ -133,17 +138,20 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
_data.rewind();
in = new ObjectInputStream(_data.asInputStream());
+
return (Serializable) in.readObject();
}
catch (IOException e)
{
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
catch (ClassNotFoundException e)
{
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
finally
{
@@ -162,8 +170,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
}
catch (IOException ignore)
- {
- }
+ { }
}
private static String toString(ByteBuffer data)
@@ -172,6 +179,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
return null;
}
+
int pos = data.position();
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
index 4504498308..dcca55e6c2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
@@ -31,15 +31,16 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.RealmCallback;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.log4j.Logger;
import com.sun.crypto.provider.HmacMD5;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+
public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
{
private static final Logger _logger = Logger.getLogger(UsernameHashedPasswordCallbackHandler.class);
-
private AMQProtocolSession _protocolSession;
public void initialise(AMQProtocolSession protocolSession)
@@ -58,14 +59,15 @@ public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
}
else if (cb instanceof PasswordCallback)
{
-
try
{
((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword()));
}
- catch (Exception e)
+ catch (NoSuchAlgorithmException e)
{
- throw new UnsupportedCallbackException(cb);
+ UnsupportedCallbackException uce = new UnsupportedCallbackException(cb);
+ uce.initCause(e);
+ throw uce;
}
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
index 104c5bfc44..1ec3adc2eb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,7 +33,7 @@ public class AMQNoTransportForProtocolException extends AMQTransportConnectionEx
public AMQNoTransportForProtocolException(BrokerDetails details, String message)
{
- super(message);
+ super(null, message, null);
_details = details;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
index 4b17661bc3..fec7ff693c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,12 +21,12 @@
package org.apache.qpid.client.transport;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQTransportConnectionException extends AMQException
{
- public AMQTransportConnectionException(String message)
+ public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause)
{
- super(message);
-
+ super(errorCode, message, cause);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 8368eee125..0bc83e9804 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,12 +26,14 @@ import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.client.AMQBrokerDetails;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
@@ -64,13 +66,11 @@ public class TransportConnection
int transport = getTransport(details.getTransport());
if (transport == -1)
-
{
throw new AMQNoTransportForProtocolException(details);
}
if (transport == _currentInstance)
-
{
if (transport == VM)
{
@@ -88,40 +88,42 @@ public class TransportConnection
_currentInstance = transport;
switch (transport)
-
{
- case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
+
+ case TCP:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
- SocketConnector result;
- //FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
+ public IoConnector newSocketConnector()
{
- _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set.");
-// result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+ SocketConnector result;
+ // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+ if (Boolean.getBoolean("qpidnio"))
+ {
+ _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set.");
+ // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+ }
+ // else
+
+ {
+ _logger.info("Using Mina NIO");
+ result = new SocketConnector(); // non-blocking connector
+ }
+
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0);
+
+ return result;
}
-// else
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
- }
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
+ });
+ break;
- return result;
- }
- });
- break;
- case VM:
- {
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
- }
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ break;
+ }
}
return _instance;
@@ -142,7 +144,8 @@ public class TransportConnection
return -1;
}
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException
+ private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
+ throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -154,14 +157,14 @@ public class TransportConnection
}
else
{
- throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
+ throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+ + " does not exist. Auto create disabled.", null);
}
}
return new VmPipeTransportConnection(port);
}
-
public static void createVMBroker(int port) throws AMQVMBrokerCreationException
{
if (_acceptor == null)
@@ -192,7 +195,7 @@ public class TransportConnection
{
_logger.error(e);
- //Try and unbind provider
+ // Try and unbind provider
try
{
VmPipeAddress pipe = new VmPipeAddress(port);
@@ -203,7 +206,7 @@ public class TransportConnection
}
catch (Exception ignore)
{
- //ignore
+ // ignore
}
if (provider == null)
@@ -227,7 +230,7 @@ public class TransportConnection
because = e.getCause().toString();
}
- throw new AMQVMBrokerCreationException(port, because + " Stopped binding of InVM Qpid.AMQP");
+ throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
}
}
}
@@ -246,14 +249,14 @@ public class TransportConnection
// can't use introspection to get Provider as it is a server class.
// need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
- //get right constructor and pass in instancec ID - "port"
+ // get right constructor and pass in instancec ID - "port"
IoHandlerAdapter provider;
try
{
- Class[] cnstr = {Integer.class};
- Object[] params = {port};
+ Class[] cnstr = { Integer.class };
+ Object[] params = { port };
provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
- //Give the broker a second to create
+ // Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
}
catch (Exception e)
@@ -270,8 +273,10 @@ public class TransportConnection
because = e.getCause().toString();
}
-
- throw new AMQVMBrokerCreationException(port, because + " Stopped InVM Qpid.AMQP creation");
+ AMQVMBrokerCreationException amqbce =
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+ amqbce.initCause(e);
+ throw amqbce;
}
return provider;
diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
index 607ddcc26a..4b2982fe9c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,19 +21,25 @@
package org.apache.qpid.client.vmbroker;
import org.apache.qpid.client.transport.AMQTransportConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQVMBrokerCreationException extends AMQTransportConnectionException
{
private int _port;
+ /**
+ * @param port
+ *
+ * @deprecated
+ */
public AMQVMBrokerCreationException(int port)
{
- this(port, "Unable to create vm broker");
+ this(null, port, "Unable to create vm broker", null);
}
- public AMQVMBrokerCreationException(int port, String message)
+ public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause)
{
- super(message);
+ super(errorCode, message, cause);
_port = port;
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
index 9adf04e182..6ad3fb4bae 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
@@ -101,7 +101,7 @@ public class FailoverPolicy
}
catch (Exception cnfe)
{
- throw new IllegalArgumentException("Unknown failover method:" + failoverMethod);
+ throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe);
}
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
index 8109d20a33..b777cf93b6 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java
@@ -172,7 +172,7 @@ public class Config
}
catch(NumberFormatException e)
{
- throw new RuntimeException("Bad port number: " + value);
+ throw new RuntimeException("Bad port number: " + value, e);
}
}
else if("-name".equalsIgnoreCase(key))
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
index f2afa472ab..195ed79dab 100644
--- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -3,6 +3,7 @@ package org.apache.qpid.testutil;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.log4j.Logger;
@@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener
}
catch (URLSyntaxException e)
{
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
}
}
}