diff options
| author | Keith Wall <kwall@apache.org> | 2012-03-27 11:04:02 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-03-27 11:04:02 +0000 |
| commit | 543aefb12560de0fb374c3a25fe3dc0b809a221e (patch) | |
| tree | a2ed89ff318dc598c4070a6221c03154c569a416 /qpid/java/bdbstore/src/test | |
| parent | 30f5a131dfd3b9d628f3b9cb2dc91ada79993b60 (diff) | |
| download | qpid-python-543aefb12560de0fb374c3a25fe3dc0b809a221e.tar.gz | |
QPID-3913: Add functionality to upgrade bdbstore automatically on broker start-up. Store message content using single chunk. Change store version to 6. Remove implementations of tuple bindings for previous versions.
Applied patch from Phil Harvey<phil@philharveyonline.com> Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1305809 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/test')
12 files changed, 1010 insertions, 354 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 3d30f02b42..74fba168a9 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -58,9 +58,11 @@ import java.util.List; */ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest { + private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + /** - * Tests that message metadata and content are successfully read back from a - * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to + * Tests that message metadata and content are successfully read back from a + * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to * verify their ability to co-exist within the store and be successful retrieved. */ public void testBDBMessagePersistence() throws Exception @@ -73,10 +75,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. // Use a single chunk for the 0-10 message as per broker behaviour. String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; - + ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); - + ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); int bodySize = completeContentBody_0_10.limit(); @@ -100,12 +102,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto /* * Create and insert a 0-10 message (metadata and content) - */ + */ MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); - MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, + MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); @@ -190,14 +192,14 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto private DeliveryProperties createDeliveryProperties_0_10() { DeliveryProperties delProps_0_10 = new DeliveryProperties(); - + delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); delProps_0_10.setImmediate(true); delProps_0_10.setExchange("exchange12345"); delProps_0_10.setRoutingKey("routingKey12345"); delProps_0_10.setExpiration(5); delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); - + return delProps_0_10; } @@ -207,14 +209,14 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto msgProps_0_10.setContentLength(bodySize); msgProps_0_10.setCorrelationId("qwerty".getBytes()); msgProps_0_10.setContentType("text/html"); - + return msgProps_0_10; } - /** + /** * Close the provided store and create a new (read-only) store to read back the data. - * - * Use this method instead of reloading the virtual host like other tests in order + * + * Use this method instead of reloading the virtual host like other tests in order * to avoid the recovery handler deleting the message for not being on a queue. */ private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception @@ -275,7 +277,61 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto props.getHeaders().setString("Test", "MST"); return props; } - + + public void testGetContentWithOffset() throws Exception + { + MessageStore store = getVirtualHost().getMessageStore(); + BDBMessageStore bdbStore = assertBDBStore(store); + StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + // normal case: offset is 0 + ByteBuffer dst = ByteBuffer.allocate(10); + int length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", CONTENT_BYTES.length, length); + byte[] array = dst.array(); + assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); + + // offset is in the middle + dst = ByteBuffer.allocate(10); + length = bdbStore.getContent(messageid_0_8, 5, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + byte[] expected = new byte[10]; + System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // offset beyond the content length + dst = ByteBuffer.allocate(10); + try + { + bdbStore.getContent(messageid_0_8, 15, dst); + fail("Should fail for the offset greater than message size"); + } + catch (RuntimeException e) + { + assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " + + messageid_0_8 + "!", e.getMessage()); + } + + // buffer is smaller then message size + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // buffer is smaller then message size, offset is not 0 + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 2, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + } /** * Tests that messages which are added to the store and then removed using the * public MessageStore interfaces are actually removed from the store by then @@ -287,11 +343,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto MessageStore store = getVirtualHost().getMessageStore(); BDBMessageStore bdbStore = assertBDBStore(store); - StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store); + StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - //remove the message in the fashion the broker normally would - storedMessage_0_8.remove(); + + bdbStore.removeMessage(messageid_0_8, true); //verify the removal using the BDB store implementation methods directly try @@ -308,10 +363,10 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto //expecting no content, allocate a 1 byte ByteBuffer dst = ByteBuffer.allocate(1); - assertEquals("Retrieved content when none was expected", + assertEquals("Retrieved content when none was expected", 0, bdbStore.getContent(messageid_0_8, 0, dst)); } - + private BDBMessageStore assertBDBStore(Object store) { if(!(store instanceof BDBMessageStore)) @@ -322,15 +377,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto return (BDBMessageStore) store; } - private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store) + private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) { - byte[] body10Bytes = "0123456789".getBytes(); - byte[] body5Bytes = "01234".getBytes(); - - ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes); - ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes); + ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); - int bodySize = body10Bytes.length + body5Bytes.length; + int bodySize = CONTENT_BYTES.length; //create and store the message using the MessageStore interface MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); @@ -342,7 +393,6 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.addContent(chunk1.limit(), chunk2); storedMessage_0_8.flushToStore(); return storedMessage_0_8; @@ -360,7 +410,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); final AMQShortString mockQueueName = new AMQShortString("queueName"); - + TransactionLogResource mockQueue = new TransactionLogResource() { public String getResourceName() @@ -368,27 +418,27 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto return mockQueueName.asString(); } }; - + MessageStore.Transaction txn = log.newTransaction(); - + txn.enqueueMessage(mockQueue, new MockMessage(1L)); txn.enqueueMessage(mockQueue, new MockMessage(5L)); txn.commitTran(); List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); - + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); assertEquals("First Message is incorrect", 1L, val.longValue()); val = enqueuedIds.get(1); assertEquals("Second Message is incorrect", 5L, val.longValue()); } - - + + /** - * Tests transaction rollback before a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB + * Tests transaction rollback before a commit has occurred by utilising the + * enqueue and dequeue methods available in the TransactionLog interface + * implemented by the store, and verifying the behaviour using BDB * implementation methods. */ public void testTranRollbackBeforeCommit() throws Exception @@ -398,7 +448,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); final AMQShortString mockQueueName = new AMQShortString("queueName"); - + TransactionLogResource mockQueue = new TransactionLogResource() { public String getResourceName() @@ -406,30 +456,30 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto return mockQueueName.asString(); } }; - + MessageStore.Transaction txn = log.newTransaction(); - + txn.enqueueMessage(mockQueue, new MockMessage(21L)); txn.abortTran(); - + txn = log.newTransaction(); txn.enqueueMessage(mockQueue, new MockMessage(22L)); txn.enqueueMessage(mockQueue, new MockMessage(23L)); txn.commitTran(); List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); - + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); assertEquals("First Message is incorrect", 22L, val.longValue()); val = enqueuedIds.get(1); assertEquals("Second Message is incorrect", 23L, val.longValue()); } - + /** - * Tests transaction rollback after a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB + * Tests transaction rollback after a commit has occurred by utilising the + * enqueue and dequeue methods available in the TransactionLog interface + * implemented by the store, and verifying the behaviour using BDB * implementation methods. */ public void testTranRollbackAfterCommit() throws Exception @@ -439,7 +489,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); final AMQShortString mockQueueName = new AMQShortString("queueName"); - + TransactionLogResource mockQueue = new TransactionLogResource() { public String getResourceName() @@ -447,22 +497,22 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto return mockQueueName.asString(); } }; - + MessageStore.Transaction txn = log.newTransaction(); - + txn.enqueueMessage(mockQueue, new MockMessage(30L)); txn.commitTran(); txn = log.newTransaction(); txn.enqueueMessage(mockQueue, new MockMessage(31L)); txn.abortTran(); - + txn = log.newTransaction(); txn.enqueueMessage(mockQueue, new MockMessage(32L)); txn.commitTran(); - + List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); - + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); assertEquals("First Message is incorrect", 30L, val.longValue()); @@ -470,6 +520,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto assertEquals("Second Message is incorrect", 32L, val.longValue()); } + @SuppressWarnings("rawtypes") private static class MockMessage implements ServerMessage, EnqueableMessage { private long _messageId; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java index bcbb7d8b72..122f846a2d 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -20,19 +20,36 @@ */ package org.apache.qpid.server.store.berkeleydb; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.*; - /** - * Prepares an older version brokers BDB store with the required + * Prepares an older version brokers BDB store with the required * contents for use in the BDBStoreUpgradeTest. * * NOTE: Must be used with the equivalent older version client! * - * The store will then be used to verify that the upgraded is - * completed properly and that once upgraded it functions as + * The store will then be used to verify that the upgraded is + * completed properly and that once upgraded it functions as * expected with the new broker. * */ @@ -43,9 +60,10 @@ public class BDBStoreUpgradeTestPreparer public static final String SELECTOR_SUB_NAME="mySelectorDurSubName"; public static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic"; public static final String QUEUE_NAME="myUpgradeQueue"; + public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; private static AMQConnectionFactory _connFac; - private static final String CONN_URL = + private static final String CONN_URL = "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'"; /** @@ -59,14 +77,28 @@ public class BDBStoreUpgradeTestPreparer private void prepareBroker() throws Exception { prepareQueues(); + prepareNonDurableQueue(); prepareDurableSubscriptionWithSelector(); prepareDurableSubscriptionWithoutSelector(); } + private void prepareNonDurableQueue() throws Exception + { + Connection connection = _connFac.createConnection(); + AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME); + AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME); + session.sendCreateQueue(queueName, false, false, false, null); + session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination); + MessageProducer messageProducer = session.createProducer(destination); + sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3); + connection.close(); + } + /** * Prepare a queue for use in testing message and binding recovery * after the upgrade is performed. - * + * * - Create a transacted session on the connection. * - Use a consumer to create the (durable by default) queue. * - Send 5 large messages to test (multi-frame) content recovery. @@ -74,7 +106,7 @@ public class BDBStoreUpgradeTestPreparer * - Commit the session. * - Send 5 small messages to test that uncommitted messages are not recovered. * following the upgrade. - * - Close the session. + * - Close the session. */ private void prepareQueues() throws Exception { @@ -114,9 +146,9 @@ public class BDBStoreUpgradeTestPreparer } /** - * Prepare a DurableSubscription backing queue for use in testing selector + * Prepare a DurableSubscription backing queue for use in testing selector * recovery and queue exclusivity marking during the upgrade process. - * + * * - Create a transacted session on the connection. * - Open and close a DurableSubscription with selector to create the backing queue. * - Send a message which matches the selector. @@ -145,7 +177,7 @@ public class BDBStoreUpgradeTestPreparer TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); durSub1.close(); - // Create a publisher and send a persistent message which matches the selector + // Create a publisher and send a persistent message which matches the selector // followed by one that does not match, and another which matches but is not // committed and so should be 'lost' TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); @@ -202,7 +234,7 @@ public class BDBStoreUpgradeTestPreparer connection.close(); } - public static void sendMessages(Session session, MessageProducer messageProducer, + public static void sendMessages(Session session, MessageProducer messageProducer, Destination dest, int deliveryMode, int length, int numMesages) throws JMSException { for (int i = 1; i <= numMesages; i++) @@ -213,7 +245,7 @@ public class BDBStoreUpgradeTestPreparer } } - public static void publishMessages(Session session, TopicPublisher publisher, + public static void publishMessages(Session session, TopicPublisher publisher, Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException { for (int i = 1; i <= numMesages; i++) @@ -227,8 +259,8 @@ public class BDBStoreUpgradeTestPreparer /** * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. - * - * @param length number of characters in the string + * + * @param length number of characters in the string * @return string sequence of the given length */ public static String generateString(int length) @@ -248,6 +280,7 @@ public class BDBStoreUpgradeTestPreparer */ public static void main(String[] args) throws Exception { + System.setProperty("qpid.dest_syntax", "BURL"); BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer(); producer.prepareBroker(); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index 55327e3b56..a708ceac1c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -20,36 +20,14 @@ */ package org.apache.qpid.server.store.berkeleydb; -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.je.DatabaseEntry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.FileUtils; - +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; + +import java.io.File; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -64,18 +42,18 @@ import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; + +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Tests upgrading a BDB store and using it with the new broker - * after the required contents are entered into the store using - * an old broker with the BDBStoreUpgradeTestPreparer. The store - * will then be used to verify that the upgraded is completed - * properly and that once upgraded it functions as expected with - * the new broker. + * Tests upgrading a BDB store on broker startup. + * The store will then be used to verify that the upgrade is completed + * properly and that once upgraded it functions as expected. */ public class BDBUpgradeTest extends QpidBrokerTestCase { @@ -84,73 +62,31 @@ public class BDBUpgradeTest extends QpidBrokerTestCase private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024); private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256); private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); - private static final String QPID_HOME = System.getProperty("QPID_HOME"); - private static final int VERSION_4 = 4; - private String _fromDir; - private String _toDir; - private String _toDirTwice; + private String _storeLocation; @Override public void setUp() throws Exception { assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); - assertNotNull("QPID_HOME must be set", QPID_HOME); - - _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store"; - _toDir = getWorkDirBaseDir() + "/bdbstore/test-store"; - _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice"; + _storeLocation = QPID_WORK_ORIG + "/bdbstore/test-store"; //Clear the two target directories if they exist. - File directory = new File(_toDir); - if (directory.exists() && directory.isDirectory()) - { - FileUtils.delete(directory, true); - } - directory = new File(_toDirTwice); + File directory = new File(_storeLocation); if (directory.exists() && directory.isDirectory()) { FileUtils.delete(directory, true); } - //Upgrade the test store. - upgradeBrokerStore(_fromDir, _toDir); + // copy store files + String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath(); + FileUtils.copyRecursive(new File(src), new File(_storeLocation)); //override the broker config used and then start the broker with the updated store _configFile = new File("build/etc/config-systests-bdb.xml"); setConfigurationProperty("management.enabled", "true"); - super.setUp(); - } - - private String getWorkDirBaseDir() - { - return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort()); - } - - /** - * Tests that the core upgrade method of the store upgrade tool passes through the exception - * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous - * version because it has already been upgraded. - * @throws Exception - */ - public void testMultipleUpgrades() throws Exception - { - //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed - stopBroker(); - - try - { - new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4); - fail("Second Upgrade Succeeded"); - } - catch (Exception e) - { - System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error"); - e.printStackTrace(); - assertTrue("Incorrect Exception Thrown:" + e.getMessage(), - e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data")); - } + super.setUp(); } /** @@ -175,26 +111,26 @@ public class BDBUpgradeTest extends QpidBrokerTestCase try { ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME); - assertEquals("DurableSubscription backing queue should have 1 message on it initially", + assertEquals("DurableSubscription backing queue should have 1 message on it initially", new Integer(1), dursubQueue.getMessageCount()); - + // Create a connection and start it TopicConnection connection = (TopicConnection) getConnection(); connection.start(); - + // Send messages which don't match and do match the selector, checking message count - TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED); + TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME); TopicPublisher publisher = pubSession.createPublisher(topic); - + BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); pubSession.commit(); - assertEquals("DurableSubscription backing queue should still have 1 message on it", + assertEquals("DurableSubscription backing queue should still have 1 message on it", Integer.valueOf(1), dursubQueue.getMessageCount()); - + BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); pubSession.commit(); - assertEquals("DurableSubscription backing queue should now have 2 messages on it", + assertEquals("DurableSubscription backing queue should now have 2 messages on it", Integer.valueOf(2), dursubQueue.getMessageCount()); TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); @@ -240,7 +176,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase connection.start(); // Send new message matching the topic, checking message count - TopicSession session = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED); + TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED); Topic topic = session.createTopic(TOPIC_NAME); TopicPublisher publisher = session.createPublisher(topic); @@ -298,10 +234,10 @@ public class BDBUpgradeTest extends QpidBrokerTestCase /** * Test that the upgraded queue continues to function properly when used - * for persistent messaging and restarting the broker. - * + * for persistent messaging and restarting the broker. + * * Sends the new messages to the queue BEFORE consuming those which were - * sent before the upgrade. In doing so, this also serves to test that + * sent before the upgrade. In doing so, this also serves to test that * the queue bindings were successfully transitioned during the upgrade. */ public void testBindingAndMessageDurabability() throws Exception @@ -329,7 +265,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase } /** - * Test that all of the committed persistent messages previously sent to + * Test that all of the committed persistent messages previously sent to * the broker are properly received following update of the MetaData and * Content entries during the store upgrade process. */ @@ -349,200 +285,22 @@ public class BDBUpgradeTest extends QpidBrokerTestCase * * @throws Exception */ - public void testMigrationOfMessagesForNonExistingQueues() throws Exception + public void testMigrationOfMessagesForNonDurableQueues() throws Exception { - stopBroker(); - - // copy store data into a new location for adding of phantom message - File storeLocation = new File(_fromDir); - File target = new File(_toDirTwice); - if (!target.exists()) - { - target.mkdirs(); - } - FileUtils.copyRecursive(storeLocation, target); - - // delete migrated data - File directory = new File(_toDir); - if (directory.exists() && directory.isDirectory()) - { - FileUtils.delete(directory, true); - } - - // test data - String nonExistingQueueName = getTestQueueName(); - String messageText = "Test Phantom Message"; - - // add message - addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText); - - String[] inputs = { "Yes", "Yes", "Yes" }; - upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs); - - // start broker - startBroker(); - // Create a connection and start it Connection connection = getConnection(); connection.start(); // consume a message for non-existing store Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(nonExistingQueueName); + Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); - Message message = messageConsumer.receive(1000); - // assert consumed message - assertNotNull("Message was not migrated!", message); - assertTrue("Unexpected message received!", message instanceof TextMessage); - String text = ((TextMessage) message).getText(); - assertEquals("Message migration failed!", messageText, text); - } - - /** - * An utility method to upgrade broker with simulation user interactions - * - * @param fromDir - * location of the store to migrate - * @param toDir - * location of where migrated data will be stored - * @param inputs - * user answers on upgrade tool questions - * @throws Exception - */ - private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs) - throws Exception - { - // save to restore system.in after data migration - InputStream stdin = System.in; - - // set fake system in to simulate user interactions - // FIXME: it is a quite dirty simulator of system input but it does the job - System.setIn(new InputStream() - { - - private int counter = 0; - - public synchronized int read(byte b[], int off, int len) - { - byte[] src = (inputs[counter] + "\n").getBytes(); - System.arraycopy(src, 0, b, off, src.length); - counter++; - return src.length; - } - - @Override - public int read() throws IOException - { - return -1; - } - }); - - try - { - // Upgrade the test store. - new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4); - } - finally + for (int i = 0; i < 3; i++) { - // restore system in - System.setIn(stdin); - } - } - - @SuppressWarnings("unchecked") - private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName, - String messageText) throws Exception - { - final AMQShortString queueName = new AMQShortString(nonExistingQueueName); - BDBMessageStore store = new BDBMessageStore(storeVersion); - store.configure(storeLocation, false); - try - { - store.start(); - - // store message objects - ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8")); - long bodySize = completeContentBody.limit(); - MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false, - false, queueName); - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); - props.setContentType("text/plain"); - props.setType("text/plain"); - props.setMessageId("whatever"); - props.setEncoding("UTF-8"); - props.getHeaders().setString("Test", "MST"); - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); - ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize); - - // add content entry to database - final long messageId = store.getNewMessageId(); - TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance(); - MessageContentKey contentKey = null; - if (storeVersion == VERSION_4) - { - contentKey = new MessageContentKey_4(messageId, 0); - } - else - { - throw new Exception(storeVersion + " is not supported"); - } - DatabaseEntry key = new DatabaseEntry(); - contentKeyTB.objectToEntry(contentKey, key); - DatabaseEntry data = new DatabaseEntry(); - ContentTB contentTB = new ContentTB(); - contentTB.objectToEntry(completeContentBody, data); - store.getContentDb().put(null, key, data); - - // add meta data entry to database - TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class); - TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance(); - key = new DatabaseEntry(); - data = new DatabaseEntry(); - longTB.objectToEntry(new Long(messageId), key); - MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1); - metaDataTB.objectToEntry(metaData, data); - store.getMetaDataDb().put(null, key, data); - - // add delivery entry to database - TransactionLogResource mockQueue = new TransactionLogResource() - { - public String getResourceName() - { - return queueName.asString(); - } - }; - - EnqueableMessage mockMessage = new EnqueableMessage() - { - - public long getMessageNumber() - { - return messageId; - } - - public boolean isPersistent() - { - return true; - } - - public StoredMessage getStoredMessage() - { - return null; - } - }; - - MessageStore log = (MessageStore) store; - MessageStore.Transaction txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, mockMessage); - txn.commitTran(); - } - finally - { - // close store - store.close(); + Message message = messageConsumer.receive(1000); + assertNotNull("Message was not migrated!", message); + assertTrue("Unexpected message received!", message instanceof TextMessage); } } @@ -564,7 +322,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase } - // Retrieve the matching message + // Retrieve the matching message Message m = durSub.receive(2000); assertNotNull("Failed to receive an expected message", m); if(selector) @@ -623,8 +381,4 @@ public class BDBUpgradeTest extends QpidBrokerTestCase session.close(); } - private void upgradeBrokerStore(String fromDir, String toDir) throws Exception - { - new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4); - } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java new file mode 100644 index 0000000000..6df2f8a8db --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import java.io.File; + +import junit.framework.TestCase; + +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.Database; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.Transaction; + +public abstract class AbstractUpgradeTestCase extends TestCase +{ + protected static final class StaticAnswerHandler implements UpgradeInteractionHandler + { + private UpgradeInteractionResponse _response; + + public StaticAnswerHandler(UpgradeInteractionResponse response) + { + _response = response; + } + + @Override + public UpgradeInteractionResponse requireResponse(String question, UpgradeInteractionResponse defaultResponse, + UpgradeInteractionResponse... possibleResponses) + { + return _response; + } + } + + public static final String[] QUEUE_NAMES = { "clientid:myDurSubName", "clientid:mySelectorDurSubName", "myUpgradeQueue", + "queue-non-durable" }; + public static int[] QUEUE_SIZES = { 1, 1, 10, 3 }; + public static int TOTAL_MESSAGE_NUMBER = 15; + protected static final LogSubject LOG_SUBJECT = new TestBlankSubject(); + protected static final String TMP_FOLDER = System.getProperty("java.io.tmpdir"); + + // one binding per exchange + protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2; + protected static final int TOTAL_EXCHANGES = 5; + + private File _storeLocation; + protected Environment _environment; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _storeLocation = copyStore(getStoreDirectoryName()); + + _environment = createEnvironment(_storeLocation); + } + + /** @return eg "bdbstore-v4" - used for copying store */ + protected abstract String getStoreDirectoryName(); + + protected Environment createEnvironment(File storeLocation) + { + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setConfigParam("je.lock.nLockTables", "7"); + envConfig.setReadOnly(false); + envConfig.setSharedCache(false); + envConfig.setCacheSize(0); + return new Environment(storeLocation, envConfig); + } + + @Override + public void tearDown() throws Exception + { + try + { + _environment.close(); + } + finally + { + _environment = null; + deleteDirectoryIfExists(_storeLocation); + } + super.tearDown(); + } + + private File copyStore(String storeDirectoryName) throws Exception + { + String src = getClass().getClassLoader().getResource("upgrade/" + storeDirectoryName).toURI().getPath(); + File storeLocation = new File(new File(TMP_FOLDER), "test-store"); + deleteDirectoryIfExists(storeLocation); + FileUtils.copyRecursive(new File(src), new File(TMP_FOLDER)); + return storeLocation; + } + + protected void deleteDirectoryIfExists(File dir) + { + if (dir.exists()) + { + assertTrue("The provided file " + dir + " is not a directory", dir.isDirectory()); + + boolean deletedSuccessfully = FileUtils.delete(dir, true); + + assertTrue("Files at '" + dir + "' should have been deleted", deletedSuccessfully); + } + } + + protected void assertDatabaseRecordCount(String databaseName, final long expectedCountNumber) + { + long count = getDatabaseCount(databaseName); + assertEquals("Unexpected database '" + databaseName + "' entry number", expectedCountNumber, count); + } + + protected long getDatabaseCount(String databaseName) + { + DatabaseCallable<Long> operation = new DatabaseCallable<Long>() + { + + @Override + public Long call(Database sourceDatabase, Database targetDatabase, Transaction transaction) + { + return new Long(sourceDatabase.count()); + + } + }; + Long count = new DatabaseTemplate(_environment, databaseName, null).call(operation); + return count.longValue(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java new file mode 100644 index 0000000000..7ec442b73d --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import junit.framework.TestCase; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.Environment; +import com.sleepycat.je.Transaction; + +public class DatabaseTemplateTest extends TestCase +{ + private static final String SOURCE_DATABASE = "sourceDatabase"; + private Environment _environment; + private Database _sourceDatabase; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _environment = mock(Environment.class); + _sourceDatabase = mock(Database.class); + when(_environment.openDatabase(any(Transaction.class), same(SOURCE_DATABASE), isA(DatabaseConfig.class))) + .thenReturn(_sourceDatabase); + } + + public void testExecuteWithTwoDatabases() + { + String targetDatabaseName = "targetDatabase"; + Database targetDatabase = mock(Database.class); + + Transaction txn = mock(Transaction.class); + + when(_environment.openDatabase(same(txn), same(targetDatabaseName), isA(DatabaseConfig.class))) + .thenReturn(targetDatabase); + + DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, targetDatabaseName, txn); + + DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class); + databaseTemplate.run(databaseOperation); + + verify(databaseOperation).run(_sourceDatabase, targetDatabase, txn); + verify(_sourceDatabase).close(); + verify(targetDatabase).close(); + } + + public void testExecuteWithOneDatabases() + { + DatabaseTemplate databaseTemplate = new DatabaseTemplate(_environment, SOURCE_DATABASE, null, null); + + DatabaseRunnable databaseOperation = mock(DatabaseRunnable.class); + databaseTemplate.run(databaseOperation); + + verify(databaseOperation).run(_sourceDatabase, (Database)null, (Transaction)null); + verify(_sourceDatabase).close(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java new file mode 100644 index 0000000000..9341022a38 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -0,0 +1,299 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingRecord; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.BindingTuple; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKey; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.MessageContentKeyBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKey; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueEntryKeyBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom4To5.QueueRecord; + +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Transaction; + +public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase +{ + private static final String NON_DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; + private static final String DURABLE_QUEUE = BDBStoreUpgradeTestPreparer.QUEUE_NAME; + private static final String DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR = "clientid:mySelectorDurSubName"; + private static final String DURABLE_SUBSCRIPTION_QUEUE = "clientid:myDurSubName"; + private static final String EXCHANGE_DB_NAME = "exchangeDb_v5"; + private static final String MESSAGE_META_DATA_DB_NAME = "messageMetaDataDb_v5"; + private static final String MESSAGE_CONTENT_DB_NAME = "messageContentDb_v5"; + private static final String DELIVERY_DB_NAME = "deliveryDb_v5"; + private static final String BINDING_DB_NAME = "queueBindingsDb_v5"; + + @Override + protected String getStoreDirectoryName() + { + return "bdbstore-v4"; + } + + public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception + { + UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); + upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES)); + + assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES))); + + assertDatabaseRecordCount(DELIVERY_DB_NAME, TOTAL_MESSAGE_NUMBER); + assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, TOTAL_MESSAGE_NUMBER); + assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES); + + for (int i = 0; i < QUEUE_SIZES.length; i++) + { + assertQueueMessages(QUEUE_NAMES[i], QUEUE_SIZES[i]); + } + + final List<BindingRecord> queueBindings = loadBindings(); + + assertEquals("Unxpected list size", TOTAL_BINDINGS, queueBindings.size()); + assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, ""); + assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", + BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); + assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); + assertBindingRecord(queueBindings, NON_DURABLE_QUEUE, "amq.direct", NON_DURABLE_QUEUE, null); + assertContent(); + } + + public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception + { + UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); + upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO)); + assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE))); + + assertDatabaseRecordCount(DELIVERY_DB_NAME, 12); + assertDatabaseRecordCount(MESSAGE_META_DATA_DB_NAME, 12); + assertDatabaseRecordCount(EXCHANGE_DB_NAME, TOTAL_EXCHANGES); + + assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE, 1); + assertQueueMessages(DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, 1); + assertQueueMessages(DURABLE_QUEUE, 10); + + final List<BindingRecord> queueBindings = loadBindings(); + + assertEquals("Unxpected list size", TOTAL_BINDINGS - 2, queueBindings.size()); + assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE, "amq.topic", BDBStoreUpgradeTestPreparer.TOPIC_NAME, + ""); + assertBindingRecord(queueBindings, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, "amq.topic", + BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME, "testprop='true'"); + assertBindingRecord(queueBindings, DURABLE_QUEUE, "amq.direct", DURABLE_QUEUE, null); + assertContent(); + } + + private List<BindingRecord> loadBindings() + { + final BindingTuple bindingTuple = new BindingTuple(); + final List<BindingRecord> queueBindings = new ArrayList<BindingRecord>(); + CursorOperation databaseOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + BindingRecord bindingRecord = bindingTuple.entryToObject(key); + + AMQShortString queueName = bindingRecord.getQueueName(); + AMQShortString exchangeName = bindingRecord.getExchangeName(); + AMQShortString routingKey = bindingRecord.getRoutingKey(); + FieldTable arguments = bindingRecord.getArguments(); + queueBindings.add(new BindingRecord(exchangeName, queueName, routingKey, arguments)); + } + }; + new DatabaseTemplate(_environment, BINDING_DB_NAME, null).run(databaseOperation); + return queueBindings; + } + + private void assertBindingRecord(List<BindingRecord> queueBindings, String queueName, String exchangeName, + String routingKey, String selectorKey) + { + BindingRecord record = null; + for (BindingRecord bindingRecord : queueBindings) + { + if (bindingRecord.getQueueName().asString().equals(queueName) + && bindingRecord.getExchangeName().asString().equals(exchangeName)) + { + record = bindingRecord; + break; + } + } + assertNotNull("Binding is not found for queue " + queueName + " and exchange " + exchangeName, record); + assertEquals("Unexpected routing key", routingKey, record.getRoutingKey().asString()); + + if (selectorKey != null) + { + assertEquals("Unexpected selector key for " + queueName, selectorKey, + record.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue())); + } + } + + private void assertQueueMessages(final String queueName, final int expectedQueueSize) + { + final Set<Long> messageIdsForQueue = assertDeliveriesForQueue(queueName, expectedQueueSize); + + assertMetadataForQueue(queueName, expectedQueueSize, messageIdsForQueue); + + assertContentForQueue(queueName, expectedQueueSize, messageIdsForQueue); + } + + private Set<Long> assertDeliveriesForQueue(final String queueName, final int expectedQueueSize) + { + final QueueEntryKeyBinding queueEntryKeyBinding = new QueueEntryKeyBinding(); + final AtomicInteger deliveryCounter = new AtomicInteger(); + final Set<Long> messagesForQueue = new HashSet<Long>(); + + CursorOperation deliveryDatabaseOperation = new CursorOperation() + { + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + QueueEntryKey entryKey = queueEntryKeyBinding.entryToObject(key); + String thisQueueName = entryKey.getQueueName().asString(); + if (thisQueueName.equals(queueName)) + { + deliveryCounter.incrementAndGet(); + messagesForQueue.add(entryKey.getMessageId()); + } + } + }; + new DatabaseTemplate(_environment, DELIVERY_DB_NAME, null).run(deliveryDatabaseOperation); + + assertEquals("Unxpected number of entries in delivery db for queue " + queueName, expectedQueueSize, + deliveryCounter.get()); + + return messagesForQueue; + } + + private void assertMetadataForQueue(final String queueName, final int expectedQueueSize, + final Set<Long> messageIdsForQueue) + { + final AtomicInteger metadataCounter = new AtomicInteger(); + CursorOperation databaseOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + Long messageId = LongBinding.entryToLong(key); + + boolean messageIsForTheRightQueue = messageIdsForQueue.contains(messageId); + if (messageIsForTheRightQueue) + { + metadataCounter.incrementAndGet(); + } + } + }; + new DatabaseTemplate(_environment, MESSAGE_META_DATA_DB_NAME, null).run(databaseOperation); + + assertEquals("Unxpected number of entries in metadata db for queue " + queueName, expectedQueueSize, + metadataCounter.get()); + } + + private void assertContentForQueue(String queueName, int expectedQueueSize, final Set<Long> messageIdsForQueue) + { + final AtomicInteger contentCounter = new AtomicInteger(); + final MessageContentKeyBinding keyBinding = new MessageContentKeyBinding(); + CursorOperation cursorOperation = new CursorOperation() + { + private long _prevMsgId = -1; + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + MessageContentKey contentKey = keyBinding.entryToObject(key); + long msgId = contentKey.getMessageId(); + + if (_prevMsgId != msgId && messageIdsForQueue.contains(msgId)) + { + contentCounter.incrementAndGet(); + } + + _prevMsgId = msgId; + } + }; + new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(cursorOperation); + + assertEquals("Unxpected number of entries in content db for queue " + queueName, expectedQueueSize, + contentCounter.get()); + } + + private void assertQueues(Set<String> expectedQueueNames) + { + List<AMQShortString> durableSubNames = new ArrayList<AMQShortString>(); + final UpgradeFrom4To5.QueueRecordBinding binding = new UpgradeFrom4To5.QueueRecordBinding(durableSubNames); + final Set<String> actualQueueNames = new HashSet<String>(); + + CursorOperation queueNameCollector = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + QueueRecord record = binding.entryToObject(value); + String queueName = record.getNameShortString().asString(); + actualQueueNames.add(queueName); + } + }; + new DatabaseTemplate(_environment, "queueDb_v5", null).run(queueNameCollector); + + assertEquals("Unexpected queue names", expectedQueueNames, actualQueueNames); + } + + private void assertContent() + { + final UpgradeFrom4To5.ContentBinding contentBinding = new UpgradeFrom4To5.ContentBinding(); + CursorOperation contentCursorOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key, + DatabaseEntry value) + { + long id = LongBinding.entryToLong(key); + assertTrue("Unexpected id", id > 0); + ByteBuffer content = contentBinding.entryToObject(value); + assertNotNull("Unexpected content", content); + } + }; + new DatabaseTemplate(_environment, MESSAGE_CONTENT_DB_NAME, null).run(contentCursorOperation); + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java new file mode 100644 index 0000000000..cca5923ffd --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding; + +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Transaction; + +public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase +{ + private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6Test.class); + + @Override + protected String getStoreDirectoryName() + { + return "bdbstore-v5"; + } + + public void testPerformUpgrade() throws Exception + { + UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); + upgrade.performUpgrade(LOG_SUBJECT, _environment, UpgradeInteractionHandler.DEFAULT_HANDLER); + + assertDatabaseRecordCounts(); + assertContent(); + } + + public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception + { + corruptDatabase(); + + UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); + upgrade.performUpgrade(LOG_SUBJECT, _environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES)); + + assertDatabaseRecordCounts(); + } + + public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception + { + corruptDatabase(); + + UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); + + UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO); + + upgrade.performUpgrade(LOG_SUBJECT, _environment, discardMessageInteractionHandler); + + assertDatabaseRecordCount("MESSAGE_METADATA", 11); + assertDatabaseRecordCount("MESSAGE_CONTENT", 11); + } + + /** + * modify the chunk offset of a message to be wrong, so we can test logic + * that preserves incomplete messages + */ + private void corruptDatabase() + { + CursorOperation cursorOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + CompoundKeyBinding binding = new CompoundKeyBinding(); + CompoundKey originalCompoundKey = binding.entryToObject(key); + int corruptedOffset = originalCompoundKey.getOffset() + 2; + CompoundKey corruptedCompoundKey = new CompoundKey(originalCompoundKey.getMessageId(), corruptedOffset); + DatabaseEntry newKey = new DatabaseEntry(); + binding.objectToEntry(corruptedCompoundKey, newKey); + + _logger.info("Deliberately corrupted message id " + originalCompoundKey.getMessageId() + + ", changed offset from " + originalCompoundKey.getOffset() + " to " + + corruptedCompoundKey.getOffset()); + + deleteCurrent(); + sourceDatabase.put(transaction, newKey, value); + + abort(); + } + }; + + Transaction transaction = _environment.beginTransaction(null, null); + new DatabaseTemplate(_environment, "messageContentDb_v5", transaction).run(cursorOperation); + transaction.commit(); + } + + private void assertDatabaseRecordCounts() + { + assertDatabaseRecordCount("EXCHANGES", 5); + assertDatabaseRecordCount("QUEUES", 3); + assertDatabaseRecordCount("QUEUE_BINDINGS", 6); + assertDatabaseRecordCount("DELIVERIES", 12); + + assertDatabaseRecordCount("MESSAGE_METADATA", 12); + assertDatabaseRecordCount("MESSAGE_CONTENT", 12); + } + + private void assertContent() + { + final NewDataBinding contentBinding = new NewDataBinding(); + CursorOperation contentCursorOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key, + DatabaseEntry value) + { + long id = LongBinding.entryToLong(key); + assertTrue("Unexpected id", id > 0); + byte[] content = contentBinding.entryToObject(value); + assertNotNull("Unexpected content", content); + } + }; + new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation); + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java new file mode 100644 index 0000000000..0e9d722089 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; + +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; + +public class UpgraderTest extends AbstractUpgradeTestCase +{ + private Upgrader _upgrader; + + @Override + protected String getStoreDirectoryName() + { + return "bdbstore-v4"; + } + + @Override + public void setUp() throws Exception + { + super.setUp(); + _upgrader = new Upgrader(_environment, new TestBlankSubject()); + } + + private int getStoreVersion() + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + int storeVersion = -1; + Database versionDb = null; + Cursor cursor = null; + try + { + versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig); + cursor = versionDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) + { + int version = IntegerBinding.entryToInt(key); + if (storeVersion < version) + { + storeVersion = version; + } + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + if (versionDb != null) + { + versionDb.close(); + } + } + return storeVersion; + } + + public void testUpgrade() throws Exception + { + assertEquals("Unexpected store version", -1, getStoreVersion()); + _upgrader.upgradeIfNecessary(); + assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + assertContent(); + } + + public void testEmptyDatabaseUpgradeDoesNothing() throws Exception + { + File nonExistentStoreLocation = new File(TMP_FOLDER, getName()); + deleteDirectoryIfExists(nonExistentStoreLocation); + + nonExistentStoreLocation.mkdir(); + _environment = createEnvironment(nonExistentStoreLocation); + _upgrader = new Upgrader(_environment, new TestBlankSubject()); + _upgrader.upgradeIfNecessary(); + + List<String> databaseNames = _environment.getDatabaseNames(); + List<String> expectedDatabases = new ArrayList<String>(); + expectedDatabases.add("VERSION"); + assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); + assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); + + nonExistentStoreLocation.delete(); + } + + private void assertContent() + { + final ContentBinding contentBinding = ContentBinding.getInstance(); + CursorOperation contentCursorOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key, + DatabaseEntry value) + { + long id = LongBinding.entryToLong(key); + assertTrue("Unexpected id", id > 0); + byte[] content = contentBinding.entryToObject(value); + assertNotNull("Unexpected content", content); + } + }; + new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation); + } +} diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb Binary files differindex 38158a55e7..167ab7f0ca 100644 --- a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb +++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt new file mode 100644 index 0000000000..a7e754f967 --- /dev/null +++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt @@ -0,0 +1,5 @@ +The bdbstore v5 data were obtained by upgrading the bdbstore v4 data as part of running +test UpgradeFrom4to5Test#testPerformUpgradeWithHandlerAnsweringNo. + +The rationale for not using BDBStoreUpgradeTestPreparer in this case is that we need chunked content. +Current implementation of BDBMessageStore only stores messages in one chunk.
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb Binary files differnew file mode 100644 index 0000000000..d44b21a83e --- /dev/null +++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb Binary files differnew file mode 100644 index 0000000000..9b85860c19 --- /dev/null +++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb |
