summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2010-01-31 00:31:49 +0000
committerRobert Godfrey <rgodfrey@apache.org>2010-01-31 00:31:49 +0000
commit2b8bb96fca71909d1dc185e1f62ee5fdaad02abd (patch)
tree919f0119bd3d23d97b497c5fa486121d4b5e286d /java/systests/src
parentf038a9ea62f563979678c2f1251d1eda82f1f20f (diff)
downloadqpid-python-2b8bb96fca71909d1dc185e1f62ee5fdaad02abd.tar.gz
QPID-2379 : Initial work on adding QMF and federation to the Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java435
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java83
4 files changed, 486 insertions, 40 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
index 1683c5e3d6..458c8b44c3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
@@ -21,13 +21,14 @@
package org.apache.qpid.server.failover;
import junit.framework.TestCase;
+
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.url.URLSyntaxException;
import javax.jms.ExceptionListener;
@@ -91,7 +92,7 @@ public class FailoverMethodTest extends TestCase implements ExceptionListener
// then TCP NoDelay 0 Delay 1 Delay 2 Delay 3
// so 3 delays of 2s in total for connection
// as this is a tcp connection it will take 1second per connection to fail
- // so max time is 6seconds of delay plus 4 seconds of TCP Delay + 1 second of runtime. == 11 seconds
+ // so max time is 6seconds of delay plus 4 seconds of TCP Delay + 1 second of runtime. == 11 seconds
// Ensure we actually had the delay
assertTrue("Failover took less than 6 seconds", duration > 6000);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
new file mode 100644
index 0000000000..515d93a4e5
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
@@ -0,0 +1,435 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.url.AMQBindingURL;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConflationQueueTest extends QpidTestCase
+{
+ private static final int TIMEOUT = 1500;
+
+
+ private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class);
+
+
+
+ protected final String VHOST = "/test";
+ protected final String QUEUE = "ConflationQueue";
+
+ private static final int MSG_COUNT = 400;
+
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+ private Session consumerSession;
+
+
+ private MessageConsumer consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ producerConnection = getConnection();
+ producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ producerConnection.start();
+
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ producerConnection.close();
+ consumerConnection.close();
+ super.tearDown();
+ }
+
+ public void testConflation() throws Exception
+ {
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
+ queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ }
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+ Message received;
+
+ List<Message> messages = new ArrayList<Message>();
+ while((received = consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
+
+
+ }
+
+
+ public void testConflationWithRelease() throws Exception
+ {
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
+ queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT/2; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+
+ }
+
+ // HACK to do something synchronous
+ ((AMQSession)producerSession).sync();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+ Message received;
+ List<Message> messages = new ArrayList<Message>();
+ while((received = consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ }
+
+ consumerSession.close();
+ consumerConnection.close();
+
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ }
+
+
+ // HACK to do something synchronous
+ ((AMQSession)producerSession).sync();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ messages = new ArrayList<Message>();
+ while((received = consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
+
+ }
+
+
+
+ public void testConflationWithReleaseAfterNewPublish() throws Exception
+ {
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
+ queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT/2; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ }
+
+ // HACK to do something synchronous
+ ((AMQSession)producerSession).sync();
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+ Message received;
+ List<Message> messages = new ArrayList<Message>();
+ while((received = consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ }
+
+ consumer.close();
+
+ for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ }
+
+ // HACK to do something synchronous
+ ((AMQSession)producerSession).sync();
+
+
+ // this causes the "old" messages to be released
+ consumerSession.close();
+ consumerConnection.close();
+
+
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ messages = new ArrayList<Message>();
+ while((received = consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
+
+ }
+
+ public void testConflationBrowser() throws Exception
+ {
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
+ queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+
+ }
+
+ ((AMQSession)producerSession).sync();
+
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQQueue browseQueue = new AMQQueue(url);
+
+ consumer = consumerSession.createConsumer(browseQueue);
+ consumerConnection.start();
+ Message received;
+ List<Message> messages = new ArrayList<Message>();
+ while((received = consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received",10,messages.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
+
+ messages.clear();
+
+ producer.send(nextMessage(MSG_COUNT, producerSession));
+
+ ((AMQSession)producerSession).sync();
+
+ while((received = consumer.receive(1000))!=null)
+ {
+ messages.add(received);
+ }
+ assertEquals("Unexpected number of messages received",1,messages.size());
+ assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg"));
+
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+
+
+ }
+
+
+ public void testConflation2Browsers() throws Exception
+ {
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
+ queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+
+ }
+
+ ((AMQSession)producerSession).sync();
+
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQQueue browseQueue = new AMQQueue(url);
+
+ consumer = consumerSession.createConsumer(browseQueue);
+ MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue);
+ consumerConnection.start();
+ List<Message> messages = new ArrayList<Message>();
+ List<Message> messages2 = new ArrayList<Message>();
+ Message received = consumer.receive(1000);
+ Message received2 = consumer2.receive(1000);
+
+ while(received!=null || received2!=null)
+ {
+ if(received != null)
+ {
+ messages.add(received);
+ }
+ if(received2 != null)
+ {
+ messages2.add(received2);
+ }
+
+
+ received = consumer.receive(1000);
+ received2 = consumer2.receive(1000);
+
+ }
+
+ assertEquals("Unexpected number of messages received on first browser",10,messages.size());
+ assertEquals("Unexpected number of messages received on second browser",10,messages2.size());
+
+ for(int i = 0 ; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ msg = messages2.get(i);
+ assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ }
+
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+
+
+ }
+
+
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+
+ send.setStringProperty("key", String.valueOf(msg % 10));
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+
+
+}
+
+
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
index ded2e0913b..395ced436b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
@@ -24,12 +24,13 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index d9b0a93132..84ff7055c5 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -21,12 +21,16 @@ import junit.framework.TestCase;
import junit.framework.TestResult;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.server.configuration.ServerConfiguration;
@@ -35,9 +39,6 @@ import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.store.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.LogMonitor;
-import org.apache.log4j.Level;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -58,7 +59,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.PrintStream;
-import java.io.Reader;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -78,7 +78,7 @@ public class QpidTestCase extends TestCase
protected static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
protected static final int LOGMONITOR_TIMEOUT = 5000;
-
+
protected long RECEIVE_TIMEOUT = 1000l;
private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>();
@@ -166,7 +166,7 @@ public class QpidTestCase extends TestCase
private static final String TEST_OUTPUT = "test.output";
private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
-
+
// values
protected static final String JAVA = "java";
protected static final String CPP = "cpp";
@@ -187,14 +187,14 @@ public class QpidTestCase extends TestCase
private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private String _output = System.getProperty(TEST_OUTPUT);
-
- private static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
+
+ private static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE);
-
+
protected File _outputFile;
-
+
protected PrintStream _brokerOutputStream;
-
+
private Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
private InitialContext _initialContext;
@@ -216,11 +216,11 @@ public class QpidTestCase extends TestCase
public QpidTestCase()
{
- super("QpidTestCase");
+ this("QpidTestCase");
}
public void runBare() throws Throwable
- {
+ {
_testName = getClass().getSimpleName() + "." + getName();
String qname = getClass().getName() + "." + getName();
@@ -231,7 +231,7 @@ public class QpidTestCase extends TestCase
PrintStream oldErr = System.err;
PrintStream out = null;
PrintStream err = null;
-
+
boolean redirected = _output != null && _output.length() > 0;
if (redirected)
{
@@ -240,15 +240,15 @@ public class QpidTestCase extends TestCase
err = new PrintStream(String.format("%s/TEST-%s.err", _output, qname));
System.setOut(out);
System.setErr(err);
-
+
if (_interleaveBrokerLog)
{
- _brokerOutputStream = out;
+ _brokerOutputStream = out;
}
else
{
_brokerOutputStream = new PrintStream(new FileOutputStream(String
- .format("%s/TEST-%s.broker.out", _output, qname)), true);
+ .format("%s/TEST-%s.broker.out", _output, qname)), true);
}
}
@@ -267,7 +267,7 @@ public class QpidTestCase extends TestCase
{
_logger.error("exception stopping broker", e);
}
-
+
if(_brokerCleanBetweenTests)
{
try
@@ -279,7 +279,7 @@ public class QpidTestCase extends TestCase
_logger.error("exception cleaning up broker", e);
}
}
-
+
_logger.info("========== stop " + _testName + " ==========");
if (redirected)
@@ -289,7 +289,7 @@ public class QpidTestCase extends TestCase
err.close();
out.close();
if (!_interleaveBrokerLog)
- {
+ {
_brokerOutputStream.close();
}
}
@@ -380,13 +380,13 @@ public class QpidTestCase extends TestCase
{
String line;
while ((line = in.readLine()) != null)
- {
+ {
if (_interleaveBrokerLog)
{
line = _brokerLogPrefix + line;
}
out.println(line);
-
+
if (latch != null && line.contains(ready))
{
seenReady = true;
@@ -433,7 +433,7 @@ public class QpidTestCase extends TestCase
*/
protected int getManagementPort(int mainPort)
{
- return mainPort + (DEFAULT_MANAGEMENT_PORT - DEFAULT_PORT);
+ return mainPort + (DEFAULT_MANAGEMENT_PORT - (_broker.equals(VM) ? DEFAULT_VM_PORT : DEFAULT_PORT));
}
/**
@@ -484,7 +484,16 @@ public class QpidTestCase extends TestCase
setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port)));
saveTestConfiguration();
// create an in_VM broker
- ApplicationRegistry.initialise(new ConfigurationFileApplicationRegistry(_configFile), port);
+ final ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(_configFile);
+ try
+ {
+ ApplicationRegistry.initialise(registry, port);
+ }
+ catch (Exception e)
+ {
+ registry.close();
+ throw e;
+ }
TransportConnection.createVMBroker(port);
}
else if (!_broker.equals(EXTERNAL))
@@ -511,12 +520,12 @@ public class QpidTestCase extends TestCase
// Use the environment variable to set amqj.logging.level for the broker
// The value used is a 'server' value in the test configuration to
- // allow a differentiation between the client and broker logging levels.
+ // allow a differentiation between the client and broker logging levels.
if (System.getProperty("amqj.server.logging.level") != null)
{
setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level"));
}
-
+
// Add all the environment settings the test requested
if (!_env.isEmpty())
{
@@ -766,7 +775,7 @@ public class QpidTestCase extends TestCase
_propertiesSetForBroker.put(property, value);
}
- }
+ }
/**
* Set a System (-D) property for this test run.
@@ -790,7 +799,7 @@ public class QpidTestCase extends TestCase
* Java Broker via a -D value defined in QPID_OPTS.
*
* If the value should not be set on the broker then use
- * setTestClientSystemProperty().
+ * setTestClientSystemProperty().
*
* @param property the property to set
* @param value the new value to use
@@ -800,7 +809,7 @@ public class QpidTestCase extends TestCase
// Record the value for the external broker
_propertiesSetForBroker.put(property, value);
- //Set the value for the test client vm aswell.
+ //Set the value for the test client vm aswell.
setTestClientSystemProperty(property, value);
}
@@ -816,7 +825,7 @@ public class QpidTestCase extends TestCase
{
// Record the current value so we can revert it later.
_propertiesSetForTestOnly.put(property, System.getProperty(property));
- }
+ }
System.setProperty(property, value);
}
@@ -1153,7 +1162,7 @@ public class QpidTestCase extends TestCase
int count, int batchSize) throws Exception
{
return sendMessage(session, destination, count, 0, batchSize);
- }
+ }
/**
* Send messages to the given destination.
@@ -1199,7 +1208,7 @@ public class QpidTestCase extends TestCase
// Ensure we commit the last messages
// Commit the session if we are transacted and
// we have no batchSize or
- // our count is not divible by batchSize.
+ // our count is not divible by batchSize.
if (session.getTransacted() &&
( batchSize == 0 || count % batchSize != 0))
{
@@ -1249,13 +1258,13 @@ public class QpidTestCase extends TestCase
{
reloadBroker(0);
}
-
+
public void reloadBroker(int port) throws ConfigurationException, IOException
{
if (_broker.equals(VM))
{
ApplicationRegistry.getInstance().getConfiguration().reparseConfigFileSecuritySections();
- }
+ }
else // FIXME: should really use the JMX interface to do this
{
/*
@@ -1266,11 +1275,11 @@ public class QpidTestCase extends TestCase
BufferedReader reader = new BufferedReader (new InputStreamReader(p.getInputStream()));
String cmd = "/bin/kill -SIGHUP " + reader.readLine();
p = Runtime.getRuntime().exec(cmd);
-
+
LogMonitor _monitor = new LogMonitor(_outputFile);
assertTrue("The expected server security configuration reload did not occur",
_monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT));
-
+
}
}
}