diff options
author | Robert Greig <rgreig@apache.org> | 2007-04-09 15:26:04 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-04-09 15:26:04 +0000 |
commit | f966c066fc10c1510c244c41958e343c682dd1a1 (patch) | |
tree | ff0d2818fe3729bb6ebdfcb8d654d17b8599538f /java/client | |
parent | b8b2e032a4a6a6ad796ce6247c581a0498b5c264 (diff) | |
download | qpid-python-f966c066fc10c1510c244c41958e343c682dd1a1.tar.gz |
Stopped throwing away exception causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@526776 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
17 files changed, 480 insertions, 378 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java index 88bcbbbccb..f3b21e3c64 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -47,7 +47,9 @@ public class FileMessageFactory } catch (IOException e) { - throw new MessageFactoryException(e.toString()); + MessageFactoryException mfe = new MessageFactoryException(e.toString()); + mfe.initCause(e); + throw mfe; } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java index 8505d1d457..98a2c0d497 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java @@ -59,11 +59,11 @@ public class InitialContextHelper } catch (IOException e) { - throw new ContextException(e.toString()); + throw new ContextException(e.toString(), e); } catch (NamingException n) { - throw new ContextException(n.toString()); + throw new ContextException(n.toString(), n); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2274f2964f..0e3d99eeba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,29 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.log4j.Logger; + import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -44,28 +67,6 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -95,7 +96,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap + private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap private String _clientName; @@ -125,15 +126,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /* * _Connected should be refactored with a suitable wait object. - */ + */ private boolean _connected; /* * The last error code that occured on the connection. Used to return the correct exception to the client - */ + */ private AMQException _lastAMQException = null; - /* * The connection meta data */ @@ -161,13 +161,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @throws AMQException * @throws URLSyntaxException */ - public AMQConnection(String broker, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null); + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), null); } /** @@ -180,44 +180,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @throws AMQException * @throws URLSyntaxException */ - public AMQConnection(String broker, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } - - public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, null); } - public AMQConnection(String host, int port, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, sslConfig); } - - public AMQConnection(String host, int port, boolean useSSL, String username, String password, - String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, + String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(useSSL ? - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='true'" : - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + - (clientName == null ? "" : clientName) + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='false'" - ), sslConfig); + this(new AMQConnectionURL( + useSSL + ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") + : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException @@ -230,13 +224,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL(connection), sslConfig); } - public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } + _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -250,7 +244,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _password = connectionURL.getPassword(); setVirtualHost(connectionURL.getVirtualHost()); - if (connectionURL.getDefaultQueueExchangeName() != null) { _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); @@ -271,7 +264,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); } - _failoverPolicy = new FailoverPolicy(connectionURL); _protocolHandler = new AMQProtocolHandler(this); @@ -279,7 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // We are not currently connected _connected = false; - Exception lastException = new Exception(); lastException.initCause(new ConnectException()); @@ -297,7 +288,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_logger.isInfoEnabled()) { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), + e.getCause()); } } } @@ -323,7 +315,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - if (message == null || message.equals("")) + if ((message == null) || message.equals("")) { message = "Unable to Connect"; } @@ -336,11 +328,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString()); } + e.initCause(lastException); } throw e; } + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -370,6 +364,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { virtualHost = virtualHost.substring(1); } + _virtualHost = virtualHost; } @@ -383,7 +378,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.attainState(AMQState.CONNECTION_OPEN); _failoverPolicy.attainedConnection(); - //Again this should be changed to a suitable notify + // Again this should be changed to a suitable notify _connected = true; } catch (AMQException e) @@ -402,6 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(bd); + return true; } catch (Exception e) @@ -410,8 +406,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Unable to connect to broker at " + bd); } + attemptReconnection(); } + return false; } @@ -422,6 +420,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); + return true; } catch (Exception e) @@ -437,13 +436,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (_logger.isInfoEnabled()) { - _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + _logger.info(e.getMessage() + ":Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails()); } } } } - //connection unsuccessful + // connection unsuccessful return false; } @@ -475,14 +475,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); } - public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetch) throws JMSException + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) + throws JMSException { return createSession(transacted, acknowledgeMode, prefetch, prefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetchHigh, final int prefetchLow) throws JMSException + final int prefetchHigh, final int prefetchLow) throws JMSException { checkNotClosed(); if (channelLimitReached()) @@ -492,85 +492,81 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect else { return (org.apache.qpid.jms.Session) new FailoverSupport() - { - public Object operation() throws JMSException - { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, - prefetchHigh, prefetchLow); - _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - _protocolHandler.removeSessionByChannel(channelId); - deregisterSession(channelId); - } - } - - if (_started) { - try - { - session.start(); - } - catch (AMQException e) + public Object operation() throws JMSException { - throw new JMSAMQException(e); + int channelId = _idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + _protocolHandler.removeSessionByChannel(channelId); + deregisterSession(channelId); + } + } + + if (_started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; } - } - return session; - } - }.execute(this); + }.execute(this); } } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException + throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); - - //todo send low water mark when protocol allows. - //todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); + _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize + BasicQosOkBody.class); if (transacted) { @@ -580,10 +576,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), - TxSelectOkBody.class); + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class); } } @@ -597,11 +591,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _protocolHandler.removeSessionByChannel(channelId); deregisterSession(channelId); - throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); + throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e); } } - public void setFailoverPolicy(FailoverPolicy policy) { _failoverPolicy = policy; @@ -646,12 +639,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private boolean channelLimitReached() { - return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount; + return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount); } public String getClientID() throws JMSException { checkNotClosed(); + return _clientName; } @@ -667,6 +661,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); + return _connectionMetaData; } @@ -674,6 +669,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); + return _exceptionListener; } @@ -707,6 +703,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new JMSAMQException(e); } } + _started = true; } } @@ -727,6 +724,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new JMSAMQException(e); } } + _started = false; } } @@ -753,7 +751,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - //adjust timeout + // adjust timeout long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); @@ -764,7 +762,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - //adjust timeout + // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); _protocolHandler.closeConnection(timeout); @@ -772,7 +770,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (AMQException e) { - throw new JMSException("Error closing connection: " + e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + throw jmse; } } } @@ -786,6 +786,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { timeout = 0; } + return timeout; } @@ -804,6 +805,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect session.markClosed(); } + _sessions.clear(); } @@ -843,6 +845,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } } + _sessions.clear(); if (sessionException != null) { @@ -851,42 +854,42 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) throws JMSException + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException { checkNotClosed(); + return null; } - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, - String messageSelector, ServerSessionPool sessionPool, - int maxMessages) - throws JMSException + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws JMSException { // TODO Auto-generated method stub checkNotClosed(); + return null; } public long getMaximumChannelCount() throws JMSException { checkNotClosed(); + return _maximumChannelCount; } @@ -975,6 +978,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { proceed = _connectionListener.preFailover(redirect); } + return proceed; } @@ -995,6 +999,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { markAllSessionsClosed(); } + return resubscribe; } else @@ -1058,12 +1063,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause); + je = + new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), + "Exception thrown against " + toString() + ": " + cause); } else { je = new JMSException("Exception thrown against " + toString() + ": " + cause); } + if (cause instanceof Exception) { je.setLinkedException((Exception) cause); @@ -1091,6 +1099,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Closing AMQConnection due to :" + cause.getMessage()); } + _closed.set(true); closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } @@ -1146,9 +1155,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect buf.append("Host: ").append(String.valueOf(bd.getHost())); buf.append("\nPort: ").append(String.valueOf(bd.getPort())); } + buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); buf.append("\nClient ID: ").append(String.valueOf(_clientName)); - buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size()); + buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size()); + return buf.toString(); } @@ -1159,11 +1170,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public Reference getReference() throws NamingException { - return new Reference( - AMQConnection.class.getName(), - new StringRefAddr(AMQConnection.class.getName(), toURL()), - AMQConnectionFactory.class.getName(), - null); // factory location + return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()), + AMQConnectionFactory.class.getName(), null); // factory location } public SSLConfiguration getSSLConfiguration() @@ -1176,19 +1184,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _defaultTopicExchangeName; } - public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) { _defaultTopicExchangeName = defaultTopicExchangeName; } - public AMQShortString getDefaultQueueExchangeName() { return _defaultQueueExchangeName; } - public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) { _defaultQueueExchangeName = defaultQueueExchangeName; @@ -1201,10 +1206,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQShortString getTemporaryQueueExchangeName() { - return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates. + return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. } - public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) { _temporaryTopicExchangeName = temporaryTopicExchangeName; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 73010ce517..38c1cd8205 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.client; +import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -34,6 +34,7 @@ import javax.jms.Message; import javax.jms.MessageListener; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; @@ -138,10 +139,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean _noConsume; private List<StackTraceElement> _closedStack = null; - protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, - boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, - int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -160,7 +161,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _autoClose = autoClose; _noConsume = noConsume; - //Force queue browsers not to use acknowledge modes. + // Force queue browsers not to use acknowledge modes. if (_noConsume) { _acknowledgeMode = Session.NO_ACKNOWLEDGE; @@ -175,12 +176,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public String getMessageSelector() throws JMSException { checkPreConditions(); + return _messageSelector; } public MessageListener getMessageListener() throws JMSException { checkPreConditions(); + return _messageListener.get(); } @@ -198,14 +201,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - //if the current listener is non-null and the session is not stopped, then - //it is an error to call this method. + // if the current listener is non-null and the session is not stopped, then + // it is an error to call this method. - //i.e. it is only valid to call this method if + // i.e. it is only valid to call this method if // - // (a) the connection is stopped, in which case the dispatcher is not running - // OR - // (b) the listener is null AND we are not receiving synchronously at present + // (a) the connection is stopped, in which case the dispatcher is not running + // OR + // (b) the listener is null AND we are not receiving synchronously at present // if (!_session.getAMQConnection().started()) @@ -215,7 +218,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { - _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination); + _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + + _destination); } } else @@ -224,6 +228,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); } + if (!_messageListener.compareAndSet(null, messageListener)) { throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); @@ -233,7 +238,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (messageListener != null) { - //handle case where connection has already been started, and the dispatcher has alreaded started + // handle case where connection has already been started, and the dispatcher has alreaded started // putting values on the _synchronousQueue synchronized (_session) @@ -263,10 +268,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("Another thread is already receiving."); } + if (isMessageListenerSet()) { throw new javax.jms.IllegalStateException("A listener has already been set."); } + _receivingThread = Thread.currentThread(); } @@ -331,6 +338,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { return null; } + Object o = null; if (l > 0) { @@ -340,6 +348,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { o = _synchronousQueue.take(); } + final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { @@ -352,6 +361,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer catch (InterruptedException e) { _logger.warn("Interrupted: " + e); + return null; } finally @@ -365,6 +375,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) { close(false); + return true; } else @@ -387,6 +398,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { return null; } + Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -414,8 +426,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a * JMSException is created with the linked exception set appropriately */ - private AbstractJMSMessage returnMessageOrThrow(Object o) - throws JMSException + private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException { // errors are passed via the queue too since there is no way of interrupting the poll() via the API. if (o instanceof Throwable) @@ -425,6 +436,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { e.setLinkedException((Exception) o); } + throw e; } else @@ -433,7 +445,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void close() throws JMSException { close(true); @@ -441,7 +452,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - //synchronized (_closed) + // synchronized (_closed) if (_logger.isInfoEnabled()) { @@ -456,7 +467,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_closedStack != null) { - _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.trace(_consumerTag + " close():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -464,14 +476,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6); } } + if (sendClose) { // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - _consumerTag, // consumerTag - false); // nowait + final AMQFrame cancelFrame = + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -485,25 +497,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (AMQException e) { - _logger.error("Error closing consumer: " + e, e); - throw new JMSException("Error closing consumer: " + e); + // _logger.error("Error closing consumer: " + e, e); + JMSException jmse = new JMSException("Error closing consumer: " + e); + jmse.setLinkedException(e); + throw jmse; } } else { -// //fixme this probably is not right -// if (!isNoConsume()) - { //done in BasicCancelOK Handler but not sending one so just deregister. + // //fixme this probably is not right + // if (!isNoConsume()) + { // done in BasicCancelOK Handler but not sending one so just deregister. deregisterConsumer(); } } - if (_messageListener != null && _receiving.get()) + if ((_messageListener != null) && _receiving.get()) { if (_logger.isInfoEnabled()) { _logger.info("Interrupting thread: " + _receivingThread); } + _receivingThread.interrupt(); } } @@ -516,7 +531,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ void markClosed() { -// synchronized (_closed) + // synchronized (_closed) { _closed.set(true); @@ -524,7 +539,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " markClosed():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -533,6 +549,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } } + deregisterConsumer(); } @@ -551,22 +568,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag); } + try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, - messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, - messageFrame.getContentHeader(), - messageFrame.getBodies()); + AbstractJMSMessage jmsMessage = + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } -// synchronized (_closed) + // synchronized (_closed) + { -// if (!_closed.get()) + // if (!_closed.get()) { jmsMessage.setConsumer(this); @@ -575,12 +592,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer notifyMessage(jmsMessage, channelId); } -// else -// { -// _logger.error("MESSAGE REJECTING!"); -// _session.rejectMessage(jmsMessage, true); -// //_logger.error("MESSAGE JUST DROPPED!"); -// } + // else + // { + // _logger.error("MESSAGE REJECTING!"); + // _session.rejectMessage(jmsMessage, true); + // //_logger.error("MESSAGE JUST DROPPED!"); + // } } } catch (Exception e) @@ -606,11 +623,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (isMessageListenerSet()) { - //we do not need a lock around the test above, and the dispatch below as it is invalid - //for an application to alter an installed listener while the session is started -// synchronized (_closed) + // we do not need a lock around the test above, and the dispatch below as it is invalid + // for an application to alter an installed listener while the session is started + // synchronized (_closed) { -// if (!_closed.get()) + // if (!_closed.get()) { preApplicationProcessing(jmsMessage); @@ -641,14 +658,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; + + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } @@ -657,47 +676,56 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer msg.setJMSDestination(_destination); switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - } - } - break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + + break; + + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + + if (_dups_ok_acknowledge_send) + { if (!_session.isInRecovery()) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + _session.acknowledgeMessage(msg.getDeliveryTag(), true); } - break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } - break; + } + + break; + + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + + break; + + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } + + break; } } @@ -721,14 +749,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer void notifyError(Throwable cause) { -// synchronized (_closed) + // synchronized (_closed) { _closed.set(true); if (_logger.isTraceEnabled()) { if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " notifyError():" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -737,7 +766,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } } - //QPID-293 can "request redelivery of this error through dispatcher" + // QPID-293 can "request redelivery of this error through dispatcher" // we have no way of propagating the exception to a message listener - a JMS limitation - so we // deal with the case where we have a synchronous receive() waiting for a message to arrive @@ -749,10 +778,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _logger.debug("Passed exception to synchronous queue for propagation to receive()"); } } + deregisterConsumer(); } - /** * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in * the case of an error occurring. @@ -782,7 +811,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer this.checkNotClosed(); - if (_session == null || _session.isClosed()) + if ((_session == null) || _session.isClosed()) { throw new javax.jms.IllegalStateException("Invalid Session"); } @@ -817,7 +846,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _autoClose; } - public boolean isNoConsume() { return _noConsume; @@ -827,10 +855,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closeWhenNoMessages = b; - if (_closeWhenNoMessages - && _synchronousQueue.isEmpty() - && _receiving.get() - && _messageListener != null) + if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null)) { _receivingThread.interrupt(); } @@ -846,13 +871,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); } - //rollback received but not committed messages + // rollback received but not committed messages while (!_receivedDeliveryTags.isEmpty()) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + - "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -876,14 +901,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - //rollback pending messages + // rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + - "for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + + "for consumer with tag:" + _consumerTag); } + Iterator iterator = _synchronousQueue.iterator(); while (iterator.hasNext()) @@ -898,13 +924,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } + iterator.remove(); } else { - _logger.error("Queue contained a :" + o.getClass() + - " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + _logger.error("Queue contained a :" + o.getClass() + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } @@ -919,7 +946,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public String debugIdentity() { return String.valueOf(_consumerTag); diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java index d2ab6bd2c2..d1237cff49 100644 --- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -1,5 +1,25 @@ /* * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,10 +42,35 @@ import javax.jms.JMSException; import org.apache.qpid.AMQException; /** + * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old + * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions + * as well as error messages, through its constructor, but is a JMSException. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept wrapped exceptions as a JMSException. + * </table> + * * @author Apache Software Foundation */ public class JMSAMQException extends JMSException { + /** + * Creates a JMSException, wrapping another exception class. + * + * @param message The error message. + * @param cause The underlying exception that caused this one. May be null if none is to be set. + */ + public JMSAMQException(String message, Exception cause) + { + super(message); + + if (cause != null) + { + setLinkedException(cause); + } + } + public JMSAMQException(AMQException s) { super(s.getMessage(), String.valueOf(s.getErrorCode())); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java index 8938130417..af254fbbaf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -27,6 +27,7 @@ import javax.jms.JMSException; import javax.jms.MessageEOFException; import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -72,7 +73,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + AMQShortString routingKey, ByteBuffer data) throws AMQException { // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); @@ -93,7 +94,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage } catch (IOException e) { - throw new JMSException(e.toString()); + JMSException jmse = new JMSException(e.toString()); + jmse.setLinkedException(e); + throw jmse; } } @@ -112,6 +115,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { return null; } + int pos = _data.position(); _data.rewind(); // one byte left is for the end of frame marker @@ -119,12 +123,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage { // this is really redundant since pos must be zero _data.position(pos); + return null; } else { String data = _data.getString(Charset.forName("UTF8").newDecoder()); _data.position(pos); + return data; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 41a143c544..f87b4027f6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -33,12 +33,7 @@ import javax.jms.MessageNotWriteableException; import org.apache.commons.collections.map.ReferenceMap; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQUndefinedDestination; -import org.apache.qpid.client.BasicMessageConsumer; -import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -184,7 +179,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } catch (URLSyntaxException e) { - throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding); + throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); } _destinationCache.put(replyToEncoding, dest); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 6352f7029f..348a0bd152 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -384,7 +384,9 @@ public final class JMSHeaderAdapter }
catch (AMQPInvalidClassException aice)
{
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index df1400b167..caf8741280 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -33,6 +33,7 @@ import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -61,14 +62,15 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } + getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException + JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + ByteBuffer data) throws AMQException { super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); } @@ -79,6 +81,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { _data.release(); } + _data = null; } @@ -116,11 +119,13 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } catch (IOException e) { - throw new MessageFormatException("Message not serializable: " + e); + MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); + mfe.setLinkedException(e); + throw mfe; } } - + public Serializable getObject() throws JMSException { ObjectInputStream in = null; @@ -133,17 +138,20 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { _data.rewind(); in = new ObjectInputStream(_data.asInputStream()); + return (Serializable) in.readObject(); } catch (IOException e) { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; } catch (ClassNotFoundException e) { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; } finally { @@ -162,8 +170,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } } catch (IOException ignore) - { - } + { } } private static String toString(ByteBuffer data) @@ -172,6 +179,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag { return null; } + int pos = data.position(); try { diff --git a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java index 4504498308..dcca55e6c2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java @@ -31,15 +31,16 @@ import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.RealmCallback; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.log4j.Logger; import com.sun.crypto.provider.HmacMD5; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.protocol.AMQProtocolSession; + public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler { private static final Logger _logger = Logger.getLogger(UsernameHashedPasswordCallbackHandler.class); - private AMQProtocolSession _protocolSession; public void initialise(AMQProtocolSession protocolSession) @@ -58,14 +59,15 @@ public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler } else if (cb instanceof PasswordCallback) { - try { ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword())); } - catch (Exception e) + catch (NoSuchAlgorithmException e) { - throw new UnsupportedCallbackException(cb); + UnsupportedCallbackException uce = new UnsupportedCallbackException(cb); + uce.initCause(e); + throw uce; } } else diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java index 104c5bfc44..1ec3adc2eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,7 +33,7 @@ public class AMQNoTransportForProtocolException extends AMQTransportConnectionEx public AMQNoTransportForProtocolException(BrokerDetails details, String message) { - super(message); + super(null, message, null); _details = details; } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java index 4b17661bc3..fec7ff693c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,12 +21,12 @@ package org.apache.qpid.client.transport; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class AMQTransportConnectionException extends AMQException { - public AMQTransportConnectionException(String message) + public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause) { - super(message); - + super(errorCode, message, cause); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 8368eee125..0bc83e9804 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,12 +26,14 @@ import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; + import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; + import org.apache.qpid.client.AMQBrokerDetails; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; @@ -64,13 +66,11 @@ public class TransportConnection int transport = getTransport(details.getTransport()); if (transport == -1) - { throw new AMQNoTransportForProtocolException(details); } if (transport == _currentInstance) - { if (transport == VM) { @@ -88,40 +88,42 @@ public class TransportConnection _currentInstance = transport; switch (transport) - { - case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() + + case TCP: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { - SocketConnector result; - //FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) + public IoConnector newSocketConnector() { - _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set."); -// result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (Boolean.getBoolean("qpidnio")) + { + _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set."); + // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector + } + // else + + { + _logger.info("Using Mina NIO"); + result = new SocketConnector(); // non-blocking connector + } + + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + + return result; } -// else - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector - } - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); + }); + break; - return result; - } - }); - break; - case VM: - { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; - } + case VM: + { + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + break; + } } return _instance; @@ -142,7 +144,8 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException + private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) + throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -154,14 +157,14 @@ public class TransportConnection } else { - throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. Auto create disabled.", null); } } return new VmPipeTransportConnection(port); } - public static void createVMBroker(int port) throws AMQVMBrokerCreationException { if (_acceptor == null) @@ -192,7 +195,7 @@ public class TransportConnection { _logger.error(e); - //Try and unbind provider + // Try and unbind provider try { VmPipeAddress pipe = new VmPipeAddress(port); @@ -203,7 +206,7 @@ public class TransportConnection } catch (Exception ignore) { - //ignore + // ignore } if (provider == null) @@ -227,7 +230,7 @@ public class TransportConnection because = e.getCause().toString(); } - throw new AMQVMBrokerCreationException(port, because + " Stopped binding of InVM Qpid.AMQP"); + throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); } } } @@ -246,14 +249,14 @@ public class TransportConnection // can't use introspection to get Provider as it is a server class. // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. - //get right constructor and pass in instancec ID - "port" + // get right constructor and pass in instancec ID - "port" IoHandlerAdapter provider; try { - Class[] cnstr = {Integer.class}; - Object[] params = {port}; + Class[] cnstr = { Integer.class }; + Object[] params = { port }; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); - //Give the broker a second to create + // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } catch (Exception e) @@ -270,8 +273,10 @@ public class TransportConnection because = e.getCause().toString(); } - - throw new AMQVMBrokerCreationException(port, because + " Stopped InVM Qpid.AMQP creation"); + AMQVMBrokerCreationException amqbce = + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); + amqbce.initCause(e); + throw amqbce; } return provider; diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java index 607ddcc26a..4b2982fe9c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,19 +21,25 @@ package org.apache.qpid.client.vmbroker; import org.apache.qpid.client.transport.AMQTransportConnectionException; +import org.apache.qpid.protocol.AMQConstant; public class AMQVMBrokerCreationException extends AMQTransportConnectionException { private int _port; + /** + * @param port + * + * @deprecated + */ public AMQVMBrokerCreationException(int port) { - this(port, "Unable to create vm broker"); + this(null, port, "Unable to create vm broker", null); } - public AMQVMBrokerCreationException(int port, String message) + public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) { - super(message); + super(errorCode, message, cause); _port = port; } diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 9adf04e182..6ad3fb4bae 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -101,7 +101,7 @@ public class FailoverPolicy } catch (Exception cnfe) { - throw new IllegalArgumentException("Unknown failover method:" + failoverMethod); + throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe); } } } diff --git a/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/java/client/src/test/java/org/apache/qpid/testutil/Config.java index 8109d20a33..b777cf93b6 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/Config.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/Config.java @@ -172,7 +172,7 @@ public class Config } catch(NumberFormatException e) { - throw new RuntimeException("Bad port number: " + value); + throw new RuntimeException("Bad port number: " + value, e); } } else if("-name".equalsIgnoreCase(key)) diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index f2afa472ab..195ed79dab 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -3,6 +3,7 @@ package org.apache.qpid.testutil; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; import org.apache.log4j.Logger; @@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener } catch (URLSyntaxException e) { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); } } } |