summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/systests
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2013-09-23 23:45:59 +0000
committerRobert Gemmell <robbie@apache.org>2013-09-23 23:45:59 +0000
commit9b5723a1f2a3b72c81c143f78d7b8dc5ee6b7271 (patch)
tree7a71d4b9db6fe7054fd234f2a03477c7b2352180 /qpid/java/bdbstore/systests
parente0b22df37ecacc0c807e07b76280765ecf0343d5 (diff)
downloadqpid-python-9b5723a1f2a3b72c81c143f78d7b8dc5ee6b7271.tar.gz
QPID-5162: move the bdbstore-specific systests to their own module
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1525741 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/systests')
-rw-r--r--qpid/java/bdbstore/systests/build.xml28
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java193
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java612
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java544
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java167
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java289
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java217
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java266
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java429
9 files changed, 2745 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/systests/build.xml b/qpid/java/bdbstore/systests/build.xml
new file mode 100644
index 0000000000..ca809ad0cb
--- /dev/null
+++ b/qpid/java/bdbstore/systests/build.xml
@@ -0,0 +1,28 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+nn - or more contributor license agreements. See the NOTICE file
+ -n distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="BDBStoreSystemTests" default="build">
+ <property name="module.depends" value="qpid-test-utils systests client management/common broker-core broker-core/tests common amqp-1-0-common common/tests jca ra broker-plugins/access-control broker-plugins/management-http broker-plugins/management-jmx broker-plugins/memory-store broker-plugins/derby-store broker-plugins/amqp-0-8-protocol broker-plugins/amqp-0-10-protocol broker-plugins/amqp-1-0-protocol broker-plugins/amqp-msg-conv-0-8-to-0-10 broker-plugins/amqp-msg-conv-0-8-to-1-0 broker-plugins/amqp-msg-conv-0-10-to-1-0 bdbstore bdbstore/jmx"/>
+ <property name="module.test.src" location="src/main/java"/>
+
+ <import file="../../module.xml"/>
+
+ <property name="module.src.resources.extra" location="../src/test/resources"/>
+</project>
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
new file mode 100644
index 0000000000..7c04d83e79
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.Piper;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+
+/**
+ * Tests the BDB backup script can successfully perform a backup and that
+ * backup can be restored and used by the Broker.
+ */
+public class BDBBackupTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class);
+
+ private static final String BACKUP_SCRIPT = "/bin/backup.sh";
+ private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed";
+
+ private static final String TEST_VHOST = "test";
+ private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir");
+
+ private File _backupToDir;
+ private File _backupFromDir;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName());
+ _backupToDir.mkdirs();
+
+ final String qpidWork = getBroker(DEFAULT_PORT).getWorkingDirectory();
+
+ // It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...")
+ // but the config as known to QBTC does not pull-in the virtualhost section from its separate source file
+ _backupFromDir = new File(qpidWork + File.separator + TEST_VHOST + "-store");
+ boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory();
+ assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ FileUtils.delete(_backupToDir, true);
+ }
+ }
+
+ public void testBackupAndRestoreMaintainsMessages() throws Exception
+ {
+ sendNumberedMessages(0, 10);
+ invokeBdbBackup(_backupFromDir, _backupToDir);
+ sendNumberedMessages(10, 20);
+ confirmBrokerHasMessages(0, 20);
+ stopBroker();
+
+ deleteStore(_backupFromDir);
+ replaceStoreWithBackup(_backupToDir, _backupFromDir);
+
+ startBroker();
+ confirmBrokerHasMessages(0, 10);
+ }
+
+ private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception
+ {
+ Connection con = getConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(getTestQueueName());
+ // Create queue by consumer side-effect
+ session.createConsumer(destination).close();
+
+ final int numOfMessages = endIndex - startIndex;
+ final int batchSize = 0;
+ sendMessage(session, destination, numOfMessages, startIndex, batchSize);
+ con.close();
+ }
+
+ private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception
+ {
+ Connection con = getConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ con.start();
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i = startIndex; i < endIndex; i++)
+ {
+ Message msg = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message " + i + " not received", msg);
+ assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX));
+ }
+
+ Message msg = consumer.receive(100);
+ if(msg != null)
+ {
+ fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX));
+ }
+ con.close();
+ }
+
+ private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception
+ {
+ if (String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows"))
+ {
+ BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()});
+ }
+ else
+ {
+ runBdbBackupScript(backupFromDir, backupToDir);
+ }
+ }
+
+ private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException,
+ InterruptedException
+ {
+ Process backupProcess = null;
+ try
+ {
+ String qpidHome = System.getProperty(QPID_HOME);
+ ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath());
+ pb.redirectErrorStream(true);
+ Map<String, String> env = pb.environment();
+ env.put(QPID_HOME, qpidHome);
+
+ LOGGER.debug("Backup command is " + pb.command());
+ backupProcess = pb.start();
+ Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE);
+ piper.start();
+ piper.await(2, TimeUnit.SECONDS);
+ backupProcess.waitFor();
+ piper.join();
+
+ LOGGER.debug("Backup command completed " + backupProcess.exitValue());
+ assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue());
+ }
+ finally
+ {
+ if (backupProcess != null)
+ {
+ backupProcess.getErrorStream().close();
+ backupProcess.getInputStream().close();
+ backupProcess.getOutputStream().close();
+ }
+ }
+ }
+
+ private void replaceStoreWithBackup(File source, File dst) throws Exception
+ {
+ LOGGER.debug("Copying store " + source + " to " + dst);
+ FileUtils.copyRecursive(source, dst);
+ }
+
+ private void deleteStore(File storeDir)
+ {
+ LOGGER.debug("Deleting store " + storeDir);
+ FileUtils.delete(storeDir, true);
+ }
+
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
new file mode 100644
index 0000000000..76b990038d
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -0,0 +1,612 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
+import org.apache.qpid.server.store.MessageStoreTest;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+
+/**
+ * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * the BDB Store as well as additional tests specific to the BDB store-implementation.
+ */
+public class BDBMessageStoreTest extends MessageStoreTest
+{
+ private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+ /**
+ * Tests that message metadata and content are successfully read back from a
+ * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
+ * verify their ability to co-exist within the store and be successful retrieved.
+ */
+ public void testBDBMessagePersistence() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+
+ // Create content ByteBuffers.
+ // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
+ // Use a single chunk for the 0-10 message as per broker behaviour.
+ String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
+
+ ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+ ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
+
+ ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
+ int bodySize = completeContentBody_0_10.limit();
+
+ /*
+ * Create and insert a 0-8 message (metadata and multi-chunk content)
+ */
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+
+ long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ storedMessage_0_8.addContent(0, firstContentBytes_0_8);
+ storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
+ storedMessage_0_8.flushToStore();
+
+ /*
+ * Create and insert a 0-10 message (metadata and content)
+ */
+ MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
+ DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
+ Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
+
+ MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
+
+ MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
+ StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+
+ long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
+ long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+
+ storedMessage_0_10.addContent(0, completeContentBody_0_10);
+ storedMessage_0_10.flushToStore();
+
+ /*
+ * reload the store only (read-only)
+ */
+ AbstractBDBMessageStore readOnlyStore = reloadStore(bdbStore);
+
+ /*
+ * Read back and validate the 0-8 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8);
+
+ assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal());
+ assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
+ MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
+
+ MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
+ assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
+ assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
+ assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
+ assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
+
+ ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
+ assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId());
+ assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight());
+ assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize());
+
+ BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
+ assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
+ assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
+
+ ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ;
+ long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+ assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8);
+ String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
+
+ /*
+ * Read back and validate the 0-10 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10);
+
+ assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal());
+ assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
+ MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
+
+ DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties();
+ assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
+ assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
+ assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
+ assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
+ assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
+ assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
+
+ MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties();
+ assertNotNull("MessageProperties were not returned", returnedMsgProps);
+ assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
+ assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
+ assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
+
+ ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
+ long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent);
+ assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
+
+ String returnedPayloadString_0_10 = new String(recoveredContent.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
+
+ readOnlyStore.close();
+ }
+
+ private DeliveryProperties createDeliveryProperties_0_10()
+ {
+ DeliveryProperties delProps_0_10 = new DeliveryProperties();
+
+ delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+ delProps_0_10.setImmediate(true);
+ delProps_0_10.setExchange("exchange12345");
+ delProps_0_10.setRoutingKey("routingKey12345");
+ delProps_0_10.setExpiration(5);
+ delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
+
+ return delProps_0_10;
+ }
+
+ private MessageProperties createMessageProperties_0_10(int bodySize)
+ {
+ MessageProperties msgProps_0_10 = new MessageProperties();
+ msgProps_0_10.setContentLength(bodySize);
+ msgProps_0_10.setCorrelationId("qwerty".getBytes());
+ msgProps_0_10.setContentType("text/html");
+
+ return msgProps_0_10;
+ }
+
+ /**
+ * Close the provided store and create a new (read-only) store to read back the data.
+ *
+ * Use this method instead of reloading the virtual host like other tests in order
+ * to avoid the recovery handler deleting the message for not being on a queue.
+ */
+ private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception
+ {
+ messageStore.close();
+
+ AbstractBDBMessageStore newStore = new BDBMessageStore();
+ newStore.configure(getVirtualHostModel(),true);
+
+ newStore.startWithNoRecover();
+
+ return newStore;
+ }
+
+ private MessagePublishInfo createPublishInfoBody_0_8()
+ {
+ return new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("exchange12345");
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("routingKey12345");
+ }
+ };
+
+ }
+
+ private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
+ {
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ return new ContentHeaderBody(classForBasic, 1, props, length);
+ }
+
+ private BasicContentHeaderProperties createContentHeaderProperties_0_8()
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/html");
+ props.getHeaders().setString("Test", "MST");
+ return props;
+ }
+
+ public void testGetContentWithOffset() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ // normal case: offset is 0
+ ByteBuffer dst = ByteBuffer.allocate(10);
+ int length = bdbStore.getContent(messageid_0_8, 0, dst);
+ assertEquals("Unexpected length", CONTENT_BYTES.length, length);
+ byte[] array = dst.array();
+ assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array));
+
+ // offset is in the middle
+ dst = ByteBuffer.allocate(10);
+ length = bdbStore.getContent(messageid_0_8, 5, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ byte[] expected = new byte[10];
+ System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+ // offset beyond the content length
+ dst = ByteBuffer.allocate(10);
+ try
+ {
+ bdbStore.getContent(messageid_0_8, 15, dst);
+ fail("Should fail for the offset greater than message size");
+ }
+ catch (RuntimeException e)
+ {
+ assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id "
+ + messageid_0_8 + "!", e.getMessage());
+ }
+
+ // buffer is smaller then message size
+ dst = ByteBuffer.allocate(5);
+ length = bdbStore.getContent(messageid_0_8, 0, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ expected = new byte[5];
+ System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+
+ // buffer is smaller then message size, offset is not 0
+ dst = ByteBuffer.allocate(5);
+ length = bdbStore.getContent(messageid_0_8, 2, dst);
+ assertEquals("Unexpected length", 5, length);
+ array = dst.array();
+ expected = new byte[5];
+ System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
+ assertTrue("Unexpected content", Arrays.equals(expected, array));
+ }
+ /**
+ * Tests that messages which are added to the store and then removed using the
+ * public MessageStore interfaces are actually removed from the store by then
+ * interrogating the store with its own implementation methods and verifying
+ * expected exceptions are thrown to indicate the message is not present.
+ */
+ public void testMessageCreationAndRemoval() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
+
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ bdbStore.removeMessage(messageid_0_8, true);
+
+ //verify the removal using the BDB store implementation methods directly
+ try
+ {
+ // the next line should throw since the message id should not be found
+ bdbStore.getMessageMetaData(messageid_0_8);
+ fail("No exception thrown when message id not found getting metadata");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass since exception expected
+ }
+
+ //expecting no content, allocate a 1 byte
+ ByteBuffer dst = ByteBuffer.allocate(1);
+
+ assertEquals("Retrieved content when none was expected",
+ 0, bdbStore.getContent(messageid_0_8, 0, dst));
+ }
+ private AbstractBDBMessageStore assertBDBStore(MessageStore store)
+ {
+
+ assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
+
+ return (AbstractBDBMessageStore) store;
+ }
+
+ private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
+ {
+ ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES);
+
+ int bodySize = CONTENT_BYTES.length;
+
+ //create and store the message using the MessageStore interface
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
+
+ storedMessage_0_8.addContent(0, chunk1);
+ storedMessage_0_8.flushToStore();
+
+ return storedMessage_0_8;
+ }
+
+ /**
+ * Tests transaction commit by utilising the enqueue and dequeue methods available
+ * in the TransactionLog interface implemented by the store, and verifying the
+ * behaviour using BDB implementation methods.
+ */
+ public void testTranCommit() throws Exception
+ {
+ MessageStore log = getVirtualHost().getMessageStore();
+
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ @Override
+ public UUID getId()
+ {
+ return mockQueueId;
+ }
+ };
+
+ Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, new MockMessage(1L));
+ txn.enqueueMessage(mockQueue, new MockMessage(5L));
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 1L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 5L, val.longValue());
+ }
+
+
+ /**
+ * Tests transaction rollback before a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackBeforeCommit() throws Exception
+ {
+ MessageStore log = getVirtualHost().getMessageStore();
+
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ @Override
+ public UUID getId()
+ {
+ return mockQueueId;
+ }
+ };
+
+ Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, new MockMessage(21L));
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, new MockMessage(22L));
+ txn.enqueueMessage(mockQueue, new MockMessage(23L));
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 22L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 23L, val.longValue());
+ }
+
+ public void testOnDelete() throws Exception
+ {
+ MessageStore log = getVirtualHost().getMessageStore();
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+ String storeLocation = bdbStore.getStoreLocation();
+
+ File location = new File(storeLocation);
+ assertTrue("Store does not exist at " + storeLocation, location.exists());
+
+ bdbStore.close();
+ assertTrue("Store does not exist at " + storeLocation, location.exists());
+
+ bdbStore.onDelete();
+ assertFalse("Store exists at " + storeLocation, location.exists());
+ }
+
+ /**
+ * Tests transaction rollback after a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackAfterCommit() throws Exception
+ {
+ MessageStore log = getVirtualHost().getMessageStore();
+
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
+
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ @Override
+ public UUID getId()
+ {
+ return mockQueueId;
+ }
+ };
+
+ Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, new MockMessage(30L));
+ txn.commitTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, new MockMessage(31L));
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, new MockMessage(32L));
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 30L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 32L, val.longValue());
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static class MockMessage implements ServerMessage, EnqueableMessage
+ {
+ private long _messageId;
+
+ public MockMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public String getRoutingKey()
+ {
+ return null;
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public long getSize()
+ {
+ return 0;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ public MessageReference newReference()
+ {
+ return null;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public long getArrivalTime()
+ {
+ return 0;
+ }
+
+ public int getContent(ByteBuffer buf, int offset)
+ {
+ return 0;
+ }
+
+ public ByteBuffer getContent(int offset, int length)
+ {
+ return null;
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
new file mode 100644
index 0000000000..755168ca9c
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -0,0 +1,544 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.InputStream;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests upgrading a BDB store on broker startup.
+ * The store will then be used to verify that the upgrade is completed
+ * properly and that once upgraded it functions as expected.
+ *
+ * Store prepared using old client/broker with BDBStoreUpgradeTestPreparer.
+ */
+public class BDBUpgradeTest extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
+
+ private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
+
+ private static final String STRING_1024 = generateString(1024);
+ private static final String STRING_1024_256 = generateString(1024*256);
+
+ private static final String TOPIC_NAME="myUpgradeTopic";
+ private static final String SUB_NAME="myDurSubName";
+ private static final String SELECTOR_SUB_NAME="mySelectorDurSubName";
+ private static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic";
+ private static final String QUEUE_NAME="myUpgradeQueue";
+ private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
+ private static final String PRIORITY_QUEUE_NAME="myPriorityQueue";
+ private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
+
+ private String _storeLocation;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
+ _storeLocation = getWorkDirBaseDir() + File.separator + "test-store";
+
+ //Clear the two target directories if they exist.
+ File directory = new File(_storeLocation);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+ directory.mkdirs();
+
+ // copy store files
+ InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb");
+ FileUtils.copy(src, new File(_storeLocation, "00000000.jdb"));
+
+ getBrokerConfiguration().addJmxManagementConfiguration();
+ super.setUp();
+ }
+
+ private String getWorkDirBaseDir()
+ {
+ return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
+ }
+
+ /**
+ * Test that the selector applied to the DurableSubscription was successfully
+ * transfered to the new store, and functions as expected with continued use
+ * by monitoring message count while sending new messages to the topic and then
+ * consuming them.
+ */
+ public void testSelectorDurability() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME);
+ assertEquals("DurableSubscription backing queue should have 1 message on it initially",
+ new Integer(1), dursubQueue.getMessageCount());
+
+ // Create a connection and start it
+ TopicConnection connection = (TopicConnection) getConnection();
+ connection.start();
+
+ // Send messages which don't match and do match the selector, checking message count
+ TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+
+ publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+ pubSession.commit();
+ assertEquals("DurableSubscription backing queue should still have 1 message on it",
+ Integer.valueOf(1), dursubQueue.getMessageCount());
+
+ publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+ pubSession.commit();
+ assertEquals("DurableSubscription backing queue should now have 2 messages on it",
+ Integer.valueOf(2), dursubQueue.getMessageCount());
+
+ TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
+ Message m = durSub.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ m = durSub.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ pubSession.commit();
+
+ pubSession.close();
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ /**
+ * Test that the DurableSubscription without selector was successfully
+ * transfered to the new store, and functions as expected with continued use.
+ */
+ public void testDurableSubscriptionWithoutSelector() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+ assertEquals("DurableSubscription backing queue should have 1 message on it initially",
+ new Integer(1), dursubQueue.getMessageCount());
+
+ // Create a connection and start it
+ TopicConnection connection = (TopicConnection) getConnection();
+ connection.start();
+
+ // Send new message matching the topic, checking message count
+ TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(TOPIC_NAME);
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent");
+ session.commit();
+ assertEquals("DurableSubscription backing queue should now have 2 messages on it",
+ Integer.valueOf(2), dursubQueue.getMessageCount());
+
+ TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME);
+ Message m = durSub.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ m = durSub.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+
+ session.commit();
+ session.close();
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ /**
+ * Test that the backing queue for the durable subscription created was successfully
+ * detected and set as being exclusive during the upgrade process, and that the
+ * regular queue was not.
+ */
+ public void testQueueExclusivity() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME);
+ assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive());
+
+ ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+ assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive());
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ /**
+ * Test that the upgraded queue continues to function properly when used
+ * for persistent messaging and restarting the broker.
+ *
+ * Sends the new messages to the queue BEFORE consuming those which were
+ * sent before the upgrade. In doing so, this also serves to test that
+ * the queue bindings were successfully transitioned during the upgrade.
+ */
+ public void testBindingAndMessageDurabability() throws Exception
+ {
+ // Create a connection and start it
+ TopicConnection connection = (TopicConnection) getConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ // Send a new message
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1);
+
+ session.close();
+
+ // Restart the broker
+ restartBroker();
+
+ // Drain the queue of all messages
+ connection = (TopicConnection) getConnection();
+ connection.start();
+ consumeQueueMessages(connection, true);
+ }
+
+ /**
+ * Test that all of the committed persistent messages previously sent to
+ * the broker are properly received following update of the MetaData and
+ * Content entries during the store upgrade process.
+ */
+ public void testConsumptionOfUpgradedMessages() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ consumeDurableSubscriptionMessages(connection, true);
+ consumeDurableSubscriptionMessages(connection, false);
+ consumeQueueMessages(connection, false);
+ }
+
+ /**
+ * Tests store migration containing messages for non-existing queue.
+ *
+ * @throws Exception
+ */
+ public void testMigrationOfMessagesForNonDurableQueues() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // consume a message for non-existing store
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME);
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ for (int i = 1; i <= 3; i++)
+ {
+ Message message = messageConsumer.receive(1000);
+ assertNotNull("Message was not migrated!", message);
+ assertTrue("Unexpected message received!", message instanceof TextMessage);
+ assertEquals("ID property did not match", i, message.getIntProperty("ID"));
+ }
+ }
+
+ /**
+ * Tests store upgrade has maintained the priority queue configuration,
+ * such that sending messages with priorities out-of-order and then consuming
+ * them gets the messages back in priority order.
+ */
+ public void testPriorityQueue() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // send some messages to the priority queue
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.setPriority(4);
+ producer.send(createMessage(1, false, session, producer));
+ producer.setPriority(1);
+ producer.send(createMessage(2, false, session, producer));
+ producer.setPriority(9);
+ producer.send(createMessage(3, false, session, producer));
+ session.close();
+
+ //consume the messages, expected order: msg 3, msg 1, msg 2.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(2, msg.getIntProperty("msg"));
+ }
+
+ /**
+ * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
+ * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
+ * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
+ *
+ * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
+ * that turned it on for this specific queue.
+ */
+ public void testRecoveryOfQueueWithDLQ() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ
+ ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE");
+ assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType());
+ TabularDataSupport bindings = (TabularDataSupport) exchange.bindings();
+ assertEquals(1, bindings.size());
+ for(Object o : bindings.values())
+ {
+ CompositeData binding = (CompositeData) o;
+
+ String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY);
+ String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES);
+
+ //Because its a fanout exchange, we just return a single '*' key with all bound queues
+ assertEquals("unexpected binding key", "*", bindingKey);
+ assertEquals("unexpected number of queues bound", 1, queueNames.length);
+ assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]);
+ }
+
+ //verify the queue exists, has the expected alternate exchange and max delivery count
+ ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME);
+ assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount());
+
+ ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ");
+ assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount());
+
+ String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ");
+ assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString));
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
+ private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = null;
+ TopicSubscriber durSub = null;
+
+ if(selector)
+ {
+ topic = session.createTopic(SELECTOR_TOPIC_NAME);
+ durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false);
+ }
+ else
+ {
+ topic = session.createTopic(TOPIC_NAME);
+ durSub = session.createDurableSubscriber(topic, SUB_NAME);
+ }
+
+
+ // Retrieve the matching message
+ Message m = durSub.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ if(selector)
+ {
+ assertEquals("Selector property did not match", "true", m.getStringProperty("testprop"));
+ }
+ assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", generateString(1024) , ((TextMessage)m).getText());
+
+ // Verify that no more messages are received
+ m = durSub.receive(1000);
+ assertNull("No more messages should have been recieved", m);
+
+ durSub.close();
+ session.close();
+ }
+
+ private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message m;
+
+ // Retrieve the initial pre-upgrade messages
+ for (int i=1; i <= 5 ; i++)
+ {
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ }
+ for (int i=1; i <= 5 ; i++)
+ {
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText());
+ }
+
+ if(extraMessage)
+ {
+ //verify that the extra message is received
+ m = consumer.receive(2000);
+ assertNotNull("Failed to receive an expected message", m);
+ assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+ assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+ }
+
+ // Verify that no more messages are received
+ m = consumer.receive(1000);
+ assertNull("No more messages should have been recieved", m);
+
+ consumer.close();
+ session.close();
+ }
+
+ private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msgId);
+ send.setIntProperty("msg", msgId);
+
+ return send;
+ }
+
+ /**
+ * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
+ *
+ * @param length number of characters in the string
+ * @return string sequence of the given length
+ */
+ private static String generateString(int length)
+ {
+ char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'};
+ char[] chars = new char[length];
+ for (int i = 0; i < (length); i++)
+ {
+ chars[i] = base_chars[i % 10];
+ }
+ return new String(chars);
+ }
+
+ private static void sendMessages(Session session, MessageProducer messageProducer,
+ Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
+ {
+ for (int i = 1; i <= numMesages; i++)
+ {
+ Message message = session.createTextMessage(generateString(length));
+ message.setIntProperty("ID", i);
+ messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ private static void publishMessages(Session session, TopicPublisher publisher,
+ Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
+ {
+ for (int i = 1; i <= numMesages; i++)
+ {
+ Message message = session.createTextMessage(generateString(length));
+ message.setIntProperty("ID", i);
+ message.setStringProperty("testprop", selectorProperty);
+ publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
new file mode 100644
index 0000000000..0464269efc
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestUtils;
+
+/**
+ * The HA black box tests test the BDB cluster as a opaque unit. Client connects to
+ * the cluster via a failover url
+ *
+ * @see HAClusterWhiteboxTest
+ */
+public class HAClusterBlackboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+ private static final int NUMBER_OF_NODES = 3;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ private FailoverAwaitingListener _failoverAwaitingListener;
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+
+ _clusterCreator.startCluster();
+ _failoverAwaitingListener = new FailoverAwaitingListener();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testLossOfMasterNodeCausesClientToFailover() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ _clusterCreator.stopNode(activeBrokerPort);
+ LOGGER.info("Node is stopped");
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception
+ {
+ LOGGER.info("Connecting to " + _brokerFailoverUrl);
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ LOGGER.info("Got connection to cluster");
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+ final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+
+ LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort);
+
+ _clusterCreator.stopNode(inactiveBrokerPort);
+
+ _failoverAwaitingListener.assertFailoverDoesNotOccur(2000);
+
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private final class FailoverAwaitingListener implements ConnectionListener
+ {
+ private final CountDownLatch _failoverLatch = new CountDownLatch(1);
+
+ @Override
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public void assertFailoverOccurs(long delay) throws InterruptedException
+ {
+ if (!_failoverLatch.await(delay, TimeUnit.MILLISECONDS))
+ {
+ LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n");
+ }
+ assertEquals("Failover did not occur", 0, _failoverLatch.getCount());
+ }
+
+ public void assertFailoverDoesNotOccur(long delay) throws InterruptedException
+ {
+ _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount());
+ }
+
+
+ @Override
+ public void failoverComplete()
+ {
+ _failoverLatch.countDown();
+ }
+
+ @Override
+ public void bytesSent(long count)
+ {
+ }
+
+ @Override
+ public void bytesReceived(long count)
+ {
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
new file mode 100644
index 0000000000..0e25c4e17a
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.EnvironmentFailureException;
+
+/**
+ * System test verifying the ability to control a cluster via the Management API.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterManagementTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class);
+
+ private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
+ private static final int NUMBER_OF_NODES = 4;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ _clusterCreator.configureClusterNodes();
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testReadonlyMBeanAttributes() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+ final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName());
+ assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName());
+ assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort());
+ assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort());
+ // As we have chosen an arbitrary broker from the cluster, we cannot predict its state
+ assertNotNull("Store state must not be null", storeBean.getNodeState());
+ }
+
+ public void testStateOfActiveBrokerIsMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber);
+ assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState());
+ }
+
+ public void testStateOfNonActiveBrokerIsNotMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
+ final String nodeState = storeBean.getNodeState();
+ assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState));
+ }
+
+ public void testGroupMembers() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
+
+ final TabularData groupMembers = storeBean.getAllNodesInGroup();
+ assertNotNull(groupMembers);
+
+ for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
+ {
+ final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
+ final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber);
+
+ CompositeData row = groupMembers.get(new Object[] {nodeName});
+ assertNotNull("Table does not contain row for node name " + nodeName, row);
+ assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+ }
+
+ public void testRemoveNodeFromGroup() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
+ awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
+
+ final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
+ _clusterCreator.stopNode(brokerPortNumberToBeRemoved);
+ storeBean.removeNodeFromGroup(removedNodeName);
+
+ final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
+ }
+
+ /**
+ * Updates the address of a node.
+ *
+ * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
+ * assert.
+ *
+ * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
+ */
+ public void testUpdateAddress() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ }
+
+ /**
+ * @see #testUpdateAddress()
+ */
+ public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ // now deliberately don't call updateAddress
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ try
+ {
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ fail("Exception not thrown");
+ }
+ catch(RuntimeException rte)
+ {
+ //check cause was BDBs EnvironmentFailureException
+ assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName()));
+ // PASS
+ }
+ }
+
+ public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
+
+ ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
+
+ try
+ {
+ inactiveBroker.createNewQueue(getTestQueueName(), null, true);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ String message = e.getMessage();
+ assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message);
+ }
+
+ try
+ {
+ inactiveBroker.createNewExchange(getName(), "direct", true);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ String message = e.getMessage();
+ assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message);
+ }
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(brokerPortNumber);
+
+ return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ }
+
+ private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(brokerPortNumber);
+
+ return _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+ }
+
+ private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception
+ {
+ long totalTimeWaited = 0l;
+ long waitInterval = 100l;
+ long maxWaitTime = 10000;
+
+ int currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
+ while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime)
+ {
+ LOGGER.debug("Still awaiting nodes to join group; expecting "
+ + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes
+ + " after " + totalTimeWaited + " ms.");
+
+ totalTimeWaited += waitInterval;
+ Thread.sleep(waitInterval);
+
+ currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
+ }
+
+ assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms",
+ expectedNumberOfNodes ,currentNumberOfNodes);
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
new file mode 100644
index 0000000000..95626f7fa5
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class HAClusterTwoNodeTest extends QpidBrokerTestCase
+{
+ private static final long RECEIVE_TIMEOUT = 5000l;
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
+ private static final int NUMBER_OF_NODES = 2;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ private void startCluster(boolean designedPrimary) throws Exception
+ {
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+
+ String storeConfigKeyPrefix = _clusterCreator.getStoreConfigKeyPrefix();
+
+ setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
+ setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", "2 s");
+
+ setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ setVirtualHostConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+ }
+
+ public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
+ {
+ startCluster(true);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _clusterCreator.stopCluster();
+ _clusterCreator.startNode(masterPort);
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ public void testClusterRestartWithoutDesignatedPrimary() throws Exception
+ {
+ startCluster(false);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _clusterCreator.stopCluster();
+ _clusterCreator.startClusterParallel();
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception
+ {
+ startCluster(false);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ try
+ {
+ assertProducingConsuming(connection);
+ fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available");
+ }
+ catch(JMSException e)
+ {
+ // JMSException should be thrown on transaction start/commit
+ }
+ }
+
+ public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+
+ try
+ {
+ getConnection(_brokerFailoverUrl);
+ fail("Connection not expected");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
+ {
+ startCluster(true);
+ final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary());
+ assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary());
+
+ final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary());
+ }
+
+ public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+
+ assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary());
+ storeBean.setDesignatedPrimary(true);
+ assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary());
+
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to new primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ return storeBean;
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 1);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ session.commit();
+ }
+
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
new file mode 100644
index 0000000000..408643b98a
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The HA white box tests test the BDB cluster where the test retains the knowledge of the
+ * individual test nodes. It uses this knowledge to examine the nodes to ensure that they
+ * remain in the correct state throughout the test.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterWhiteboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private final int NUMBER_OF_NODES = 3;
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testClusterPermitsConnectionToOnlyOneNode() throws Exception
+ {
+ int connectionSuccesses = 0;
+ int connectionFails = 0;
+
+ for (int brokerPortNumber : getBrokerPortNumbers())
+ {
+ try
+ {
+ getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber));
+ connectionSuccesses++;
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ connectionFails++;
+ }
+ }
+
+ assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails);
+ assertEquals("Unexpected number of successful connections", 1, connectionSuccesses);
+ }
+
+ public void testClusterThatLosesNodeStillAllowsConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ closeConnectionAndKillBroker(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+
+ // verify that JMS persistence operations are working
+ assertProducingConsuming(subsequentConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ closeConnectionAndKillBroker(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection);
+
+ killBroker(subsequentPortNumber);
+
+ final Connection finalConnection = getConnectionToNodeInCluster();
+ assertNull(finalConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception
+ {
+ final Connection connection = getConnectionToNodeInCluster();
+ assertNotNull(connection);
+
+ final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ connection.close();
+
+ _clusterCreator.stopNode(brokerPortNumber);
+ _clusterCreator.startNode(brokerPortNumber);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ }
+
+ public void testClusterLosingNodeRetainsData() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+
+ final String queueNamePrefix = getTestQueueName();
+ final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'";
+ final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'";
+
+ populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+
+ closeConnectionAndKillBroker(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+
+ assertNotNull("no valid connection obtained", subsequentConnection);
+
+ checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+ }
+
+ public void xtestRecoveryOfOutOfDateNode() throws Exception
+ {
+ /*
+ * TODO: Implement
+ *
+ * Cant yet find a way to control cleaning in a deterministic way to allow provoking
+ * a node to become out of date. We do now know that even a new joiner to the group
+ * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has
+ * done *any* cleaning and then adding a new node should be sufficient to cause this.
+ */
+ }
+
+ private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception
+ {
+ populateBrokerWithData(connection, 1, queueUrls);
+ }
+
+ private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception
+ {
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ session.createConsumer(queue).close();
+ sendMessage(session, queue, noOfMessages);
+ }
+ }
+
+ private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException
+ {
+ connection.start();
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ final Message message = consumer.receive(1000);
+ session.commit();
+ assertNotNull("Queue " + queue + " should have message", message);
+ assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX));
+ }
+ }
+
+ private Connection getConnectionToNodeInCluster() throws URLSyntaxException
+ {
+ Connection connection = null;
+ Set<Integer> runningBrokerPorts = getBrokerPortNumbers();
+
+ for (int brokerPortNumber : runningBrokerPorts)
+ {
+ try
+ {
+ connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber));
+ break;
+ }
+ catch(JMSException je)
+ {
+ assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ }
+ }
+ return connection;
+ }
+
+ private void closeConnectionAndKillBroker(final Connection initialConnection) throws Exception
+ {
+ final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ initialConnection.close();
+
+ killBroker(initialPortNumber); // kill awaits the death of the child
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 2);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ Message m2 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 2 is not received", m2);
+ assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX));
+ session.commit();
+ }
+
+ private void closeConnection(final Connection initialConnection)
+ {
+ try
+ {
+ initialConnection.close();
+ }
+ catch(Exception e)
+ {
+ // ignore.
+ // java.net.SocketException is seen sometimes on active connection
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
new file mode 100644
index 0000000000..353c3a0ec5
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class HATestClusterCreator
+{
+ protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class);
+
+ private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
+ private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
+
+ private static final int FAILOVER_CYCLECOUNT = 10;
+ private static final int FAILOVER_RETRIES = 1;
+ private static final int FAILOVER_CONNECTDELAY = 1000;
+
+ private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
+ private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
+
+ private static final int RETRIES = 60;
+ private static final int CONNECTDELAY = 75;
+
+ private final QpidBrokerTestCase _testcase;
+ private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
+ private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
+ private final String _virtualHostName;
+ private final String _vhostStoreConfigKeyPrefix;
+
+ private final String _ipAddressOfBroker;
+ private final String _groupName ;
+ private final int _numberOfNodes;
+ private int _bdbHelperPort;
+ private int _primaryBrokerPort;
+ private String _vhostConfigKeyPrefix;
+
+ public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
+ {
+ _testcase = testcase;
+ _virtualHostName = virtualHostName;
+ _groupName = "group" + _testcase.getName();
+ _ipAddressOfBroker = getIpAddressOfBrokerHost();
+ _numberOfNodes = numberOfNodes;
+ _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".";
+ _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store.";
+ _bdbHelperPort = 0;
+ }
+
+ public void configureClusterNodes() throws Exception
+ {
+ int brokerPort = _testcase.findFreePort();
+
+ for (int i = 0; i < _numberOfNodes; i++)
+ {
+ int bdbPort = _testcase.getNextAvailable(brokerPort + 1);
+ _brokerPortToBdbPortMap.put(brokerPort, bdbPort);
+
+ LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort);
+ if (_bdbHelperPort == 0)
+ {
+ _bdbHelperPort = bdbPort;
+ }
+
+ configureClusterNode(brokerPort, bdbPort);
+ TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
+ brokerConfiguration.addJmxManagementConfiguration();
+ collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts());
+
+ brokerPort = _testcase.getNextAvailable(bdbPort + 1);
+ }
+ }
+
+ public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next();
+ final String configKey = getConfigKey("highAvailability.designatedPrimary");
+ brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary));
+ _primaryBrokerPort = brokerConfigEntry.getKey();
+ }
+
+ /**
+ * @param configKeySuffix "highAvailability.designatedPrimary", for example
+ * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example
+ */
+ private String getConfigKey(String configKeySuffix)
+ {
+ final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts.");
+ return configKey;
+ }
+
+ public void startNode(final int brokerPortNumber) throws Exception
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+
+ _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts());
+
+ _testcase.startBroker(brokerPortNumber);
+ }
+
+ public void startCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ startNode(brokerPortNumber);
+ }
+ }
+
+ public void startClusterParallel() throws Exception
+ {
+ final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size());
+ try
+ {
+ List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>();
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+ Future<Object> future = executor.submit(new Callable<Object>()
+ {
+ public Object call()
+ {
+ try
+ {
+ _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(),
+ brokerConfigHolder.getTestVirtualhosts());
+ return "OK";
+ }
+ catch (Exception e)
+ {
+ return e;
+ }
+ }
+ });
+ brokers.add(future);
+ }
+ for (Future<Object> future : brokers)
+ {
+ Object result = future.get(30, TimeUnit.SECONDS);
+ LOGGER.debug("Node startup result:" + result);
+ if (result instanceof Exception)
+ {
+ throw (Exception) result;
+ }
+ else if (!"OK".equals(result))
+ {
+ throw new Exception("One of the cluster nodes is not started");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ stopCluster();
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+
+ }
+
+ public void stopNode(final int brokerPortNumber)
+ {
+ _testcase.killBroker(brokerPortNumber);
+ }
+
+ public void stopCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ try
+ {
+ stopNode(brokerPortNumber);
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Failed to stop node on port:" + brokerPortNumber);
+ }
+ }
+ }
+
+ public int getBrokerPortNumberFromConnection(Connection connection)
+ {
+ final AMQConnection amqConnection = (AMQConnection)connection;
+ return amqConnection.getActiveBrokerDetails().getPort();
+ }
+
+ public int getPortNumberOfAnInactiveBroker(final Connection activeConnection)
+ {
+ final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers();
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection);
+ allBrokerPorts.remove(activeBrokerPort);
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int inactiveBrokerPort = allBrokerPorts.iterator().next();
+ return inactiveBrokerPort;
+ }
+
+ public int getBdbPortForBrokerPort(final int brokerPortNumber)
+ {
+ return _brokerPortToBdbPortMap.get(brokerPortNumber);
+ }
+
+ public Set<Integer> getBdbPortNumbers()
+ {
+ return new HashSet<Integer>(_brokerPortToBdbPortMap.values());
+ }
+
+ public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception
+ {
+ final StringBuilder brokerList = new StringBuilder();
+
+ for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); )
+ {
+ int brokerPortNumber = itr.next();
+
+ brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES));
+ if (itr.hasNext())
+ {
+ brokerList.append(";");
+ }
+ }
+
+ return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT));
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, false);
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, true);
+ }
+
+ private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException
+ {
+ final String url;
+ if (retryAllowed)
+ {
+ url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+ }
+ else
+ {
+ url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber);
+ }
+
+ return new AMQConnectionURL(url);
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + _testcase.getName() + bdbPort;
+ }
+
+ public String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _ipAddressOfBroker + ":" + bdbPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ if (_bdbHelperPort == 0)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+
+ return _ipAddressOfBroker + ":" + _bdbHelperPort;
+ }
+
+ public void setHelperHostPort(int bdbHelperPort)
+ {
+ _bdbHelperPort = bdbHelperPort;
+ }
+
+ public int getBrokerPortNumberOfPrimary()
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ return _primaryBrokerPort;
+ }
+
+ public int getBrokerPortNumberOfSecondaryNode()
+ {
+ final Set<Integer> portNumbers = getBrokerPortNumbersForNodes();
+ portNumbers.remove(getBrokerPortNumberOfPrimary());
+ return portNumbers.iterator().next();
+ }
+
+ public Set<Integer> getBrokerPortNumbersForNodes()
+ {
+ return new HashSet<Integer>(_brokerConfigurations.keySet());
+ }
+
+ private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception
+ {
+ final String nodeName = getNodeNameForNodeAt(bdbPort);
+
+
+ _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE);
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
+
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName);
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName);
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
+ _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
+ }
+
+ public String getIpAddressOfBrokerHost()
+ {
+ String brokerHost = _testcase.getBroker().getHost();
+ try
+ {
+ return InetAddress.getByName(brokerHost).getHostAddress();
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e);
+ }
+ }
+
+ private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration,
+ (XMLConfiguration) testVirtualhosts.clone()));
+ }
+
+ public class BrokerConfigHolder
+ {
+ private final TestBrokerConfiguration _testConfiguration;
+ private final XMLConfiguration _testVirtualhosts;
+
+ public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _testConfiguration = testConfiguration;
+ _testVirtualhosts = testVirtualhosts;
+ }
+
+ public TestBrokerConfiguration getTestConfiguration()
+ {
+ return _testConfiguration;
+ }
+
+ public XMLConfiguration getTestVirtualhosts()
+ {
+ return _testVirtualhosts;
+ }
+ }
+
+ public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved);
+ final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts();
+
+ final String configKey = getConfigKey("highAvailability.nodeHostPort");
+ final String oldBdbHostPort = virtualHostConfig.getString(configKey);
+
+ final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
+ final String oldHost = oldHostAndPort[0];
+
+ final String newBdbHostPort = oldHost + ":" + newBdbPort;
+
+ virtualHostConfig.setProperty(configKey, newBdbHostPort);
+ collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
+ }
+
+ public String getStoreConfigKeyPrefix()
+ {
+ return _vhostStoreConfigKeyPrefix;
+ }
+
+
+}