summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-08-30 12:19:31 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-08-30 12:19:31 +0000
commit94b0978c09b14308c25b7c5b02792c192b0c5521 (patch)
treea8a9786f94d9a21cd8afc4bb6554c285de9280d2 /qpid/java/client/src
parent0ba1cb2a8be4a9b638a8cfe014cfffc36180bc5f (diff)
downloadqpid-python-94b0978c09b14308c25b7c5b02792c192b0c5521.tar.gz
Remerge of M2. All tests pass locally
Testing done in Intelij and mvn command line via windows/cygwin. Python tests removed from auto build pending Jython-siztion. Tested running broker in intelij and python run-tests from cygwin. All tests pass. (CombinedTest still exhibts a race condition. but that has always been so.) Additional Race condition identified (around MsgReject/AutoDeleteQueues) during testing patch to follow. systests are inconsistent Some use TestableMemoryMessageStore some use MemoryMessgaeStore. Lets not roll back this change if issues are discovered. Lets work together to go forward and address any issues. I have spent a lot of time ensuring the tests work for me so I hope that they work for you. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@571129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java21
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java117
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java72
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java10
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java25
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java17
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java118
13 files changed, 270 insertions, 186 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index d59412fdba..1ac43f4388 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -46,11 +46,9 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
@@ -68,7 +66,6 @@ import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
@@ -1106,6 +1103,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
+ //Should never get here as all AMQEs are required to have an ErrorCode!
je = new JMSException("Exception thrown against " + toString() + ": " + cause);
}
@@ -1148,7 +1146,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
- _logger.info("Not a hard-error connection not closing.");
+ _logger.info("Not a hard-error connection not closing: " + cause.getMessage());
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index b3fbd1f510..eff6360d91 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -20,6 +20,14 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@@ -35,6 +43,8 @@ import org.apache.qpid.url.URLSyntaxException;
public class AMQConnectionURL implements ConnectionURL
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionURL.class);
+
private String _url;
private String _failoverMethod;
private HashMap<String, String> _failoverOptions;
@@ -182,7 +192,7 @@ public class AMQConnectionURL implements ConnectionURL
if (colonIndex == -1)
{
throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
- "Null password in user information not allowed.", _url);
+ "Null password in user information not allowed.", _url);
}
else
{
@@ -387,7 +397,14 @@ public class AMQConnectionURL implements ConnectionURL
if (_password != null)
{
sb.append(':');
- sb.append(_password);
+ if (_logger.isDebugEnabled())
+ {
+ sb.append(_password);
+ }
+ else
+ {
+ sb.append("********");
+ }
}
sb.append('@');
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 879578bd6c..af469ee291 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -73,7 +73,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +100,6 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -293,6 +291,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
private final boolean _strictAMQPFATAL;
+ private final Object _messageDeliveryLock = new Object();
/**
* Creates a new session on a connection.
@@ -505,49 +504,53 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session.
- synchronized (_connection.getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
- // Ensure we only try and close an open session.
- if (!_closed.getAndSet(true))
- {
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
- try
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (_connection.getFailoverMutex())
+ {
+ // Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
{
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
- getProtocolHandler().closeSession(this);
+ try
+ {
- final AMQFrame frame =
- ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
+ getProtocolHandler().closeSession(this);
- getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+ final AMQFrame frame =
+ ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client closing channel")); // replyText
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully.
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- // This is ignored because the channel is already marked as closed so the fail-over process will
- // not re-open it.
- catch (FailoverException e)
- {
- _logger.debug(
- "Got FailoverException during channel close, ignored as channel already marked as closed.");
- }
- finally
- {
- _connection.deregisterSession(_channelId);
+ getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ // This is ignored because the channel is already marked as closed so the fail-over process will
+ // not re-open it.
+ catch (FailoverException e)
+ {
+ _logger.debug(
+ "Got FailoverException during channel close, ignored as channel already marked as closed.");
+ }
+ finally
+ {
+ _connection.deregisterSession(_channelId);
+ }
}
}
}
@@ -560,23 +563,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- _closed.set(true);
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
+ synchronized (_connection.getFailoverMutex())
{
- amqe = new AMQException(null, "Closing session forcibly", e);
- }
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ _closed.set(true);
+ AMQException amqe;
+ if (e instanceof AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException(null, "Closing session forcibly", e);
+ }
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
+ }
}
}
@@ -1279,7 +1285,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public void declareAndBind(AMQDestination amqd)
+ public void declareAndBind(AMQDestination amqd)
throws
AMQException
{
@@ -2664,7 +2670,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait();
}
- dispatchMessage(message);
+ synchronized (_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
while (connectionStopped())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
index 8f9a84a3a6..862a9be8d4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.client.handler;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
index 81228b4cdc..65060d44d2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.client.handler;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index a00078b010..2c435aba6c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -25,7 +25,6 @@ import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -55,7 +54,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,10 +271,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if (_failoverState != FailoverState.IN_PROGRESS)
{
- _logger.info("sessionClose() not allowed to failover");
- _connection.exceptionReceived(
- new AMQDisconnectedException("Server closed connection and reconnection " +
- "not permitted.", null));
+ _logger.debug("sessionClose() not allowed to failover");
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Server closed connection and reconnection " + "not permitted.", null));
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index bef3180041..9f430d76a7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -51,7 +51,6 @@ import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -254,8 +253,10 @@ public class AMQStateManager implements AMQMethodListener
if (_currentState != s)
{
- _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
- throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s, null);
+ _logger.warn("State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + s);
+ throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + s, null);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 459579d920..4cda53a6a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -26,11 +26,9 @@ import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,40 +88,40 @@ public class TransportConnection
switch (transport)
{
- case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ case TCP:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
{
- public IoConnector newSocketConnector()
+ SocketConnector result;
+ // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+ if (Boolean.getBoolean("qpidnio"))
{
- SocketConnector result;
- // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
- {
- _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
- // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
- }
- // else
-
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
- }
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
-
- return result;
+ _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
+ // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
}
- });
- break;
+ // else
- case VM:
- {
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
- }
+ {
+ _logger.info("Using Mina NIO");
+ result = new SocketConnector(); // non-blocking connector
+ }
+
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0);
+
+ return result;
+ }
+ });
+ break;
+
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ break;
+ }
}
return _instance;
@@ -145,7 +143,7 @@ public class TransportConnection
}
private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
- throws AMQVMBrokerCreationException
+ throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -158,7 +156,7 @@ public class TransportConnection
else
{
throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
+ + " does not exist. Auto create disabled.", null);
}
}
@@ -253,8 +251,8 @@ public class TransportConnection
IoHandlerAdapter provider;
try
{
- Class[] cnstr = { Integer.class };
- Object[] params = { port };
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
@@ -273,7 +271,7 @@ public class TransportConnection
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
amqbce.initCause(e);
throw amqbce;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
index 1818132be0..dc0d9b8c78 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
@@ -37,6 +37,16 @@ public class AMQVMBrokerCreationException extends AMQTransportConnectionExceptio
{
private int _port;
+ /**
+ * @param port
+ *
+ * @deprecated
+ */
+ public AMQVMBrokerCreationException(int port)
+ {
+ this(null, port, "Unable to create vm broker", null);
+ }
+
public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause)
{
super(errorCode, message, cause);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
index d19a6095d5..9600d1e9d3 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.test.unit.client.channelclose;
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 588c82221e..56394fee27 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -47,12 +47,12 @@ public class ConnectionTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
-// TransportConnection.createVMBroker(1);
+ TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
-// TransportConnection.killVMBroker(1);
+ TransportConnection.killVMBroker(1);
}
public void testSimpleConnection()
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
index 9c354ee260..9cde24dd92 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
@@ -21,9 +21,7 @@
package org.apache.qpid.test.unit.client.forwardall;
import junit.framework.TestCase;
-
import org.apache.qpid.testutil.VMBrokerSetup;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +32,7 @@ import org.slf4j.LoggerFactory;
public class CombinedTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class);
+ private int run = 0;
protected void setUp() throws Exception
{
@@ -48,14 +47,18 @@ public class CombinedTest extends TestCase
public void testForwardAll() throws Exception
{
- int services = 2;
- ServiceCreator.start("vm://:1", services);
+ while (run < 10)
+ {
+ int services = 2;
+ ServiceCreator.start("vm://:1", services);
+
+ _logger.info("Starting " + ++run + " client...");
- _logger.info("Starting client...");
+ new Client("vm://:1", services).shutdownWhenComplete();
- new Client("vm://:1", services).shutdownWhenComplete();
- _logger.info("Completed successfully!");
+ _logger.info("Completed " + run + " successfully!");
+ }
}
public static junit.framework.Test suite()
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index df2a38d0fc..1a45773907 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -21,12 +21,10 @@
package org.apache.qpid.test.unit.transacted;
import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +55,9 @@ public class CommitRollbackTest extends TestCase
private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
private static final String BROKER = "vm://:1";
+ private boolean _gotone = false;
+ private boolean _gottwo = false;
+ private boolean _gottwoRedelivered = false;
protected void setUp() throws Exception
{
@@ -340,57 +341,98 @@ public class CommitRollbackTest extends TestCase
*
* @throws Exception On error
*/
- /*public void testSend2ThenRollback() throws Exception
+ public void testSend2ThenRollback() throws Exception
{
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
+ int run = 0;
+ while (run < 10)
+ {
+ run++;
+ _logger.info("Run:" + run);
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
- _logger.info("sending two test messages");
- _publisher.send(_pubSession.createTextMessage("1"));
- _publisher.send(_pubSession.createTextMessage("2"));
- _pubSession.commit();
+ _logger.info("sending two test messages");
+ _publisher.send(_pubSession.createTextMessage("1"));
+ _publisher.send(_pubSession.createTextMessage("2"));
+ _pubSession.commit();
- _logger.info("getting test message");
- assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
+ _logger.info("getting test message");
+ assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
- _logger.info("rolling back");
- _session.rollback();
+ _logger.info("rolling back");
+ _session.rollback();
- _logger.info("receiving result");
- Message result = _consumer.receive(1000);
+ _logger.info("receiving result");
+ Message result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ // Message Order is:
- if (((TextMessage) result).getText().equals("2"))
- {
- assertTrue("Messasge is marked as redelivered", !result.getJMSRedelivered());
+ // Send 1 , 2
+ // Retrieve 1 and then rollback
+ // Receieve 1 (redelivered) , 2 (may or may not be redelivered??)
- result = _consumer.receive(1000);
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ verifyMessages(result);
+
+ // Occassionally get message 2 first!
+// assertEquals("Should get message one first", "1", ((TextMessage) result).getText());
+// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// assertEquals("Second message should be message 2", "2", ((TextMessage) result).getText());
+// assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
+//
+// result = _consumer.receive(1000);
+// assertNull("There should be no more messages", result);
+
+ _session.commit();
}
- else
+ }
+
+ private void verifyMessages(Message result) throws JMSException
+ {
+
+ if (result == null)
{
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
- result = _consumer.receive(1000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("2", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
+ assertTrue("Didn't receive redelivered message one", _gotone);
+ assertTrue("Didn't receive message two at all", _gottwo | _gottwoRedelivered);
+ _gotone = false;
+ _gottwo = false;
+ _gottwoRedelivered = false;
+ return;
}
- result = _consumer.receive(1000);
+ if (((TextMessage) result).getText().equals("1"))
+ {
+ _logger.info("Got 1 redelivered");
+ assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
+ assertFalse("Already received message one", _gotone);
+ _gotone = true;
- if (result != null)
+ }
+ else
{
assertEquals("2", ((TextMessage) result).getText());
- assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered());
- result = _consumer.receive(1000);
+
+ if (result.getJMSRedelivered())
+ {
+ _logger.info("Got 2 redelivered, message was prefetched");
+ assertFalse("Already received message redelivered two", _gottwoRedelivered);
+
+ _gottwoRedelivered = true;
+ }
+ else
+ {
+ _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
+ assertFalse("Already received message two", _gottwo);
+
+ _gottwo = true;
+ }
}
- assertNull("test message should be null", result);
- }*/
+ verifyMessages(_consumer.receive(1000));
+ }
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
@@ -417,12 +459,12 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
- // NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected.
- // Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet.
+// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected.
+// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet.
result = _consumer.receive(1000);
assertNotNull("test message was consumed and rolled back, but is gone", result);
- // The first message back will be either 1 or 2 being redelivered
+// The first message back will be either 1 or 2 being redelivered
if (result.getJMSRedelivered())
{
assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());