From 3debc8267203e6bb2cc6a800d0ffff7e87f1ef65 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 23 Oct 2009 20:55:13 +0000 Subject: Merged from trunk up to 823464 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829235 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/handler/BasicRejectMethodHandler.java | 2 +- .../qpid/server/handler/ChannelOpenHandler.java | 7 + .../apache/qpid/server/queue/SimpleAMQQueue.java | 10 +- .../security/access/PrincipalPermissions.java | 6 + .../java/org/apache/qpid/client/AMQConnection.java | 2 +- .../apache/qpid/client/AMQConnectionDelegate.java | 9 + .../qpid/client/AMQConnectionDelegate_0_10.java | 12 + .../qpid/client/AMQConnectionDelegate_8_0.java | 12 + .../java/org/apache/qpid/client/AMQSession.java | 38 ++- .../org/apache/qpid/client/XAConnectionImpl.java | 2 +- .../client/configuration/ClientProperties.java | 2 +- .../qpid/client/failover/FailoverHandler.java | 2 + .../MessageDisappearWithIOExceptionTest.java | 330 +++++++++++++++++++++ .../qpid/server/queue/ProducerFlowControlTest.java | 89 +++++- .../qpid/server/security/acl/SimpleACLTest.java | 104 +++++-- .../unit/ack/AcknowledgeAfterFailoverTest.java | 2 +- .../test/unit/ack/AcknowledgeOnMessageTest.java | 3 + .../apache/qpid/test/utils/FailoverBaseCase.java | 4 +- qpid/java/test-profiles/010Excludes | 2 + qpid/java/test-profiles/Excludes | 1 + qpid/java/test-profiles/test-provider.properties | 2 +- 21 files changed, 582 insertions(+), 59 deletions(-) create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 6132c890c3..62dd76f832 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -71,7 +71,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener _capacity) { + _overfull.set(true); //Overfull log message _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); @@ -1347,10 +1349,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if(_capacity != 0L) { - if(_atomicQueueSize.get() <= _flowResumeCapacity) + if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) { - //Underfull log message - _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + if(_overfull.compareAndSet(true,false)) + {//Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + } for(AMQChannel c : _blockedChannels.keySet()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java index 33213055ca..c3aa68cc3f 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java @@ -494,6 +494,12 @@ public class PrincipalPermissions { AMQQueue queue = ((AMQQueue) parameters[0]); Map queuePermissions = (Map) _permissions.get(permission); + + if (queuePermissions == null) + { + //if the outer map is null, the user has no CONSUME rights at all + return AuthzResult.DENIED; + } List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2f5d07e396..b57c834598 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -313,7 +313,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages - protected int _maxPrefetch; + private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index e5980d8b7d..e6c3473cb1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -39,6 +39,15 @@ public interface AMQConnectionDelegate Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + XASession createXASession() throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; void failoverPrep(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 927929c94a..211f72e9ba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -99,6 +99,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return session; } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + /** * create an XA Session and start it if required. */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 9876393d4c..45a134a0e6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -191,6 +191,18 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate }, _conn).execute(); } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fc81e32e4d..dd9a00ce10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,7 +21,6 @@ package org.apache.qpid.client; import java.io.Serializable; -import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -92,7 +91,6 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; -import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,8 +113,6 @@ import org.slf4j.LoggerFactory; public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { - - public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -263,10 +259,10 @@ public abstract class AMQSession messages = sendNumberedBytesMessage(_session, _queue, 10); + + // Consume first messasge + Message received = _consumer.receive(2000); + + // Verify received messages + assertNotNull("First message not received.", received); + assertEquals("Incorrect message Received", + messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + + // Allow ack to be sent to broker, by performing a synchronous command + // along the session. +// _session.createConsumer(_session.createTemporaryQueue()).close(); + + //Retain IO Layer + AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); + + // Send IO Exception - causing failover + _connection.getProtocolHandler(). + exception(new WriteTimeoutException("WriteTimeoutException to cause failover.")); + + // Verify Failover occured through ConnectionListener + assertTrue("Failover did not occur", + _failoverOccured.await(4000, TimeUnit.MILLISECONDS)); + + //Verify new protocolSession is not the same as the original + assertNotSame("Protocol Session has not changed", + protocolSession, + _connection.getProtocolHandler().getProtocolSession()); + + /***********************************/ + // This verifies that the bug has been resolved + + // Attempt to consume again. Expect 9 messages + for (int count = 1; count < 10; count++) + { + received = _consumer.receive(2000); + assertNotNull("Expected message not received:" + count, received); + assertEquals(messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + } + + //Verify there are no more messages + received = _consumer.receive(1000); + assertNull("Message receieved when there should be none:" + received, + received); + +// /***********************************/ +// // This verifies that the bug exists +// +// // Attempt to consume remaining 9 messages.. Expecting NONE. +// // receiving just one message should fail so no need to fail 9 times +// received = _consumer.receive(1000); +// assertNull("Message receieved when it should be null:" + received, received); +// +//// //Close the Connection which you would assume would free the messages +//// _connection.close(); +//// +//// // Reconnect +//// initialiseConnection(); +//// +//// // We should still be unable to receive messages +//// received = _consumer.receive(1000); +//// assertNull("Message receieved when it should be null:" + received, received); +//// +//// _connection.close(); +// +// // Close original IO layer. Expecting messages to be released +// protocolSession.closeProtocolSession(); +// +// // Reconnect and all should be good. +//// initialiseConnection(); +// +// // Attempt to consume again. Expect 9 messages +// for (int count = 1; count < 10; count++) +// { +// received = _consumer.receive(2000); +// assertNotNull("Expected message not received:" + count, received); +// assertEquals(messages.remove(0).getIntProperty("count"), +// received.getIntProperty("count")); +// } +// +// //Verify there are no more messages +// received = _consumer.receive(1000); +// assertNull("Message receieved when there should be none:" + received, +// received); + } + + private void initialiseConnection() + throws Exception + { + //Create Connection + _connection = (AMQConnection) getConnection(); + _connection.setConnectionListener(this); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _queue = _session.createQueue(getName()); + + // Create Consumer + _consumer = _session.createConsumer(_queue); + + //Start connection + _connection.start(); + } + + /** QpidTestCase back port to this release */ + + // modified from QTC as sendMessage is not testable. + // - should be renamed sendBlankBytesMessage + // - should be renamed sendNumberedBytesMessage + public List sendNumberedBytesMessage(Session session, Destination destination, + int count) throws Exception + { + List messages = new ArrayList(count); + + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < count; i++) + { + Message next = session.createMessage(); + + next.setIntProperty("count", count); + + producer.send(next); + + messages.add(next); + } + + producer.close(); + return messages; + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover to occur + return true; + } + + public boolean preResubscribe() + { + //Allow failover to occur + return true; + } + + public void failoverComplete() + { + _failoverOccured.countDown(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index 97147904e1..f220760511 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -26,15 +26,18 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.AbstractTestLogging; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; import javax.naming.NamingException; import java.util.HashMap; import java.util.Map; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; -public class ProducerFlowControlTest extends QpidTestCase +public class ProducerFlowControlTest extends AbstractTestLogging { private static final int TIMEOUT = 1500; @@ -56,10 +59,12 @@ public class ProducerFlowControlTest extends QpidTestCase private MessageConsumer consumer; private final AtomicInteger _sentMessages = new AtomicInteger(); - protected void setUp() throws Exception + public void setUp() throws Exception { super.setUp(); + _monitor.reset(); + producerConnection = getConnection(); producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -70,7 +75,7 @@ public class ProducerFlowControlTest extends QpidTestCase } - protected void tearDown() throws Exception + public void tearDown() throws Exception { producerConnection.close(); consumerConnection.close(); @@ -115,6 +120,76 @@ public class ProducerFlowControlTest extends QpidTestCase assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); + } + + public void testBrokerLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + final Map arguments = new HashMap(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + List results = _monitor.findMatches("QUE-1003"); + + assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + while(consumer.receive(1000) != null); + + results = _monitor.findMatches("QUE-1004"); + + assertEquals("Did not find correct number of QUE_1004 queue underfull messages", 1, results.size()); + + + + } + + + public void testClientLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); + setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map arguments = new HashMap(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + List results = _monitor.findMatches("Message send delayed by"); + assertEquals("Incorrect number of delay messages logged by client",3,results.size()); + results = _monitor.findMatches("Message send failed due to timeout waiting on broker enforced flow control"); + assertEquals("Incorrect number of send failure messages logged by client",1,results.size()); + + + } @@ -131,7 +206,6 @@ public class ProducerFlowControlTest extends QpidTestCase _sentMessages.set(0); - // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); @@ -212,8 +286,7 @@ public class ProducerFlowControlTest extends QpidTestCase public void testSendTimeout() throws JMSException, NamingException, AMQException, InterruptedException { - long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - System.setProperty("qpid.flow_control_wait_failure","3000"); + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -237,10 +310,6 @@ public class ProducerFlowControlTest extends QpidTestCase assertNotNull("No timeout exception on sending", e); - System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue)); - - - } private MessageSender sendMessagesAsync(final MessageProducer producer, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java index bb7b5efc75..c198b33bee 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java @@ -21,31 +21,27 @@ package org.apache.qpid.server.security.acl; - -import junit.framework.TestCase; - -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.*; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.*; +import org.apache.qpid.framing.AMQShortString; import javax.jms.*; import javax.jms.IllegalStateException; import javax.naming.NamingException; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + public class SimpleACLTest extends QpidTestCase implements ConnectionListener, ExceptionListener { private String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;//"tcp://localhost:5672"; @@ -53,6 +49,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E private CountDownLatch _awaitError = new CountDownLatch(51); public void setUp() throws Exception + { + //Performing setUp here would result in a broker with the default ACL test config + + //Each test now calls the private setUpACLTest to allow them to make + //individual customisations to the base ACL settings + } + + private void setUpACLTest() throws Exception { final String QPID_HOME = System.getProperty("QPID_HOME"); @@ -73,8 +77,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E return "amqp://" + username + ":" + password + "@clientid/test?brokerlist='" + getBroker() + "?retries='0''"; } - public void testAccessAuthorized() throws AMQException, URLSyntaxException + public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -96,6 +102,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testAccessNoRights() throws Exception { + setUpACLTest(); + try { Connection conn = getConnection("guest", "guest"); @@ -120,8 +128,40 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException + public void testGuestConsumeWithCreateRightsAndWithoutConsumeRights() throws NamingException, ConfigurationException, IOException, Exception + { + //Customise the ACL config to give the guest user some create (could be any, non-consume) rights to + //force creation of a PrincipalPermissions instance to perform the consume rights check against. + setConfigurationProperty("virtualhosts.virtualhost.test.security.access_control_list.create.queues.queue.users.user", "guest"); + + setUpACLTest(); + + try + { + Connection conn = getConnection("guest", "guest"); + + Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + sesh.createConsumer(sesh.createQueue("example.RequestQueue")); + + conn.close(); + } + catch (JMSException e) + { + Throwable cause = e.getLinkedException(); + + assertNotNull("There was no liked exception", cause); + assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass()); + assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); + } + } + + public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -142,6 +182,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -167,8 +209,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException + public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -191,6 +235,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -212,8 +258,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException + public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -239,8 +287,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException + public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -271,6 +321,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -311,8 +363,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException); } - public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException + public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -333,6 +387,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -358,6 +414,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -391,8 +449,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E return (Connection) connection; } - public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException + public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -414,6 +474,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -436,6 +498,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -461,6 +525,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception { + setUpACLTest(); + Connection connection = null; try { @@ -492,6 +558,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E */ public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception { + setUpACLTest(); + //Set up the Server Connection serverConnection = getConnection("server", "guest"); @@ -572,6 +640,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java index 30cc48691f..eb36522fac 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -61,7 +61,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con * @param transacted create a transacted session for this test * @param mode if not transacted what ack mode to use for this test * @throws Exception if a problem occured during test setup. - */ + */ @Override protected void init(boolean transacted, int mode) throws Exception { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java index 73308d41c0..4254727d36 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java @@ -77,6 +77,9 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message { break; } + // Remember the currentCount as the lastCount for the next cycle. + // so we can exit if things get locked up. + lastCount = currentCount; complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index fc82ff1778..b1d14721bd 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -57,7 +57,7 @@ public class FailoverBaseCase extends QpidTestCase super.setUp(); // Set QPID_WORK to $QPID_WORK/ // or /tmp/ if QPID_WORK not set. - setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort()); + setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort()); startBroker(getFailingPort()); } @@ -97,7 +97,7 @@ public class FailoverBaseCase extends QpidTestCase // Ensure we shutdown any secondary brokers, even if we are unable // to cleanly tearDown the QTC. stopBroker(getFailingPort()); - FileUtils.deleteDirectory(System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort()); + FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); } } diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes index 488ae24022..757a1e425c 100755 --- a/qpid/java/test-profiles/010Excludes +++ b/qpid/java/test-profiles/010Excludes @@ -99,3 +99,5 @@ org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) org.apache.qpid.server.queue.ProducerFlowControlTest#* +//QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010. +org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#* diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes index d14d467b89..aa60554c04 100644 --- a/qpid/java/test-profiles/Excludes +++ b/qpid/java/test-profiles/Excludes @@ -30,3 +30,4 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck // QPID-143 : Failover can occur between receive and ack but we don't stop the ack. org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk + diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties index 70a2672263..8cea012c1d 100644 --- a/qpid/java/test-profiles/test-provider.properties +++ b/qpid/java/test-profiles/test-provider.properties @@ -34,7 +34,7 @@ connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' -connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1' +connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20'' connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}' connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}' -- cgit v1.2.1