summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java61
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java4
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java355
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java60
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java75
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java164
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java45
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java35
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java28
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java33
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java104
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java72
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java95
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java60
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java30
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java109
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java13
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java16
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java76
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java3
38 files changed, 1075 insertions, 511 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
index b199d41432..6a7626c51d 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,33 +7,35 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
package org.apache.qpid.example.publisher;
-import org.apache.log4j.Logger;
-
import java.io.File;
+import javax.jms.JMSException;
+
+import org.apache.log4j.Logger;
+
import org.apache.qpid.example.shared.FileUtils;
import org.apache.qpid.example.shared.Statics;
-import javax.jms.JMSException;
-
/**
* Class that sends message files to the Publisher to distribute
* using files as input
* Must set properties for host in properties file or uses in vm broker
*/
-public class FileMessageDispatcher {
+public class FileMessageDispatcher
+{
protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class);
@@ -48,30 +51,30 @@ public class FileMessageDispatcher {
public static void main(String[] args)
{
- //Check command line args ok - must provide a path or file for us to dispatch
+ // Check command line args ok - must provide a path or file for us to dispatch
if (args.length == 0)
{
- System.err.println("Usage: FileMessageDispatcher <filesToDispatch>" + "");
+ System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + "");
}
else
{
try
{
- //publish message(s) from file(s) to configured queue
+ // publish message(s) from file(s) to configured queue
publish(args[0]);
- //Move payload file(s) to archive location as no error
+ // Move payload file(s) to archive location as no error
FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH));
}
- catch(Exception e)
+ catch (Exception e)
{
- //log error and exit
+ // log error and exit
_logger.error("Error trying to dispatch message: " + e);
System.exit(1);
}
finally
{
- //clean up before exiting
+ // clean up before exiting
if (getPublisher() != null)
{
getPublisher().cleanup();
@@ -98,10 +101,10 @@ public class FileMessageDispatcher {
File tempFile = new File(path);
if (tempFile.isDirectory())
{
- //while more files in dir publish them
+ // while more files in dir publish them
File[] files = tempFile.listFiles();
- if (files == null || files.length == 0)
+ if ((files == null) || (files.length == 0))
{
_logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile);
}
@@ -109,10 +112,10 @@ public class FileMessageDispatcher {
{
for (File file : files)
{
- //Create message factory passing in payload path
+ // Create message factory passing in payload path
FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString());
- //Send the message generated from the payload using the _publisher
+ // Send the message generated from the payload using the _publisher
getPublisher().sendMessage(factory.createEventMessage());
}
@@ -120,11 +123,11 @@ public class FileMessageDispatcher {
}
else
{
- //handle a single file
- //Create message factory passing in payload path
- FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString());
+ // handle a single file
+ // Create message factory passing in payload path
+ FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), tempFile.toString());
- //Send the message generated from the payload using the _publisher
+ // Send the message generated from the payload using the _publisher
getPublisher().sendMessage(factory.createEventMessage());
}
}
@@ -145,15 +148,15 @@ public class FileMessageDispatcher {
*/
private static Publisher getPublisher()
{
- if (_publisher != null)
- {
- return _publisher;
- }
+ if (_publisher != null)
+ {
+ return _publisher;
+ }
- //Create a _publisher
- _publisher = new Publisher();
+ // Create a _publisher
+ _publisher = new Publisher();
- return _publisher;
+ return _publisher;
}
}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
index 88bcbbbccb..f3b21e3c64 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
@@ -47,7 +47,9 @@ public class FileMessageFactory
}
catch (IOException e)
{
- throw new MessageFactoryException(e.toString());
+ MessageFactoryException mfe = new MessageFactoryException(e.toString());
+ mfe.initCause(e);
+ throw mfe;
}
}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
index 8505d1d457..98a2c0d497 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
@@ -59,11 +59,11 @@ public class InitialContextHelper
}
catch (IOException e)
{
- throw new ContextException(e.toString());
+ throw new ContextException(e.toString(), e);
}
catch (NamingException n)
{
- throw new ContextException(n.toString());
+ throw new ContextException(n.toString(), n);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 413524b6d8..0e3d99eeba 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,29 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -44,28 +67,6 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -95,7 +96,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
@@ -125,15 +126,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/*
* _Connected should be refactored with a suitable wait object.
- */
+ */
private boolean _connected;
/*
* The last error code that occured on the connection. Used to return the correct exception to the client
- */
+ */
private AMQException _lastAMQException = null;
-
/*
* The connection meta data
*/
@@ -149,6 +149,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+ private static final long DEFAULT_TIMEOUT = 1000 * 30;
/**
* @param broker brokerdetails
@@ -160,13 +161,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @throws AMQException
* @throws URLSyntaxException
*/
- public AMQConnection(String broker, String username, String password,
- String clientName, String virtualHost) throws AMQException, URLSyntaxException
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null);
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
/**
@@ -179,44 +180,38 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @throws AMQException
* @throws URLSyntaxException
*/
- public AMQConnection(String broker, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
-
- public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, null);
}
- public AMQConnection(String host, int port, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, sslConfig);
}
-
- public AMQConnection(String host, int port, boolean useSSL, String username, String password,
- String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
+ String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(useSSL ?
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" +
- (clientName == null ? "" : clientName) +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='false'"
- ), sslConfig);
+ this(new AMQConnectionURL(
+ useSSL
+ ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
+ : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
@@ -229,13 +224,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(connection), sslConfig);
}
-
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
}
+
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -249,7 +244,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_password = connectionURL.getPassword();
setVirtualHost(connectionURL.getVirtualHost());
-
if (connectionURL.getDefaultQueueExchangeName() != null)
{
_defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
@@ -270,7 +264,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
-
_failoverPolicy = new FailoverPolicy(connectionURL);
_protocolHandler = new AMQProtocolHandler(this);
@@ -278,7 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// We are not currently connected
_connected = false;
-
Exception lastException = new Exception();
lastException.initCause(new ConnectException());
@@ -296,7 +288,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (_logger.isInfoEnabled())
{
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
+ e.getCause());
}
}
}
@@ -322,7 +315,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- if (message == null || message.equals(""))
+ if ((message == null) || message.equals(""))
{
message = "Unable to Connect";
}
@@ -335,11 +328,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString());
}
+
e.initCause(lastException);
}
throw e;
}
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -369,6 +364,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
virtualHost = virtualHost.substring(1);
}
+
_virtualHost = virtualHost;
}
@@ -382,7 +378,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
_failoverPolicy.attainedConnection();
- //Again this should be changed to a suitable notify
+ // Again this should be changed to a suitable notify
_connected = true;
}
catch (AMQException e)
@@ -401,6 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(bd);
+
return true;
}
catch (Exception e)
@@ -409,8 +406,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Unable to connect to broker at " + bd);
}
+
attemptReconnection();
}
+
return false;
}
@@ -421,6 +420,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
+
return true;
}
catch (Exception e)
@@ -436,13 +436,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (_logger.isInfoEnabled())
{
- _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+ _logger.info(e.getMessage() + ":Unable to connect to broker at "
+ + _failoverPolicy.getCurrentBrokerDetails());
}
}
}
}
- //connection unsuccessful
+ // connection unsuccessful
return false;
}
@@ -474,14 +475,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
}
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetch) throws JMSException
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
+ throws JMSException
{
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetchHigh, final int prefetchLow) throws JMSException
+ final int prefetchHigh, final int prefetchLow) throws JMSException
{
checkNotClosed();
if (channelLimitReached())
@@ -491,85 +492,81 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
else
{
return (org.apache.qpid.jms.Session) new FailoverSupport()
- {
- public Object operation() throws JMSException
- {
- int channelId = _idFactory.incrementAndGet();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode,
- prefetchHigh, prefetchLow);
- _protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
-
- boolean success = false;
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
- {
- _protocolHandler.removeSessionByChannel(channelId);
- deregisterSession(channelId);
- }
- }
-
- if (_started)
{
- try
- {
- session.start();
- }
- catch (AMQException e)
+ public Object operation() throws JMSException
{
- throw new JMSAMQException(e);
+ int channelId = _idFactory.incrementAndGet();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Write channel open frame for channel id " + channelId);
+ }
+
+ // We must create the session and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AMQSession session =
+ new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
+ _protocolHandler.addSessionByChannel(channelId, session);
+ registerSession(channelId, session);
+
+ boolean success = false;
+ try
+ {
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ success = true;
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error creating session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ finally
+ {
+ if (!success)
+ {
+ _protocolHandler.removeSessionByChannel(channelId);
+ deregisterSession(channelId);
+ }
+ }
+
+ if (_started)
+ {
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ return session;
}
- }
- return session;
- }
- }.execute(this);
+ }.execute(this);
}
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException
+ throws AMQException
{
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
-
- //todo send low water mark when protocol allows.
- //todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenOkBody.class);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
+ BasicQosOkBody.class);
if (transacted)
{
@@ -579,10 +576,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()),
- TxSelectOkBody.class);
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
}
}
@@ -596,11 +591,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
- throw new AMQException("Error reopening channel " + channelId + " after failover: " + e);
+ throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e);
}
}
-
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
@@ -645,12 +639,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private boolean channelLimitReached()
{
- return _maximumChannelCount != 0 && _sessions.size() == _maximumChannelCount;
+ return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
}
public String getClientID() throws JMSException
{
checkNotClosed();
+
return _clientName;
}
@@ -666,6 +661,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
+
return _connectionMetaData;
}
@@ -673,6 +669,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
+
return _exceptionListener;
}
@@ -706,6 +703,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new JMSAMQException(e);
}
}
+
_started = true;
}
}
@@ -726,13 +724,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new JMSAMQException(e);
}
}
+
_started = false;
}
}
public void close() throws JMSException
{
- close(-1);
+ close(DEFAULT_TIMEOUT);
}
public void close(long timeout) throws JMSException
@@ -752,7 +751,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- //adjust timeout
+ // adjust timeout
long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
_taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
@@ -763,7 +762,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- //adjust timeout
+ // adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
_protocolHandler.closeConnection(timeout);
@@ -771,7 +770,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (AMQException e)
{
- throw new JMSException("Error closing connection: " + e);
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
}
@@ -785,6 +786,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
timeout = 0;
}
+
return timeout;
}
@@ -803,6 +805,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
session.markClosed();
}
+
_sessions.clear();
}
@@ -842,6 +845,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
+
_sessions.clear();
if (sessionException != null)
{
@@ -850,42 +854,42 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
- ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
{
checkNotClosed();
+
return null;
}
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
- String messageSelector, ServerSessionPool sessionPool,
- int maxMessages)
- throws JMSException
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
// TODO Auto-generated method stub
checkNotClosed();
+
return null;
}
public long getMaximumChannelCount() throws JMSException
{
checkNotClosed();
+
return _maximumChannelCount;
}
@@ -974,6 +978,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
proceed = _connectionListener.preFailover(redirect);
}
+
return proceed;
}
@@ -994,6 +999,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
markAllSessionsClosed();
}
+
return resubscribe;
}
else
@@ -1057,12 +1063,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (cause instanceof AMQException)
{
- je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause);
+ je =
+ new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()),
+ "Exception thrown against " + toString() + ": " + cause);
}
else
{
je = new JMSException("Exception thrown against " + toString() + ": " + cause);
}
+
if (cause instanceof Exception)
{
je.setLinkedException((Exception) cause);
@@ -1090,6 +1099,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Closing AMQConnection due to :" + cause.getMessage());
}
+
_closed.set(true);
closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
@@ -1145,9 +1155,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
buf.append("Host: ").append(String.valueOf(bd.getHost()));
buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
}
+
buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
buf.append("\nClient ID: ").append(String.valueOf(_clientName));
- buf.append("\nActive session count: ").append(_sessions == null ? 0 : _sessions.size());
+ buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
+
return buf.toString();
}
@@ -1158,11 +1170,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public Reference getReference() throws NamingException
{
- return new Reference(
- AMQConnection.class.getName(),
- new StringRefAddr(AMQConnection.class.getName(), toURL()),
- AMQConnectionFactory.class.getName(),
- null); // factory location
+ return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()),
+ AMQConnectionFactory.class.getName(), null); // factory location
}
public SSLConfiguration getSSLConfiguration()
@@ -1175,19 +1184,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _defaultTopicExchangeName;
}
-
public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
{
_defaultTopicExchangeName = defaultTopicExchangeName;
}
-
public AMQShortString getDefaultQueueExchangeName()
{
return _defaultQueueExchangeName;
}
-
public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
{
_defaultQueueExchangeName = defaultQueueExchangeName;
@@ -1200,10 +1206,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQShortString getTemporaryQueueExchangeName()
{
- return _temporaryQueueExchangeName; //To change body of created methods use File | Settings | File Templates.
+ return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
}
-
public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
{
_temporaryTopicExchangeName = temporaryTopicExchangeName;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index 0dcc544ea8..b3fbd1f510 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@@ -33,6 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
public class AMQConnectionURL implements ConnectionURL
{
private String _url;
@@ -49,7 +49,6 @@ public class AMQConnectionURL implements ConnectionURL
private AMQShortString _temporaryTopicExchangeName;
private AMQShortString _temporaryQueueExchangeName;
-
public AMQConnectionURL(String fullURL) throws URLSyntaxException
{
_url = fullURL;
@@ -58,18 +57,18 @@ public class AMQConnectionURL implements ConnectionURL
_failoverOptions = new HashMap<String, String>();
// Connection URL format
- //amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
+ // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
// Options are of course optional except for requiring a single broker in the broker list.
try
{
URI connection = new URI(fullURL);
- if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
+ if ((connection.getScheme() == null) || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
{
throw new URISyntaxException(fullURL, "Not an AMQP URL");
}
- if (connection.getHost() == null || connection.getHost().equals(""))
+ if ((connection.getHost() == null) || connection.getHost().equals(""))
{
String uid = AMQConnectionFactory.getUniqueClientID();
if (uid == null)
@@ -91,7 +90,7 @@ public class AMQConnectionURL implements ConnectionURL
if (userInfo == null)
{
- //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
+ // Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
userInfo = connection.getAuthority();
if (userInfo != null)
@@ -112,16 +111,16 @@ public class AMQConnectionURL implements ConnectionURL
if (userInfo == null)
{
- throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
- "User information not found on url", fullURL);
+ throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, "User information not found on url", fullURL);
}
else
{
parseUserInfo(userInfo);
}
+
String virtualHost = connection.getPath();
- if (virtualHost != null && (!virtualHost.equals("")))
+ if ((virtualHost != null) && (!virtualHost.equals("")))
{
setVirtualHost(virtualHost);
}
@@ -130,7 +129,7 @@ public class AMQConnectionURL implements ConnectionURL
int authLength = connection.getAuthority().length();
int start = AMQ_PROTOCOL.length() + 3;
int testIndex = start + authLength;
- if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
+ if ((testIndex < fullURL.length()) && (fullURL.charAt(testIndex) == '?'))
{
throw URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
}
@@ -141,14 +140,9 @@ public class AMQConnectionURL implements ConnectionURL
}
-
URLHelper.parseOptions(_options, connection.getQuery());
processOptions();
-
- //Fragment is #string (not used)
- //System.out.println(connection.getFragment());
-
}
catch (URISyntaxException uris)
{
@@ -165,11 +159,10 @@ public class AMQConnectionURL implements ConnectionURL
}
else
{
- if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+ if ((slash != 0) && (fullURL.charAt(slash - 1) == ':'))
{
throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2,
- "Virtual host looks like a windows path, forward slash not allowed in URL",
- fullURL);
+ "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
}
else
{
@@ -182,14 +175,14 @@ public class AMQConnectionURL implements ConnectionURL
private void parseUserInfo(String userinfo) throws URLSyntaxException
{
- //user info = user:pass
+ // user info = user:pass
int colonIndex = userinfo.indexOf(':');
if (colonIndex == -1)
{
throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
- "Null password in user information not allowed.", _url);
+ "Null password in user information not allowed.", _url);
}
else
{
@@ -205,7 +198,7 @@ public class AMQConnectionURL implements ConnectionURL
{
String brokerlist = _options.get(OPTIONS_BROKERLIST);
- //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+ // brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
while (st.hasMoreTokens())
@@ -244,19 +237,16 @@ public class AMQConnectionURL implements ConnectionURL
_defaultTopicExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_TOPIC_EXCHANGE));
}
-
if (_options.containsKey(OPTIONS_DEFAULT_QUEUE_EXCHANGE))
{
_defaultQueueExchangeName = new AMQShortString(_options.get(OPTIONS_DEFAULT_QUEUE_EXCHANGE));
}
-
if (_options.containsKey(OPTIONS_TEMPORARY_QUEUE_EXCHANGE))
{
_temporaryQueueExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_QUEUE_EXCHANGE));
}
-
if (_options.containsKey(OPTIONS_TEMPORARY_TOPIC_EXCHANGE))
{
_temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE));
@@ -439,12 +429,11 @@ public class AMQConnectionURL implements ConnectionURL
return sb.toString();
}
-
public static void main(String[] args) throws URLSyntaxException
{
-
- String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
- //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
+ String url2 =
+ "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+ // "amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
@@ -452,5 +441,4 @@ public class AMQConnectionURL implements ConnectionURL
System.out.println(connectionurl2);
}
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 661372845a..585991d905 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -53,6 +53,8 @@ public abstract class AMQDestination implements Destination, Referenceable
private String _url;
private AMQShortString _urlAsShortString;
+ private boolean _validated;
+
private byte[] _byteEncoding;
private static final int IS_DURABLE_MASK = 0x1;
private static final int IS_EXCLUSIVE_MASK = 0x2;
@@ -198,12 +200,16 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return toURL();
- /*
- return "Destination: " + _destinationName + ", " +
- "Queue Name: " + _queueName + ", Exchange: " + _exchangeName +
- ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive +
- ", AutoDelete: " + _isAutoDelete + ", Routing Key: " + getRoutingKey();
- */
+ }
+
+ public boolean isValidated()
+ {
+ return _validated;
+ }
+
+ public void setValidated(boolean validated)
+ {
+ _validated = validated;
}
public String toURL()
@@ -348,15 +354,7 @@ public abstract class AMQDestination implements Destination, Referenceable
{
return false;
}
- /* if (_isExclusive != that._isExclusive)
- {
- return false;
- }
- if (_isAutoDelete != that._isAutoDelete)
- {
- return false;
- }
- */
+
return true;
}
@@ -370,8 +368,7 @@ public abstract class AMQDestination implements Destination, Referenceable
{
result = 29 * result + _queueName.hashCode();
}
-// result = result * (_isExclusive ? 13 : 7);
-// result = result * (_isAutoDelete ? 13 : 7);
+
return result;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 82f9a036d2..8bb5b622f7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -202,6 +202,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+ private AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
private class Dispatcher extends Thread
{
@@ -327,8 +328,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
") is closed rejecting(requeue)...");
}
}
-
- rejectMessage(message, true);
+ // Don't reject if we're already closing
+ if (!_closed.get())
+ {
+ rejectMessage(message, true);
+ }
}
else
{
@@ -995,42 +999,42 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw new java.lang.UnsupportedOperationException();
}
- public MessageProducer createProducer(Destination destination, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory,
+ boolean immediate, boolean waitUntilSent)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
}
- public MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate);
}
- public MessageProducer createProducer(Destination destination, boolean immediate)
+ public BasicMessageProducer createProducer(Destination destination, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
}
- public MessageProducer createProducer(Destination destination) throws JMSException
+ public BasicMessageProducer createProducer(Destination destination) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
- private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory,
- boolean immediate)
+ private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory,
+ boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
- private org.apache.qpid.jms.MessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent)
+ private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent)
throws JMSException
{
- return (org.apache.qpid.jms.MessageProducer) new FailoverSupport()
+ return (BasicMessageProducer) new FailoverSupport()
{
public Object operation() throws JMSException
{
@@ -1248,8 +1252,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
JMSException ex = new JMSException("Error registering consumer: " + e);
- //todo remove
- e.printStackTrace();
+ if (_logger.isDebugEnabled())
+ {
+ e.printStackTrace();
+ }
ex.setLinkedException(e);
throw ex;
}
@@ -1926,6 +1932,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ {
+// if (!connectionStopped)
+ {
+ if (isSuspended() && _firstDispatcher.getAndSet(false))
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
+ }
+ }
+ }
+
startDistpatcherIfNecessary(false);
}
@@ -1974,6 +1998,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+ // The dispatcher will be null if we have just created this session
+ // so suspend the channel before we register our consumer so that we don't
+ // start prefetching until a receive/mListener is set.
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ {
+ if (_dispatcher == null)
+ {
+ if (!isSuspended())
+ {
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
+ }
+ }
+ }
+
try
{
consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
@@ -2089,7 +2134,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Remove the consumer from the map
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
if (consumer != null)
- {
+ {
// fixme this isn't right.. needs to check if _queue contains data for this consumer
if (consumer.isAutoClose())// && _queue.isEmpty())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 73010ce517..1c3cdbcb65 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.client;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -34,6 +34,7 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
@@ -138,10 +139,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
- protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -160,7 +161,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_autoClose = autoClose;
_noConsume = noConsume;
- //Force queue browsers not to use acknowledge modes.
+ // Force queue browsers not to use acknowledge modes.
if (_noConsume)
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
@@ -175,12 +176,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public String getMessageSelector() throws JMSException
{
checkPreConditions();
+
return _messageSelector;
}
public MessageListener getMessageListener() throws JMSException
{
checkPreConditions();
+
return _messageListener.get();
}
@@ -198,14 +201,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
checkPreConditions();
- //if the current listener is non-null and the session is not stopped, then
- //it is an error to call this method.
+ // if the current listener is non-null and the session is not stopped, then
+ // it is an error to call this method.
- //i.e. it is only valid to call this method if
+ // i.e. it is only valid to call this method if
//
- // (a) the connection is stopped, in which case the dispatcher is not running
- // OR
- // (b) the listener is null AND we are not receiving synchronously at present
+ // (a) the connection is stopped, in which case the dispatcher is not running
+ // OR
+ // (b) the listener is null AND we are not receiving synchronously at present
//
if (!_session.getAMQConnection().started())
@@ -215,7 +218,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_logger.isDebugEnabled())
{
- _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+ _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
+ + _destination);
}
}
else
@@ -224,6 +228,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
}
+
if (!_messageListener.compareAndSet(null, messageListener))
{
throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
@@ -233,7 +238,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (messageListener != null)
{
- //handle case where connection has already been started, and the dispatcher has alreaded started
+ // handle case where connection has already been started, and the dispatcher has alreaded started
// putting values on the _synchronousQueue
synchronized (_session)
@@ -263,10 +268,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("Another thread is already receiving.");
}
+
if (isMessageListenerSet())
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
+
_receivingThread = Thread.currentThread();
}
@@ -331,6 +338,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
return null;
}
+
Object o = null;
if (l > 0)
{
@@ -340,6 +348,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
o = _synchronousQueue.take();
}
+
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
@@ -352,6 +361,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
catch (InterruptedException e)
{
_logger.warn("Interrupted: " + e);
+
return null;
}
finally
@@ -365,6 +375,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
{
close(false);
+
return true;
}
else
@@ -387,6 +398,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
return null;
}
+
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -414,8 +426,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
* JMSException is created with the linked exception set appropriately
*/
- private AbstractJMSMessage returnMessageOrThrow(Object o)
- throws JMSException
+ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException
{
// errors are passed via the queue too since there is no way of interrupting the poll() via the API.
if (o instanceof Throwable)
@@ -425,6 +436,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
e.setLinkedException((Exception) o);
}
+
throw e;
}
else
@@ -433,7 +445,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
-
public void close() throws JMSException
{
close(true);
@@ -441,7 +452,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- //synchronized (_closed)
+ // synchronized (_closed)
if (_logger.isInfoEnabled())
{
@@ -456,7 +467,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.trace(_consumerTag + " close():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -464,14 +476,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
}
}
+
if (sendClose)
{
// TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- _consumerTag, // consumerTag
- false); // nowait
+ final AMQFrame cancelFrame =
+ BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -485,25 +497,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (AMQException e)
{
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ // _logger.error("Error closing consumer: " + e, e);
+ JMSException jmse = new JMSException("Error closing consumer: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
else
{
-// //fixme this probably is not right
-// if (!isNoConsume())
- { //done in BasicCancelOK Handler but not sending one so just deregister.
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
deregisterConsumer();
}
}
- if (_messageListener != null && _receiving.get())
+ if ((_messageListener != null) && _receiving.get())
{
if (_logger.isInfoEnabled())
{
_logger.info("Interrupting thread: " + _receivingThread);
}
+
_receivingThread.interrupt();
}
}
@@ -516,7 +531,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void markClosed()
{
-// synchronized (_closed)
+ // synchronized (_closed)
{
_closed.set(true);
@@ -524,7 +539,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " markClosed():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -533,6 +549,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
}
+
deregisterConsumer();
}
@@ -551,22 +568,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
}
+
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered,
- messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey,
- messageFrame.getContentHeader(),
- messageFrame.getBodies());
+ AbstractJMSMessage jmsMessage =
+ _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+ messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
+ messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
if (debug)
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
-// synchronized (_closed)
+ // synchronized (_closed)
+
{
-// if (!_closed.get())
+ // if (!_closed.get())
{
jmsMessage.setConsumer(this);
@@ -575,12 +592,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
notifyMessage(jmsMessage, channelId);
}
-// else
-// {
-// _logger.error("MESSAGE REJECTING!");
-// _session.rejectMessage(jmsMessage, true);
-// //_logger.error("MESSAGE JUST DROPPED!");
-// }
+ // else
+ // {
+ // _logger.error("MESSAGE REJECTING!");
+ // _session.rejectMessage(jmsMessage, true);
+ // //_logger.error("MESSAGE JUST DROPPED!");
+ // }
}
}
catch (Exception e)
@@ -606,11 +623,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (isMessageListenerSet())
{
- //we do not need a lock around the test above, and the dispatch below as it is invalid
- //for an application to alter an installed listener while the session is started
-// synchronized (_closed)
+ // we do not need a lock around the test above, and the dispatch below as it is invalid
+ // for an application to alter an installed listener while the session is started
+ // synchronized (_closed)
{
-// if (!_closed.get())
+ // if (!_closed.get())
{
preApplicationProcessing(jmsMessage);
@@ -641,9 +658,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
switch (_acknowledgeMode)
{
+
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
+
case Session.CLIENT_ACKNOWLEDGE:
// we set the session so that when the user calls acknowledge() it can call the method on session
// to send out the appropriate frame
@@ -657,17 +676,21 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
+
case Session.CLIENT_ACKNOWLEDGE:
if (isNoConsume())
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
+
break;
+
case Session.DUPS_OK_ACKNOWLEDGE:
if (++_outstanding >= _prefetchHigh)
{
_dups_ok_acknowledge_send = true;
}
+
if (_outstanding <= _prefetchLow)
{
_dups_ok_acknowledge_send = false;
@@ -680,14 +703,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session.acknowledgeMessage(msg.getDeliveryTag(), true);
}
}
+
break;
+
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
if (!_session.isInRecovery())
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
+
break;
+
case Session.SESSION_TRANSACTED:
if (isNoConsume())
{
@@ -697,6 +724,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_receivedDeliveryTags.add(msg.getDeliveryTag());
}
+
break;
}
}
@@ -721,14 +749,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
void notifyError(Throwable cause)
{
-// synchronized (_closed)
+ // synchronized (_closed)
{
_closed.set(true);
if (_logger.isTraceEnabled())
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " notifyError():"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -737,7 +766,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
}
- //QPID-293 can "request redelivery of this error through dispatcher"
+ // QPID-293 can "request redelivery of this error through dispatcher"
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
// deal with the case where we have a synchronous receive() waiting for a message to arrive
@@ -749,10 +778,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_logger.debug("Passed exception to synchronous queue for propagation to receive()");
}
}
+
deregisterConsumer();
}
-
/**
* Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in
* the case of an error occurring.
@@ -782,7 +811,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
this.checkNotClosed();
- if (_session == null || _session.isClosed())
+ if ((_session == null) || _session.isClosed())
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
@@ -817,7 +846,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _autoClose;
}
-
public boolean isNoConsume()
{
return _noConsume;
@@ -827,10 +855,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closeWhenNoMessages = b;
- if (_closeWhenNoMessages
- && _synchronousQueue.isEmpty()
- && _receiving.get()
- && _messageListener != null)
+ if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
{
_receivingThread.interrupt();
}
@@ -846,13 +871,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_logger.debug("Rejecting received messages in _receivedDTs (RQ)");
}
- //rollback received but not committed messages
+ // rollback received but not committed messages
while (!_receivedDeliveryTags.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" +
- "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
+ + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -876,14 +901,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- //rollback pending messages
+ // rollback pending messages
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" +
- "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
+ + "for consumer with tag:" + _consumerTag);
}
+
Iterator iterator = _synchronousQueue.iterator();
while (iterator.hasNext())
@@ -898,13 +924,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
+
iterator.remove();
}
else
{
- _logger.error("Queue contained a :" + o.getClass() +
- " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ _logger.error("Queue contained a :" + o.getClass()
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}
@@ -919,7 +946,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
-
public String debugIdentity()
{
return String.valueOf(_consumerTag);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index b01e087ce1..bd7cc94582 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.io.UnsupportedEncodingException;
+import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -118,6 +119,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private final boolean _mandatory;
private final boolean _waitUntilSent;
+
+ private boolean _disableMessageId;
+
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -172,15 +176,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
checkPreConditions();
checkNotClosed();
- // IGNORED
+ _disableMessageId = b;
}
public boolean getDisableMessageID() throws JMSException
{
checkNotClosed();
- // Always false for AMQP
- return false;
+ return _disableMessageId;
}
public void setDisableMessageTimestamp(boolean b) throws JMSException
@@ -450,6 +453,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
+
+ if(_disableMessageId)
+ {
+ message.setJMSMessageID(null);
+ }
+ else
+ {
+ if (message.getJMSMessageID() == null)
+ {
+ message.setJMSMessageID(UUID.randomUUID().toString());
+ }
+ }
int type;
if (destination instanceof Topic)
@@ -667,4 +682,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
return _session;
}
+
+ public boolean isBound(AMQDestination destination) throws JMSException
+ {
+ return _session.isQueueBound(destination.getExchangeName(),null,destination.getRoutingKey());
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
index d2ab6bd2c2..d1237cff49 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,10 +42,35 @@ import javax.jms.JMSException;
import org.apache.qpid.AMQException;
/**
+ * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old
+ * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions
+ * as well as error messages, through its constructor, but is a JMSException.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept wrapped exceptions as a JMSException.
+ * </table>
+ *
* @author Apache Software Foundation
*/
public class JMSAMQException extends JMSException
{
+ /**
+ * Creates a JMSException, wrapping another exception class.
+ *
+ * @param message The error message.
+ * @param cause The underlying exception that caused this one. May be null if none is to be set.
+ */
+ public JMSAMQException(String message, Exception cause)
+ {
+ super(message);
+
+ if (cause != null)
+ {
+ setLinkedException(cause);
+ }
+ }
+
public JMSAMQException(AMQException s)
{
super(s.getMessage(), String.valueOf(s.getErrorCode()));
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index c9d29d8077..e0c4b61333 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -7,14 +7,15 @@ import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSender;
+import javax.jms.InvalidDestinationException;
public class QueueSenderAdapter implements QueueSender {
- private MessageProducer _delegate;
+ private BasicMessageProducer _delegate;
private Queue _queue;
private boolean closed = false;
- public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){
+ public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){
_delegate = msgProducer;
_queue = queue;
}
@@ -122,12 +123,13 @@ public class QueueSenderAdapter implements QueueSender {
_delegate.setTimeToLive(timeToLive);
}
- private void checkPreConditions() throws IllegalStateException, IllegalStateException
+ private void checkPreConditions() throws JMSException
{
checkPreConditions(_queue);
}
- private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException {
+ private void checkPreConditions(Queue queue) throws JMSException
+ {
if (closed){
throw new javax.jms.IllegalStateException("Publisher is closed");
}
@@ -137,5 +139,28 @@ public class QueueSenderAdapter implements QueueSender {
if(session == null || session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
- }
+
+ if(!(queue instanceof AMQDestination))
+ {
+ throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
+ }
+ AMQDestination destination = (AMQDestination) queue;
+ if(!destination.isValidated() && checkQueueBeforePublish())
+ {
+
+ if (_delegate.isBound(destination))
+ {
+ destination.setValidated(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server");
+ }
+ }
+ }
+
+ private boolean checkQueueBeforePublish()
+ {
+ return "true".equalsIgnoreCase(System.getProperty("org.apache.qpid.client.verifyQueueBindingBeforePublish", "true"));
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
index f67b984658..02a408465b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
@@ -175,5 +175,10 @@ public class TopicPublisherAdapter implements TopicPublisher
{
throw new InvalidDestinationException("Destination " + topic + " is not a topic");
}
+ if(!(topic instanceof AMQDestination))
+ {
+ throw new InvalidDestinationException("Destination " + topic + " is not a Qpid topic");
+ }
+
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index e2b101ab79..f62baf2c3a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -94,6 +94,8 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
}
}
+ //fixme why is this only done when the close is expected...
+ // should the above forced closes not also cause a close?
protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 8938130417..af254fbbaf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -27,6 +27,7 @@ import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -72,7 +73,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
}
AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
@@ -93,7 +94,9 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
}
catch (IOException e)
{
- throw new JMSException(e.toString());
+ JMSException jmse = new JMSException(e.toString());
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
@@ -112,6 +115,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
return null;
}
+
int pos = _data.position();
_data.rewind();
// one byte left is for the end of frame marker
@@ -119,12 +123,14 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
// this is really redundant since pos must be zero
_data.position(pos);
+
return null;
}
else
{
String data = _data.getString(Charset.forName("UTF8").newDecoder());
_data.position(pos);
+
return data;
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 66524edce3..f87b4027f6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.message;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
+import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -32,12 +33,7 @@ import javax.jms.MessageNotWriteableException;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -123,7 +119,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
if (getContentHeaderProperties().getMessageIdAsString() == null)
{
- getContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
+ getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID());
}
return getContentHeaderProperties().getMessageIdAsString();
@@ -183,7 +179,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
catch (URLSyntaxException e)
{
- throw new JMSException("Illegal value in JMS_ReplyTo property: " + replyToEncoding);
+ throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
}
_destinationCache.put(replyToEncoding, dest);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 6352f7029f..348a0bd152 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -384,7 +384,9 @@ public final class JMSHeaderAdapter
}
catch (AMQPInvalidClassException aice)
{
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ mfe.setLinkedException(aice);
+ throw mfe;
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index df1400b167..caf8741280 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -33,6 +33,7 @@ import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -61,14 +62,15 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
+
getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ ByteBuffer data) throws AMQException
{
super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
}
@@ -79,6 +81,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
_data.release();
}
+
_data = null;
}
@@ -116,11 +119,13 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
catch (IOException e)
{
- throw new MessageFormatException("Message not serializable: " + e);
+ MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
}
-
+
public Serializable getObject() throws JMSException
{
ObjectInputStream in = null;
@@ -133,17 +138,20 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
_data.rewind();
in = new ObjectInputStream(_data.asInputStream());
+
return (Serializable) in.readObject();
}
catch (IOException e)
{
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
catch (ClassNotFoundException e)
{
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ throw mfe;
}
finally
{
@@ -162,8 +170,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
}
}
catch (IOException ignore)
- {
- }
+ { }
}
private static String toString(ByteBuffer data)
@@ -172,6 +179,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
{
return null;
}
+
int pos = data.position();
try
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index d0cc52271a..5bc1555df7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -34,6 +34,7 @@ import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.SSLConfiguration;
@@ -248,6 +249,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
sessionClosed(session);
}
+
+ //FIXME Need to correctly handle other exceptions. Things like ...
+// if (cause instanceof AMQChannelClosedException)
+ // which will cause the JMSSession to end due to a channel close and so that Session needs
+ // to be removed from the map so we can correctly still call close without an exception when trying to close
+ // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception
}
// we reach this point if failover was attempted and failed therefore we need to let the calling app
// know since we cannot recover the situation
@@ -510,11 +517,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_protocolSession.closeSession(session);
}
- public void closeConnection() throws AMQException
- {
- closeConnection(-1);
- }
-
public void closeConnection(long timeout) throws AMQException
{
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
index 50e6f1efaa..89ee8337f8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
@@ -16,5 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
+CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java
index f8ee22a5d9..04db8044de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.java
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.client.security;
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.Security;
@@ -34,6 +30,7 @@ import java.util.TreeMap;
import javax.security.sasl.SaslClientFactory;
+
import org.apache.log4j.Logger;
import org.apache.qpid.util.FileUtils;
@@ -50,14 +47,11 @@ import org.apache.qpid.util.FileUtils;
* mechanism=fully.qualified.class.name
* </pre>
*
- * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
- * class that implements javax.security.sasl.SaslClientFactory and provides the specified mechanism.
+ * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a class that
+ * implements javax.security.sasl.SaslClientFactory and provides the specified mechanism.
*
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Parse SASL mechanism properties.
- * <tr><td> Create and register security provider for SASL mechanisms.
- * </table>
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Parse SASL
+ * mechanism properties. <tr><td> Create and register security provider for SASL mechanisms. </table>
*/
public class DynamicSaslRegistrar
{
@@ -69,10 +63,7 @@ public class DynamicSaslRegistrar
/** The default name of the SASL properties file resource. */
public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/DynamicSaslRegistrar.properties";
- /**
- * Reads the properties file, and creates a dynamic security provider to register the SASL implementations
- * with.
- */
+ /** Reads the properties file, and creates a dynamic security provider to register the SASL implementations with. */
public static void registerSaslProviders()
{
_logger.debug("public static void registerSaslProviders(): called");
@@ -80,8 +71,8 @@ public class DynamicSaslRegistrar
// Open the SASL properties file, using the default name is one is not specified.
String filename = System.getProperty(FILE_PROPERTY);
InputStream is =
- FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
- DynamicSaslRegistrar.class.getClassLoader());
+ FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
+ DynamicSaslRegistrar.class.getClassLoader());
try
{
@@ -94,7 +85,7 @@ public class DynamicSaslRegistrar
if (factories.size() > 0)
{
- Security.addProvider(new JCAProvider(factories));
+ Security.insertProviderAt(new JCAProvider(factories), 0);
_logger.debug("Dynamic SASL provider added as a security provider");
}
}
@@ -170,15 +161,15 @@ public class DynamicSaslRegistrar
* @return A map from SASL mechanism names to implementing client factory classes.
*
* @todo Why tree map here? Do really want mechanisms in alphabetical order? Seems more likely that the declared
- * order of the mechanisms is intended to be preserved, so that they are registered in the declared order
- * of preference. Consider LinkedHashMap instead.
+ * order of the mechanisms is intended to be preserved, so that they are registered in the declared order of
+ * preference. Consider LinkedHashMap instead.
*/
private static Map<String, Class<? extends SaslClientFactory>> parseProperties(Properties props)
{
Enumeration e = props.propertyNames();
TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
- new TreeMap<String, Class<? extends SaslClientFactory>>();
+ new TreeMap<String, Class<? extends SaslClientFactory>>();
while (e.hasMoreElements())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties
index c2a7d7928c..1bff43142b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties
@@ -17,3 +17,4 @@
# under the License.
#
AMQPLAIN=org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory
+CRAM-MD5-HASHED=org.apache.qpid.client.security.crammd5hashed.CRAMMD5HashedSaslClientFactory
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java
index 2fa8dcddde..5bf120454e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/JCAProvider.java
@@ -52,7 +52,7 @@ public class JCAProvider extends Provider
super("AMQSASLProvider", 1.0, "A JCA provider that registers all "
+ "AMQ SASL providers that want to be registered");
register(providerMap);
- Security.addProvider(this);
+// Security.addProvider(this);
}
/**
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
new file mode 100644
index 0000000000..46323e8c09
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.security;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+
+import com.sun.crypto.provider.HmacMD5;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+
+public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
+{
+ private static final Logger _logger = Logger.getLogger(UsernameHashedPasswordCallbackHandler.class);
+
+ private AMQProtocolSession _protocolSession;
+
+ public void initialise(AMQProtocolSession protocolSession)
+ {
+ _protocolSession = protocolSession;
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback) cb).setName(_protocolSession.getUsername());
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ try
+ {
+ ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword()));
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ UnsupportedCallbackException uce = new UnsupportedCallbackException(cb);
+ uce.initCause(e);
+ throw uce;
+ }
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+ }
+
+ private char[] getHash(String text) throws NoSuchAlgorithmException, UnsupportedEncodingException
+ {
+
+ byte[] data = text.getBytes("utf-8");
+
+ MessageDigest md = MessageDigest.getInstance("MD5");
+
+ for (byte b : data)
+ {
+ md.update(b);
+ }
+
+ byte[] digest = md.digest();
+
+ char[] hash = new char[digest.length ];
+
+ int index = 0;
+ for (byte b : digest)
+ {
+ hash[index++] = (char) b;
+ }
+
+ return hash;
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java
new file mode 100644
index 0000000000..22bb1ac156
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client.security.crammd5hashed;
+
+import org.apache.qpid.client.security.amqplain.AmqPlainSaslClient;
+
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Map;
+import java.security.Security;
+
+public class CRAMMD5HashedSaslClientFactory implements SaslClientFactory
+{
+ /** The name of this mechanism */
+ public static final String MECHANISM = "CRAM-MD5-HASHED";
+
+
+ public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) throws SaslException
+ {
+ for (int i = 0; i < mechanisms.length; i++)
+ {
+ if (mechanisms[i].equals(MECHANISM))
+ {
+ if (cbh == null)
+ {
+ throw new SaslException("CallbackHandler must not be null");
+ }
+
+ String[] mechs = {"CRAM-MD5"};
+ return Sasl.createSaslClient(mechs, authorizationId, protocol, serverName, props, cbh);
+ }
+ }
+ return null;
+ }
+
+ public String[] getMechanismNames(Map props)
+ {
+ if (props != null)
+ {
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE))
+ {
+ // returned array must be non null according to interface documentation
+ return new String[0];
+ }
+ }
+
+ return new String[]{MECHANISM};
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
index 104c5bfc44..1ec3adc2eb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,7 +33,7 @@ public class AMQNoTransportForProtocolException extends AMQTransportConnectionEx
public AMQNoTransportForProtocolException(BrokerDetails details, String message)
{
- super(message);
+ super(null, message, null);
_details = details;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
index 4b17661bc3..fec7ff693c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,12 +21,12 @@
package org.apache.qpid.client.transport;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQTransportConnectionException extends AMQException
{
- public AMQTransportConnectionException(String message)
+ public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause)
{
- super(message);
-
+ super(errorCode, message, cause);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 8368eee125..0bc83e9804 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,12 +26,14 @@ import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.client.AMQBrokerDetails;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
@@ -64,13 +66,11 @@ public class TransportConnection
int transport = getTransport(details.getTransport());
if (transport == -1)
-
{
throw new AMQNoTransportForProtocolException(details);
}
if (transport == _currentInstance)
-
{
if (transport == VM)
{
@@ -88,40 +88,42 @@ public class TransportConnection
_currentInstance = transport;
switch (transport)
-
{
- case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
+
+ case TCP:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
- SocketConnector result;
- //FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
+ public IoConnector newSocketConnector()
{
- _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set.");
-// result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+ SocketConnector result;
+ // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+ if (Boolean.getBoolean("qpidnio"))
+ {
+ _logger.fatal("Using Qpid NIO - sysproperty 'qpidnio' is set.");
+ // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+ }
+ // else
+
+ {
+ _logger.info("Using Mina NIO");
+ result = new SocketConnector(); // non-blocking connector
+ }
+
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0);
+
+ return result;
}
-// else
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
- }
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
+ });
+ break;
- return result;
- }
- });
- break;
- case VM:
- {
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
- }
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ break;
+ }
}
return _instance;
@@ -142,7 +144,8 @@ public class TransportConnection
return -1;
}
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException
+ private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
+ throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -154,14 +157,14 @@ public class TransportConnection
}
else
{
- throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
+ throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+ + " does not exist. Auto create disabled.", null);
}
}
return new VmPipeTransportConnection(port);
}
-
public static void createVMBroker(int port) throws AMQVMBrokerCreationException
{
if (_acceptor == null)
@@ -192,7 +195,7 @@ public class TransportConnection
{
_logger.error(e);
- //Try and unbind provider
+ // Try and unbind provider
try
{
VmPipeAddress pipe = new VmPipeAddress(port);
@@ -203,7 +206,7 @@ public class TransportConnection
}
catch (Exception ignore)
{
- //ignore
+ // ignore
}
if (provider == null)
@@ -227,7 +230,7 @@ public class TransportConnection
because = e.getCause().toString();
}
- throw new AMQVMBrokerCreationException(port, because + " Stopped binding of InVM Qpid.AMQP");
+ throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
}
}
}
@@ -246,14 +249,14 @@ public class TransportConnection
// can't use introspection to get Provider as it is a server class.
// need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
- //get right constructor and pass in instancec ID - "port"
+ // get right constructor and pass in instancec ID - "port"
IoHandlerAdapter provider;
try
{
- Class[] cnstr = {Integer.class};
- Object[] params = {port};
+ Class[] cnstr = { Integer.class };
+ Object[] params = { port };
provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
- //Give the broker a second to create
+ // Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
}
catch (Exception e)
@@ -270,8 +273,10 @@ public class TransportConnection
because = e.getCause().toString();
}
-
- throw new AMQVMBrokerCreationException(port, because + " Stopped InVM Qpid.AMQP creation");
+ AMQVMBrokerCreationException amqbce =
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+ amqbce.initCause(e);
+ throw amqbce;
}
return provider;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
index 607ddcc26a..4b2982fe9c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,19 +21,25 @@
package org.apache.qpid.client.vmbroker;
import org.apache.qpid.client.transport.AMQTransportConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQVMBrokerCreationException extends AMQTransportConnectionException
{
private int _port;
+ /**
+ * @param port
+ *
+ * @deprecated
+ */
public AMQVMBrokerCreationException(int port)
{
- this(port, "Unable to create vm broker");
+ this(null, port, "Unable to create vm broker", null);
}
- public AMQVMBrokerCreationException(int port, String message)
+ public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause)
{
- super(message);
+ super(errorCode, message, cause);
_port = port;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
index 9adf04e182..6ad3fb4bae 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
@@ -101,7 +101,7 @@ public class FailoverPolicy
}
catch (Exception cnfe)
{
- throw new IllegalArgumentException("Unknown failover method:" + failoverMethod);
+ throw new IllegalArgumentException("Unknown failover method:" + failoverMethod, cnfe);
}
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index a406f9f86e..794fd5c8c1 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -65,6 +65,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
+
protected void setUp() throws Exception
{
super.setUp();
@@ -122,30 +123,39 @@ public class MessageListenerMultiConsumerTest extends TestCase
TransportConnection.killAllVMBrokers();
}
+// public void testRecieveC1thenC2() throws Exception
+// {
+//
+// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+// {
+//
+// assertTrue(_consumer1.receive() != null);
+// }
+//
+// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+// {
+// assertTrue(_consumer2.receive() != null);
+// }
+// }
- public void testRecieveC1thenC2() throws Exception
+ public void testRecieveInterleaved() throws Exception
{
-
- for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ int msg = 0;
+ int MAX_LOOPS = MSG_COUNT * 2;
+ for (int loops = 0; msg < MSG_COUNT || loops < MAX_LOOPS; loops++)
{
- assertTrue(_consumer1.receive() != null);
- }
-
- for (int msg = 0; msg < MSG_COUNT / 2; msg++)
- {
- assertTrue(_consumer2.receive() != null);
+ if (_consumer1.receive(100) != null)
+ {
+ msg++;
+ }
+ if (_consumer2.receive(100) != null)
+ {
+ msg++;
+ }
}
- }
-
- public void testRecieveInterleaved() throws Exception
- {
- for (int msg = 0; msg < MSG_COUNT / 2; msg++)
- {
- assertTrue(_consumer1.receive() != null);
- assertTrue(_consumer2.receive() != null);
- }
+ assertEquals("Not all messages received.", MSG_COUNT, msg);
}
@@ -161,7 +171,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
if (receivedCount1 == MSG_COUNT / 2)
{
- _allMessagesSent.countDown();
+ _allMessagesSent.countDown();
}
}
@@ -196,6 +206,18 @@ public class MessageListenerMultiConsumerTest extends TestCase
assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
+ public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception
+ {
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ {
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
+ _consumer2.receive(1000) != null);
+ }
+ }
+ }
+
public static junit.framework.Test suite()
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index 5fb77af4db..7b5957ac8c 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -144,6 +144,36 @@ public class MessageListenerTest extends TestCase implements MessageListener
}
+ public void testRecieveTheUseMessageListener() throws Exception
+ {
+
+ _logger.error("Test disabled as initial receive is not called first");
+ // Perform initial receive to start connection
+// assertTrue(_consumer.receive(2000) != null);
+// receivedCount++;
+
+ // Sleep to ensure remaining 4 msgs end up on _synchronousQueue
+// Thread.sleep(1000);
+
+ // Set the message listener and wait for the messages to come in.
+ _consumer.setMessageListener(this);
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+ //Should have recieved all async messages
+ assertEquals(MSG_COUNT, receivedCount);
+
+ }
+
+
public void onMessage(Message message)
{
_logger.info("Received Message(" + receivedCount + "):" + message);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
index 10bf1a8d6d..42594fff8e 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -83,7 +83,7 @@ public class ResetMessageListenerTest extends TestCase
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "direct://amq.direct//ResetMessageListenerTest");
_context = factory.getInitialContext(env);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
new file mode 100644
index 0000000000..1b5da2631d
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
@@ -0,0 +1,109 @@
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import junit.framework.TestCase;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.TextMessage;
+import javax.jms.InvalidDestinationException;
+
+public class InvalidDestinationTest extends TestCase
+{
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private MessageConsumer _consumer;
+
+ private static final String VM_BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ createVMBroker();
+ _connection = new AMQConnection(VM_BROKER, "guest", "guest", "ReceiveTestClient", "test");
+ }
+
+ public void createVMBroker()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+
+
+
+ public void testInvalidDestination() throws Exception
+ {
+ Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
+ AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+ QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // This is the only easy way to create and bind a queue from the API :-(
+ MessageConsumer consumer = queueSession.createConsumer(validDestination);
+
+ QueueSender sender = queueSession.createSender(invalidDestination);
+ TextMessage msg = queueSession.createTextMessage("Hello");
+ try
+ {
+ sender.send(msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.close();
+
+ sender = queueSession.createSender(null);
+ invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+ try
+ {
+ sender.send(invalidDestination,msg);
+ fail("Expected InvalidDestinationException");
+ }
+ catch (InvalidDestinationException ex)
+ {
+ // pass
+ }
+ sender.send(validDestination,msg);
+ sender.close();
+ validDestination = new AMQQueue("amq.direct","knownQ");
+ sender = queueSession.createSender(validDestination);
+ sender.send(msg);
+
+
+
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+
+ return new junit.framework.TestSuite(InvalidDestinationTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index 7762cb3fe9..62234ad21f 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -330,7 +330,7 @@ public class MessageRequeueTest extends TestCase
public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
int run = 0;
- while (run < 10)
+// while (run < 10)
{
run++;
@@ -350,17 +350,10 @@ public class MessageRequeueTest extends TestCase
_logger.debug("Create Consumer");
MessageConsumer consumer = session.createConsumer(q);
- try
- {
- Thread.sleep(2000);
- }
- catch (InterruptedException e)
- {
- //
- }
+ conn.start();
_logger.debug("Receiving msg");
- Message msg = consumer.receive(1000);
+ Message msg = consumer.receive(2000);
assertNotNull("Message should not be null", msg);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 0828ab398c..190b3861f0 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -100,7 +100,9 @@ public class DurableSubscriptionTest extends TestCase
AMQTopic topic = new AMQTopic(con,"MyTopic");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
- MessageProducer producer = session1.createProducer(topic);
+
+ Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ MessageProducer producer = sessionProd.createProducer(topic);
Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
@@ -112,12 +114,12 @@ public class DurableSubscriptionTest extends TestCase
Message msg;
msg = consumer1.receive();
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(100);
assertEquals(null, msg);
msg = consumer2.receive();
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(100);
assertEquals(null, msg);
consumer2.close();
@@ -127,14 +129,14 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("B"));
- msg = consumer1.receive();
+ msg = consumer1.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(100);
assertEquals(null, msg);
- msg = consumer3.receive();
+ msg = consumer3.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
- msg = consumer3.receive(1000);
+ msg = consumer3.receive(100);
assertEquals(null, msg);
con.close();
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 2abc139ced..685fe20048 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -53,12 +53,15 @@ public class CommitRollbackTest extends TestCase
Queue _jmsQueue;
private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class);
+ private static final String BROKER = "vm://:1";
protected void setUp() throws Exception
{
super.setUp();
- TransportConnection.createVMBroker(1);
-
+ if (BROKER.startsWith("vm"))
+ {
+ TransportConnection.createVMBroker(1);
+ }
testMethod++;
queue += testMethod;
@@ -68,7 +71,7 @@ public class CommitRollbackTest extends TestCase
private void newConnection() throws AMQException, URLSyntaxException, JMSException
{
- conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'");
+ conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='" + BROKER + "'");
_session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -87,7 +90,10 @@ public class CommitRollbackTest extends TestCase
super.tearDown();
conn.close();
- TransportConnection.killVMBroker(1);
+ if (BROKER.startsWith("vm"))
+ {
+ TransportConnection.killVMBroker(1);
+ }
}
/**
@@ -261,7 +267,7 @@ public class CommitRollbackTest extends TestCase
assertTrue("session is not transacted", _pubSession.getTransacted());
_logger.info("sending test message");
- String MESSAGE_TEXT = "testGetThenDisconnect";
+ String MESSAGE_TEXT = "testGetThenRollback";
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
_pubSession.commit();
@@ -394,16 +400,60 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
result = _consumer.receive(1000);
assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-
- result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("2", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-
+ if (result.getJMSRedelivered())
+ {
+ assertEquals("1", ((TextMessage) result).getText());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+ }
+ else
+ {
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ }
result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);
+
+ }
+
+
+ public void testPutThenRollbackThenGet() throws Exception
+ {
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending test message");
+ String MESSAGE_TEXT = "testPutThenRollbackThenGet";
+
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+ _pubSession.commit();
+
+ assertNotNull(_consumer.receive(100));
+
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _logger.info("rolling back");
+ _pubSession.rollback();
+
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
+ assertNull("test message was put and rolled back, but is still present", result);
+
+ _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
+
+ _pubSession.commit();
+
+ assertNotNull(_consumer.receive(100));
+
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java
index 8109d20a33..b777cf93b6 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/Config.java
@@ -172,7 +172,7 @@ public class Config
}
catch(NumberFormatException e)
{
- throw new RuntimeException("Bad port number: " + value);
+ throw new RuntimeException("Bad port number: " + value, e);
}
}
else if("-name".equalsIgnoreCase(key))
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
index f2afa472ab..195ed79dab 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -3,6 +3,7 @@ package org.apache.qpid.testutil;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.log4j.Logger;
@@ -70,7 +71,7 @@ public class QpidClientConnection implements ExceptionListener
}
catch (URLSyntaxException e)
{
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
}
}
}