diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-04-17 09:01:44 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-04-17 09:01:44 +0000 |
| commit | 3203eea7641e1b0f39de96d797db7c54423b7f02 (patch) | |
| tree | f2563ba4a85ac54765d8f62663b60853846b3a89 /qpid/java/bdbstore/src | |
| parent | deab61acfe5f4edaae121cf6b9fa5d4b9e42803f (diff) | |
| download | qpid-python-3203eea7641e1b0f39de96d797db7c54423b7f02.tar.gz | |
QPID-3923: Store queue, exchange and binding as configured objects in bdb store
Applied patch by Oleksandr Rudyy <orudyy@gmail.com>, Phil Harvey <phil@philharveyonline.com>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
26 files changed, 1492 insertions, 895 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 439ec8ac4f..fb1d7c5265 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; @@ -57,6 +57,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.store.ConfiguredObjectHelper; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; @@ -73,19 +74,14 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord; -import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord; import org.apache.qpid.server.store.berkeleydb.entry.Xid; -import org.apache.qpid.server.store.berkeleydb.tuple.AMQShortStringBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.ExchangeBinding; import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.QueueBindingTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; @@ -104,23 +100,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore private Environment _environment; + private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; - private String QUEUEBINDINGSDB_NAME = "QUEUE_BINDINGS"; - private String DELIVERYDB_NAME = "DELIVERIES"; - private String EXCHANGEDB_NAME = "EXCHANGES"; - private String QUEUEDB_NAME = "QUEUES"; + private String DELIVERYDB_NAME = "QUEUE_ENTRIES"; private String BRIDGEDB_NAME = "BRIDGES"; private String LINKDB_NAME = "LINKS"; private String XIDDB_NAME = "XIDS"; - + private Database _configuredObjectsDb; private Database _messageMetaDataDb; private Database _messageContentDb; - private Database _queueBindingsDb; private Database _deliveryDb; - private Database _exchangeDb; - private Database _queueDb; private Database _bridgeDb; private Database _linkDb; private Database _xidDb; @@ -165,6 +156,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore private final EventManager _eventManager = new EventManager(); private String _storeLocation; + private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + public AbstractBDBMessageStore() { _stateManager = new StateManager(_eventManager); @@ -239,7 +232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore LOGGER.info("Configuring BDB message store"); - setupStore(environmentPath); + setupStore(environmentPath, name); } /** @@ -257,11 +250,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore _stateManager.attainState(State.ACTIVE); } - protected void setupStore(File storePath) throws DatabaseException, AMQStoreException + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { _environment = createEnvironment(storePath); - new Upgrader(_environment).upgradeIfNecessary(); + new Upgrader(_environment, name).upgradeIfNecessary(); openDatabases(); } @@ -326,10 +319,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore //This is required if we are wanting read only access. dbConfig.setReadOnly(false); + _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); - _queueDb = openDatabase(QUEUEDB_NAME, dbConfig); - _exchangeDb = openDatabase(EXCHANGEDB_NAME, dbConfig); - _queueBindingsDb = openDatabase(QUEUEBINDINGSDB_NAME, dbConfig); _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); _linkDb = openDatabase(LINKDB_NAME, dbConfig); @@ -376,23 +367,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore _messageContentDb.close(); } - if (_exchangeDb != null) - { - LOGGER.info("Closing exchange database"); - _exchangeDb.close(); - } - - if (_queueBindingsDb != null) - { - LOGGER.info("Closing bindings database"); - _queueBindingsDb.close(); - } - - if (_queueDb != null) - { - LOGGER.info("Closing queue database"); - _queueDb.close(); - } + if (_configuredObjectsDb != null) + { + LOGGER.info("Closing configurable objects database"); + _configuredObjectsDb.close(); + } if (_deliveryDb != null) { @@ -440,14 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore { try { + List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); QueueRecoveryHandler qrh = recoveryHandler.begin(this); - loadQueues(qrh); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); - loadExchanges(erh); + _configuredObjectHelper.recoverExchanges(erh, configuredObjects); BindingRecoveryHandler brh = erh.completeExchangeRecovery(); - recoverBindings(brh); + _configuredObjectHelper.recoverBindings(brh, configuredObjects); ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); @@ -459,29 +439,21 @@ public abstract class AbstractBDBMessageStore implements MessageStore } - private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException + private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException { Cursor cursor = null; - + List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>(); try { - cursor = _queueDb.openCursor(null, null); + cursor = _configuredObjectsDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - QueueBinding binding = QueueBinding.getInstance(); while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { - QueueRecord queueRecord = binding.entryToObject(value); - - String queueName = queueRecord.getNameShortString() == null ? null : - queueRecord.getNameShortString().asString(); - String owner = queueRecord.getOwner() == null ? null : - queueRecord.getOwner().asString(); - boolean exclusive = queueRecord.isExclusive(); - - FieldTable arguments = queueRecord.getArguments(); - - qrh.queue(queueName, owner, exclusive, arguments); + ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value); + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + configuredObject.setId(id); + results.add(configuredObject); } } @@ -489,6 +461,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { closeCursorSafely(cursor); } + return results; } private void closeCursorSafely(Cursor cursor) @@ -499,74 +472,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - - private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException - { - Cursor cursor = null; - - try - { - cursor = _exchangeDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - ExchangeBinding binding = ExchangeBinding.getInstance(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - ExchangeRecord exchangeRec = binding.entryToObject(value); - - String exchangeName = exchangeRec.getNameShortString() == null ? null : - exchangeRec.getNameShortString().asString(); - String type = exchangeRec.getType() == null ? null : - exchangeRec.getType().asString(); - boolean autoDelete = exchangeRec.isAutoDelete(); - - erh.exchange(exchangeName, type, autoDelete); - } - } - finally - { - closeCursorSafely(cursor); - } - - } - - private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException - { - Cursor cursor = null; - try - { - cursor = _queueBindingsDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - QueueBindingTupleBinding binding = QueueBindingTupleBinding.getInstance(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - //yes, this is retrieving all the useful information from the key only. - //For table compatibility it shall currently be left as is - BindingRecord bindingRecord = binding.entryToObject(key); - - String exchangeName = bindingRecord.getExchangeName() == null ? null : - bindingRecord.getExchangeName().asString(); - String queueName = bindingRecord.getQueueName() == null ? null : - bindingRecord.getQueueName().asString(); - String routingKey = bindingRecord.getRoutingKey() == null ? null : - bindingRecord.getRoutingKey().asString(); - ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null : - java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes())); - - brh.binding(exchangeName, queueName, routingKey, argumentsBB); - } - } - finally - { - closeCursorSafely(cursor); - } - - } - - private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) { Cursor cursor = null; @@ -681,7 +586,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { QueueEntryKey qek = keyBinding.entryToObject(key); @@ -700,10 +604,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore for(QueueEntryKey entry : entries) { - AMQShortString queueName = entry.getQueueName(); + UUID queueId = entry.getQueueId(); long messageId = entry.getMessageId(); - - qerh.queueEntry(queueName.asString(),messageId); + qerh.queueEntry(queueId, messageId); } } catch (DatabaseException e) @@ -892,25 +795,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (_stateManager.isInState(State.ACTIVE)) { - ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(), - exchange.getTypeShortString(), exchange.isAutoDelete()); - - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(exchange.getNameShortString(), key); - - DatabaseEntry value = new DatabaseEntry(); - ExchangeBinding exchangeBinding = ExchangeBinding.getInstance(); - exchangeBinding.objectToEntry(exchangeRec, value); - - try - { - _exchangeDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); + storeConfiguredObjectEntry(configuredObject); } } @@ -919,82 +805,47 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void removeExchange(Exchange exchange) throws AMQStoreException { - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(exchange.getNameShortString(), key); - try + UUID id = exchange.getId(); + if (LOGGER.isDebugEnabled()) { - OperationStatus status = _exchangeDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " not found"); - } + LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called"); } - catch (DatabaseException e) + OperationStatus status = removeConfiguredObject(id); + if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found"); } } /** - * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) + * @see DurableConfigurationStore#bindQueue(Binding) */ - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - bindQueue(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args)); - } - - protected void bindQueue(final BindingRecord bindingRecord) throws AMQStoreException + public void bindQueue(Binding binding) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - DatabaseEntry key = new DatabaseEntry(); - QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance(); - - keyBinding.objectToEntry(bindingRecord, key); - - //yes, this is writing out 0 as a value and putting all the - //useful info into the key, don't ask me why. For table - //compatibility it shall currently be left as is - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - _queueBindingsDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing binding for AMQQueue with name " + bindingRecord.getQueueName() + " to exchange " - + bindingRecord.getExchangeName() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); + storeConfiguredObjectEntry(configuredObject); } } /** - * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) + * @see DurableConfigurationStore#unbindQueue(Binding) */ - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + public void unbindQueue(Binding binding) throws AMQStoreException { - DatabaseEntry key = new DatabaseEntry(); - QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance(); - keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); - - try + UUID id = binding.getId(); + if (LOGGER.isDebugEnabled()) { - OperationStatus status = _queueBindingsDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange " - + exchange.getName() + " not found"); - } + LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called"); } - catch (DatabaseException e) + + OperationStatus status = removeConfiguredObject(id); + if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange " - + exchange.getName() + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Binding " + binding + " not found"); } } @@ -1011,47 +862,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called"); - } - - QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), - queue.getOwner(), queue.isExclusive(), arguments); - - createQueue(queueRecord); - } - - /** - * Makes the specified queue persistent. - * - * Only intended for direct use during store upgrades. - * - * @param queueRecord Details of the queue to store. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - protected void createQueue(QueueRecord queueRecord) throws AMQStoreException - { if (_stateManager.isInState(State.ACTIVE)) { - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(queueRecord.getNameShortString(), key); - - DatabaseEntry value = new DatabaseEntry(); - QueueBinding queueBinding = QueueBinding.getInstance(); - - queueBinding.objectToEntry(queueRecord, value); - try - { - _queueDb.put(null, key, value); - } - catch (DatabaseException e) + if (LOGGER.isDebugEnabled()) { - throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString() - + " to database: " + e.getMessage(), e); + LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId() + + ", arguments=" + arguments + "): called"); } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); + storeConfiguredObjectEntry(configuredObject); } } @@ -1059,8 +878,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore * Updates the specified queue in the persistent store, IF it is already present. If the queue * is not present in the store, it will not be added. * - * NOTE: Currently only updates the exclusivity. - * * @param queue The queue to update the entry for. * @throws AMQStoreException If the operation fails for any reason. */ @@ -1074,28 +891,30 @@ public abstract class AbstractBDBMessageStore implements MessageStore try { DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(queue.getNameShortString(), key); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(queue.getId(), key); DatabaseEntry value = new DatabaseEntry(); DatabaseEntry newValue = new DatabaseEntry(); - QueueBinding queueBinding = QueueBinding.getInstance(); + ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT); - if(status == OperationStatus.SUCCESS) + OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); + if (status == OperationStatus.SUCCESS) { - //read the existing record and apply the new exclusivity setting - QueueRecord queueRecord = queueBinding.entryToObject(value); - queueRecord.setExclusive(queue.isExclusive()); - - //write the updated entry to the store - queueBinding.objectToEntry(queueRecord, newValue); + ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value); + ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord); - _queueDb.put(null, key, newValue); + // write the updated entry to the store + configuredObjectBinding.objectToEntry(newQueueRecord, newValue); + status = _configuredObjectsDb.put(null, key, newValue); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error updating queue details within the store: " + status); + } } - else if(status != OperationStatus.NOTFOUND) + else if (status != OperationStatus.NOTFOUND) { - throw new AMQStoreException("Error updating queue details within the store: " + status); + throw new AMQStoreException("Error finding queue details within the store: " + status); } } catch (DatabaseException e) @@ -1113,27 +932,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore */ public void removeQueue(final AMQQueue queue) throws AMQStoreException { - AMQShortString name = queue.getNameShortString(); - + UUID id = queue.getId(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void removeQueue(AMQShortString name = " + name + "): called"); + LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called"); } - DatabaseEntry key = new DatabaseEntry(); - AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance(); - keyBinding.objectToEntry(name, key); - try + OperationStatus status = removeConfiguredObject(id); + if (status == OperationStatus.NOTFOUND) { - OperationStatus status = _queueDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue " + name + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found"); } } @@ -1233,11 +1041,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws AMQStoreException { - AMQShortString name = AMQShortString.valueOf(queue.getResourceName()); DatabaseEntry key = new DatabaseEntry(); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey dd = new QueueEntryKey(name, messageId); + QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId); keyBinding.objectToEntry(dd, key); DatabaseEntry value = new DatabaseEntry(); ByteBinding.byteToEntry((byte) 0, value); @@ -1246,15 +1053,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]"); + LOGGER.debug("Enqueuing message " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() + + " [Transaction" + tx + "]"); } _deliveryDb.put(tx, key, value); } catch (DatabaseException e) { LOGGER.error("Failed to enqueue: " + e.getMessage(), e); - throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name - + " to database", e); + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() + + " to database", e); } } @@ -1262,7 +1072,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore * Extracts a message from a specified queue, in a given transaction. * * @param tx The transaction for the operation. - * @param queue The name queue to take the message from. + * @param queue The queue to take the message from. * @param messageId The message to dequeue. * * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. @@ -1270,17 +1080,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws AMQStoreException { - AMQShortString name = new AMQShortString(queue.getResourceName()); DatabaseEntry key = new DatabaseEntry(); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(name, messageId); - + QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); + UUID id = queue.getId(); keyBinding.objectToEntry(queueEntryKey, key); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Dequeue message id " + messageId); + LOGGER.debug("Dequeue message id " + messageId + " from queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); } try @@ -1289,16 +1098,20 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = _deliveryDb.delete(tx, key); if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); } else if (status != OperationStatus.SUCCESS) { - throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue" + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Removed message " + messageId + ", " + name + " from delivery db"); + LOGGER.debug("Removed message " + messageId + " on queue " + + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id + + " from delivery db"); } } @@ -1438,7 +1251,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore * * @return a list of message ids for messages enqueued for a particular queue */ - List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException + List<Long> getEnqueuedMessages(UUID queueId) throws AMQStoreException { Cursor cursor = null; try @@ -1447,7 +1260,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore DatabaseEntry key = new DatabaseEntry(); - QueueEntryKey dd = new QueueEntryKey(queueName, 0); + QueueEntryKey dd = new QueueEntryKey(queueId, 0); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); keyBinding.objectToEntry(dd, key); @@ -1459,7 +1272,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); dd = keyBinding.entryToObject(key); - while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName)) + while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) { messageIds.add(dd.getMessageId()); @@ -1644,7 +1457,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); } - Cursor cursor = null; try { @@ -1706,24 +1518,59 @@ public abstract class AbstractBDBMessageStore implements MessageStore return _messageContentDb; } - Database getQueuesDb() - { - return _queueDb; - } - Database getDeliveryDb() { return _deliveryDb; } - Database getExchangesDb() + /** + * Makes the specified configured object persistent. + * + * @param configuredObject Details of the configured object to store. + * @throws AMQStoreException If the operation fails for any reason. + */ + private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws AMQStoreException { - return _exchangeDb; + if (_stateManager.isInState(State.ACTIVE)) + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(configuredObject.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); + + queueBinding.objectToEntry(configuredObject, value); + try + { + OperationStatus status = _configuredObjectsDb.put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error writing configured object " + configuredObject + " to database: " + + status); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing configured object " + configuredObject + + " to database: " + e.getMessage(), e); + } + } } - Database getBindingsDb() + private OperationStatus removeConfiguredObject(UUID id) throws AMQStoreException { - return _queueBindingsDb; + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(id, key); + try + { + return _configuredObjectsDb.delete(null, key); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error deleting of configured object with id " + id + " from database", e); + } } protected abstract StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index e5aeed57cd..9f7eb4bfd9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -50,9 +50,9 @@ public class BDBMessageStore extends AbstractBDBMessageStore private final CommitThread _commitThread = new CommitThread("Commit-Thread"); @Override - protected void setupStore(File storePath) throws DatabaseException, AMQStoreException + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { - super.setupStore(storePath); + super.setupStore(storePath, name); startCommitThread(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java deleted file mode 100644 index b9d868f909..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.entry; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class BindingRecord extends Object -{ - private final AMQShortString _exchangeName; - private final AMQShortString _queueName; - private final AMQShortString _routingKey; - private final FieldTable _arguments; - - public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) - { - _exchangeName = exchangeName; - _queueName = queueName; - _routingKey = routingKey; - _arguments = arguments; - } - - - public AMQShortString getExchangeName() - { - return _exchangeName; - } - - public AMQShortString getQueueName() - { - return _queueName; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public FieldTable getArguments() - { - return _arguments; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java deleted file mode 100644 index 180893178d..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.entry; - -import org.apache.qpid.framing.AMQShortString; - -public class ExchangeRecord extends Object -{ - private final AMQShortString _exchangeName; - private final AMQShortString _exchangeType; - private final boolean _autoDelete; - - public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete) - { - _exchangeName = exchangeName; - _exchangeType = exchangeType; - _autoDelete = autoDelete; - } - - public AMQShortString getNameShortString() - { - return _exchangeName; - } - - public AMQShortString getType() - { - return _exchangeType; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java index a716758da3..e7cf93ff7a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java @@ -20,22 +20,22 @@ */ package org.apache.qpid.server.store.berkeleydb.entry; -import org.apache.qpid.framing.AMQShortString; +import java.util.UUID; public class QueueEntryKey { - private AMQShortString _queueName; + private UUID _queueId; private long _messageId; - public QueueEntryKey(AMQShortString queueName, long messageId) + public QueueEntryKey(UUID queueId, long messageId) { - _queueName = queueName; + _queueId = queueId; _messageId = messageId; } - public AMQShortString getQueueName() + public UUID getQueueId() { - return _queueName; + return _queueId; } public long getMessageId() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java deleted file mode 100644 index 5ea82427dc..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.entry; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class QueueRecord extends Object -{ - private final AMQShortString _queueName; - private final AMQShortString _owner; - private final FieldTable _arguments; - private boolean _exclusive; - - public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments) - { - _queueName = queueName; - _owner = owner; - _exclusive = exclusive; - _arguments = arguments; - } - - public AMQShortString getNameShortString() - { - return _queueName; - } - - public AMQShortString getOwner() - { - return _owner; - } - - public boolean isExclusive() - { - return _exclusive; - } - - public void setExclusive(boolean exclusive) - { - _exclusive = exclusive; - } - - public FieldTable getArguments() - { - return _arguments; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java deleted file mode 100644 index b57ffb0169..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class AMQShortStringBinding extends TupleBinding<AMQShortString> -{ - private static final AMQShortStringBinding INSTANCE = new AMQShortStringBinding(); - - public static AMQShortStringBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private AMQShortStringBinding() { } - - public AMQShortString entryToObject(TupleInput tupleInput) - { - return AMQShortStringEncoding.readShortString(tupleInput); - } - - public void objectToEntry(AMQShortString object, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(object, tupleOutput); - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java new file mode 100644 index 0000000000..8b84a4c9bb --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java @@ -0,0 +1,37 @@ +package org.apache.qpid.server.store.berkeleydb.tuple; + +import org.apache.qpid.server.store.ConfiguredObjectRecord; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord> +{ + private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(); + + public static ConfiguredObjectBinding getInstance() + { + return INSTANCE; + } + + /** non-public constructor forces getInstance instead */ + private ConfiguredObjectBinding() + { + } + + public ConfiguredObjectRecord entryToObject(TupleInput tupleInput) + { + String type = tupleInput.readString(); + String json = tupleInput.readString(); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json); + return configuredObject; + } + + public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput) + { + tupleOutput.writeString(object.getType()); + tupleOutput.writeString(object.getAttributes()); + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java deleted file mode 100644 index d4b1475ac7..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class ExchangeBinding extends TupleBinding<ExchangeRecord> -{ - private static final ExchangeBinding INSTANCE = new ExchangeBinding(); - - public static ExchangeBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private ExchangeBinding() { } - - public ExchangeRecord entryToObject(TupleInput tupleInput) - { - AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput); - - boolean autoDelete = tupleInput.readBoolean(); - - return new ExchangeRecord(name, typeName, autoDelete); - } - - public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput); - AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput); - - tupleOutput.writeBoolean(exchange.isAutoDelete()); - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 33bf269880..09f2c50e2d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.util.UUID; + import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; @@ -47,7 +49,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction Transaction.Record[] records = new Transaction.Record[input.readInt()]; for(int i = 0; i < records.length; i++) { - records[i] = new RecordImpl(input.readString(), input.readLong()); + records[i] = new RecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); } return records; } @@ -71,7 +73,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction output.writeInt(records.length); for(Transaction.Record record : records) { - output.writeString(record.getQueue().getResourceName()); + UUID id = record.getQueue().getId(); + output.writeLong(id.getMostSignificantBits()); + output.writeLong(id.getLeastSignificantBits()); output.writeLong(record.getMessage().getMessageNumber()); } } @@ -80,13 +84,13 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage { - private final String _queueName; private long _messageNumber; + private UUID _queueId; - public RecordImpl(String queueName, long messageNumber) + public RecordImpl(UUID queueId, long messageNumber) { - _queueName = queueName; _messageNumber = messageNumber; + _queueId = queueId; } public TransactionLogResource getQueue() @@ -114,9 +118,10 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction throw new UnsupportedOperationException(); } - public String getResourceName() + @Override + public UUID getId() { - return _queueName; + return _queueId; } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java deleted file mode 100644 index 7e1c63cc28..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; -import org.apache.log4j.Logger; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; -import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord; - -public class QueueBinding extends TupleBinding<QueueRecord> -{ - private static final Logger _logger = Logger.getLogger(QueueBinding.class); - - private static final QueueBinding INSTANCE = new QueueBinding(); - - public static QueueBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private QueueBinding() { } - - public QueueRecord entryToObject(TupleInput tupleInput) - { - try - { - AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); - // Addition for Version 2 of this table, read the queue arguments - FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); - // Addition for Version 3 of this table, read the queue exclusivity - boolean exclusive = tupleInput.readBoolean(); - - return new QueueRecord(name, owner, exclusive, arguments); - } - catch (DatabaseException e) - { - _logger.error("Unable to create binding: " + e, e); - return null; - } - - } - - public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); - AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); - // Addition for Version 2 of this table, store the queue arguments - FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); - // Addition for Version 3 of this table, store the queue exclusivity - tupleOutput.writeBoolean(queue.isExclusive()); - } -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java deleted file mode 100644 index 6ba929a541..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuple; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; -import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; - -public class QueueBindingTupleBinding extends TupleBinding<BindingRecord> -{ - protected static final Logger _log = Logger.getLogger(QueueBindingTupleBinding.class); - - private static final QueueBindingTupleBinding INSTANCE = new QueueBindingTupleBinding(); - - public static QueueBindingTupleBinding getInstance() - { - return INSTANCE; - } - - /** private constructor forces getInstance instead */ - private QueueBindingTupleBinding() { } - - public BindingRecord entryToObject(TupleInput tupleInput) - { - AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); - - FieldTable arguments; - - // Addition for Version 2 of this table - try - { - arguments = FieldTableEncoding.readFieldTable(tupleInput); - } - catch (DatabaseException e) - { - _log.error("Unable to create binding: " + e, e); - return null; - } - - return new BindingRecord(exchangeName, queueName, routingKey, arguments); - } - - public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); - AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); - AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput); - - // Addition for Version 2 of this table - FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput); - } - -}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java index f65df23706..22d0ede31f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.util.UUID; + import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; public class QueueEntryBinding extends TupleBinding<QueueEntryKey> @@ -43,15 +43,17 @@ public class QueueEntryBinding extends TupleBinding<QueueEntryKey> public QueueEntryKey entryToObject(TupleInput tupleInput) { - AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + UUID queueId = new UUID(tupleInput.readLong(), tupleInput.readLong()); long messageId = tupleInput.readLong(); - return new QueueEntryKey(queueName, messageId); + return new QueueEntryKey(queueId, messageId); } public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput) { - AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput); + UUID uuid = mk.getQueueId(); + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); tupleOutput.writeLong(mk.getMessageId()); } }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java index c96c751694..43aa5aa2b4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java @@ -31,39 +31,33 @@ import com.sleepycat.je.Transaction; public abstract class AbstractStoreUpgrade implements StoreUpgrade { private static final Logger _logger = Logger.getLogger(AbstractStoreUpgrade.class); - protected static final String[] USER_FRIENDLY_NAMES = new String[] { "Exchanges", "Queues", "Queue bindings", - "Message deliveries", "Message metadata", "Message content", "Bridges", "Links", "Distributed transactions" }; - protected void reportFinished(Environment environment, String[] databaseNames, String[] userFriendlyNames) + protected void reportFinished(Environment environment, int version) { - if (_logger.isInfoEnabled()) + _logger.info("Completed upgrade to version " + version); + if (_logger.isDebugEnabled()) { - _logger.info("Upgraded:"); - List<String> databases = environment.getDatabaseNames(); - for (int i = 0; i < databaseNames.length; i++) - { - if (databases.contains(databaseNames[i])) - { - _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows in " + userFriendlyNames[i]); - } - } + _logger.debug("Upgraded:"); + reportDatabaseRowCount(environment); } } + private void reportDatabaseRowCount(Environment environment) + { + List<String> databases = environment.getDatabaseNames(); + for (String database : databases) + { + _logger.debug(" " + getRowCount(database, environment) + " rows in " + database); + } + } - protected void reportStarting(Environment environment, String[] databaseNames, String[] userFriendlyNames) + protected void reportStarting(Environment environment, int version) { - if (_logger.isInfoEnabled()) + _logger.info("Starting store upgrade from version " + version); + if (_logger.isDebugEnabled()) { - _logger.info("Upgrading:"); - List<String> databases = environment.getDatabaseNames(); - for (int i = 0; i < databaseNames.length; i++) - { - if (databases.contains(databaseNames[i])) - { - _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows from " + userFriendlyNames[i]); - } - } + _logger.debug("Upgrading:"); + reportDatabaseRowCount(environment); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java index 42a3173e21..925e40ea93 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java @@ -48,7 +48,7 @@ public abstract class CursorOperation implements DatabaseRunnable CursorOperation.this.processEntry(database, targetDatabase, transaction, key, value); if (getProcessedCount() % 1000 == 0) { - _logger.info("Processed " + getProcessedCount() + " messages of " + getRowCount() + "."); + _logger.info("Processed " + getProcessedCount() + " records of " + getRowCount() + "."); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java index df09726c8e..f73e2e5d78 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java @@ -26,6 +26,6 @@ import org.apache.qpid.AMQStoreException; public interface StoreUpgrade { - void performUpgrade(Environment environment, UpgradeInteractionHandler handler) + void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName) throws DatabaseException, AMQStoreException; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java index d3e4dfce12..49e5e700c4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java @@ -71,24 +71,16 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v4"; private static final String NEW_CONTENT_DB_NAME = "messageContentDb_v5"; - private static final String[] OLD_DATABASE_NAMES = new String[] { EXCHANGE_DB_NAME, OLD_QUEUE_DB_NAME, - OLD_BINDINGS_DB_NAME, OLD_DELIVERY_DB, OLD_METADATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v4", "links_v4", - "xids_v4" }; - private static final String[] NEW_DATABASE_NAMES = new String[] { "exchangeDb_v5", NEW_QUEUE_DB_NAME, - NEW_BINDINGS_DB_NAME, NEW_DELIVERY_DB, NEW_METADATA_DB_NAME, NEW_CONTENT_DB_NAME, "bridges_v5", "links_v5", - "xids_v5" }; - private static final byte COLON = (byte) ':'; private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class); - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) throws DatabaseException, AMQStoreException { - _logger.info("Starting store upgrade from version 4"); Transaction transaction = null; try { - reportStarting(environment, OLD_DATABASE_NAMES, USER_FRIENDLY_NAMES); + reportStarting(environment, 4); transaction = environment.beginTransaction(null, null); @@ -103,7 +95,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade renameRemainingDatabases(environment, handler, transaction); transaction.commit(); - reportFinished(environment, NEW_DATABASE_NAMES, USER_FRIENDLY_NAMES); + reportFinished(environment, 5); } catch (Exception e) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 54f6aa6f88..3265fb6823 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -24,13 +24,30 @@ import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteraction import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.NO; import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.YES; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.util.MapJsonSerializer; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.bind.tuple.TupleBinding; @@ -51,26 +68,71 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6.class); - private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v5"; - private static final String NEW_CONTENT_DB_NAME = "MESSAGE_CONTENT"; - private static final String META_DATA_DB_NAME = "messageMetaDataDb_v5"; + static final String OLD_CONTENT_DB_NAME = "messageContentDb_v5"; + static final String NEW_CONTENT_DB_NAME = "MESSAGE_CONTENT"; + static final String NEW_METADATA_DB_NAME = "MESSAGE_METADATA"; + static final String OLD_META_DATA_DB_NAME = "messageMetaDataDb_v5"; + static final String OLD_EXCHANGE_DB_NAME = "exchangeDb_v5"; + static final String OLD_QUEUE_DB_NAME = "queueDb_v5"; + static final String OLD_DELIVERY_DB_NAME = "deliveryDb_v5"; + static final String OLD_QUEUE_BINDINGS_DB_NAME = "queueBindingsDb_v5"; + static final String OLD_XID_DB_NAME = "xids_v5"; + static final String NEW_XID_DB_NAME = "XIDS"; + static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; + static final String NEW_DELIVERY_DB_NAME = "QUEUE_ENTRIES"; + static final String NEW_BRIDGES_DB_NAME = "BRIDGES"; + static final String NEW_LINKS_DB_NAME = "LINKS"; + static final String OLD_BRIDGES_DB_NAME = "bridges_v5"; + static final String OLD_LINKS_DB_NAME = "links_v5"; - private static final String NEW_DB_NAMES[] = { "EXCHANGES", "QUEUES", "QUEUE_BINDINGS", "DELIVERIES", - "MESSAGE_METADATA", NEW_CONTENT_DB_NAME, "BRIDGES", "LINKS", "XIDS" }; - private static final String OLD_DB_NAMES[] = { "exchangeDb_v5", "queueDb_v5", "queueBindingsDb_v5", "deliveryDb_v5", - META_DATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v5", "links_v5", "xids_v5" }; + static final String[] DEFAULT_EXCHANGES = { ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), ExchangeDefaults.FANOUT_EXCHANGE_NAME.asString(), + ExchangeDefaults.HEADERS_EXCHANGE_NAME.asString(), ExchangeDefaults.TOPIC_EXCHANGE_NAME.asString(), + ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString() }; + private static final Set<String> DEFAULT_EXCHANGES_SET = new HashSet<String>(Arrays.asList(DEFAULT_EXCHANGES)); - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException + private MapJsonSerializer _serializer = new MapJsonSerializer(); + + /** + * Upgrades from a v5 database to a v6 database + * + * v6 is the first "new style" schema where we don't version every table, + * and the upgrade is re-runnable + * + * Change in this version: + * + * Message content is moved from the database messageContentDb_v5 to + * MESSAGE_CONTENT. The structure of the database changes from ( message-id: + * long, chunk-id: int ) -> ( size: int, byte[] data ) to ( message-id: + * long) -> ( byte[] data ) + * + * That is we keep only one record per message, which contains all the + * message content + * + * Queue, Exchange, Bindings entries are stored now as configurable objects + * in "CONFIGURED_OBJECTS" table. + */ + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) + throws DatabaseException, AMQStoreException + { + reportStarting(environment, 5); + upgradeMessages(environment, handler); + upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHostName); + renameDatabases(environment, null); + reportFinished(environment, 6); + } + + private void upgradeConfiguredObjectsAndDependencies(Environment environment, UpgradeInteractionHandler handler, String virtualHostName) + throws AMQStoreException { - _logger.info("Starting store upgrade from version 5"); Transaction transaction = null; try { - reportStarting(environment, OLD_DB_NAMES, USER_FRIENDLY_NAMES); transaction = environment.beginTransaction(null, null); - performUpgradeInternal(environment, handler, transaction); + upgradeConfiguredObjects(environment, handler, transaction, virtualHostName); + upgradeQueueEntries(environment, transaction, virtualHostName); + upgradeXidEntries(environment, transaction, virtualHostName); transaction.commit(); - reportFinished(environment, NEW_DB_NAMES, USER_FRIENDLY_NAMES); } catch (Exception e) { @@ -90,22 +152,53 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade } } - /** - * Upgrades from a v5 database to a v6 database - * - * v6 is the first "new style" schema where we don't version every table, and the upgrade is re-runnable - * - * Change in this version: - * - * Message content is moved from the database messageContentDb_v5 to MESSAGE_CONTENT. - * The structure of the database changes from - * ( message-id: long, chunk-id: int ) -> ( size: int, byte[] data ) - * to - * ( message-id: long) -> ( byte[] data ) - * - * That is we keep only one record per message, which contains all the message content - */ - public void performUpgradeInternal(final Environment environment, final UpgradeInteractionHandler handler, + private void upgradeMessages(final Environment environment, final UpgradeInteractionHandler handler) + throws AMQStoreException + { + Transaction transaction = null; + try + { + transaction = environment.beginTransaction(null, null); + upgradeMessages(environment, handler, transaction); + transaction.commit(); + } + catch (Exception e) + { + transaction.abort(); + if (e instanceof DatabaseException) + { + throw (DatabaseException) e; + } + else if (e instanceof AMQStoreException) + { + throw (AMQStoreException) e; + } + else + { + throw new AMQStoreException("Unexpected exception", e); + } + } + } + + private void renameDatabases(Environment environment, Transaction transaction) + { + List<String> databases = environment.getDatabaseNames(); + String[] oldDatabases = { OLD_META_DATA_DB_NAME, OLD_BRIDGES_DB_NAME, OLD_LINKS_DB_NAME }; + String[] newDatabases = { NEW_METADATA_DB_NAME, NEW_BRIDGES_DB_NAME, NEW_LINKS_DB_NAME }; + + for (int i = 0; i < oldDatabases.length; i++) + { + String oldName = oldDatabases[i]; + String newName = newDatabases[i]; + if (databases.contains(oldName)) + { + _logger.info("Renaming " + oldName + " into " + newName); + environment.renameDatabase(transaction, oldName, newName); + } + } + } + + private void upgradeMessages(final Environment environment, final UpgradeInteractionHandler handler, final Transaction transaction) throws AMQStoreException { _logger.info("Message Contents"); @@ -129,27 +222,13 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade metadataDatabase); } }; - new DatabaseTemplate(environment, META_DATA_DB_NAME, contentTransaction).run(metaDataDatabaseOperation); + new DatabaseTemplate(environment, OLD_META_DATA_DB_NAME, contentTransaction) + .run(metaDataDatabaseOperation); _logger.info(metaDataDatabaseOperation.getRowCount() + " Message Content Entries"); } }; new DatabaseTemplate(environment, OLD_CONTENT_DB_NAME, NEW_CONTENT_DB_NAME, transaction).run(contentOperation); - } - renameDatabases(environment, transaction); - } - - private void renameDatabases(Environment environment, Transaction transaction) - { - List<String> databases = environment.getDatabaseNames(); - for (int i = 0; i < OLD_DB_NAMES.length; i++) - { - String oldName = OLD_DB_NAMES[i]; - String newName = NEW_DB_NAMES[i]; - if (databases.contains(oldName)) - { - _logger.info("Renaming " + oldName + " into " + newName); - environment.renameDatabase(transaction, oldName, newName); - } + environment.removeDatabase(transaction, OLD_CONTENT_DB_NAME); } } @@ -221,7 +300,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade DatabaseEntry value = new DatabaseEntry(); dataBinding.objectToEntry(consolidatedData, value); - newDatabase.put(txn, key, value); + put(newDatabase, txn, key, value); } /** @@ -268,6 +347,264 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade return data; } + private void upgradeConfiguredObjects(Environment environment, UpgradeInteractionHandler handler, Transaction transaction, String virtualHostName) + throws AMQStoreException + { + upgradeQueues(environment, transaction, virtualHostName); + upgradeExchanges(environment, transaction, virtualHostName); + upgradeQueueBindings(environment, transaction, handler, virtualHostName); + } + + private void upgradeXidEntries(Environment environment, Transaction transaction, final String virtualHostName) + { + if (environment.getDatabaseNames().contains(OLD_XID_DB_NAME)) + { + _logger.info("Xid Records"); + final OldPreparedTransactionBinding oldTransactionBinding = new OldPreparedTransactionBinding(); + final NewPreparedTransactionBinding newTransactionBinding = new NewPreparedTransactionBinding(); + CursorOperation xidEntriesCursor = new CursorOperation() + { + @Override + public void processEntry(Database oldXidDatabase, Database newXidDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + OldPreparedTransaction oldPreparedTransaction = oldTransactionBinding.entryToObject(value); + OldRecordImpl[] oldDequeues = oldPreparedTransaction.getDequeues(); + OldRecordImpl[] oldEnqueues = oldPreparedTransaction.getEnqueues(); + + NewRecordImpl[] newEnqueues = null; + NewRecordImpl[] newDequeues = null; + if (oldDequeues != null) + { + newDequeues = new NewRecordImpl[oldDequeues.length]; + for (int i = 0; i < newDequeues.length; i++) + { + OldRecordImpl dequeue = oldDequeues[i]; + UUID id = UUIDGenerator.generateUUID(dequeue.getQueueName(), virtualHostName); + newDequeues[i] = new NewRecordImpl(id, dequeue.getMessageNumber()); + } + } + if (oldEnqueues != null) + { + newEnqueues = new NewRecordImpl[oldEnqueues.length]; + for (int i = 0; i < newEnqueues.length; i++) + { + OldRecordImpl enqueue = oldEnqueues[i]; + UUID id = UUIDGenerator.generateUUID(enqueue.getQueueName(), virtualHostName); + newEnqueues[i] = new NewRecordImpl(id, enqueue.getMessageNumber()); + } + } + NewPreparedTransaction newPreparedTransaction = new NewPreparedTransaction(newEnqueues, newDequeues); + DatabaseEntry newValue = new DatabaseEntry(); + newTransactionBinding.objectToEntry(newPreparedTransaction, newValue); + put(newXidDatabase, transaction, key, newValue); + } + }; + new DatabaseTemplate(environment, OLD_XID_DB_NAME, NEW_XID_DB_NAME, transaction).run(xidEntriesCursor); + environment.removeDatabase(transaction, OLD_XID_DB_NAME); + _logger.info(xidEntriesCursor.getRowCount() + " Xid Entries"); + } + } + + private void upgradeQueueEntries(Environment environment, Transaction transaction, final String virtualHostName) + { + _logger.info("Queue Delivery Records"); + if (environment.getDatabaseNames().contains(OLD_DELIVERY_DB_NAME)) + { + final OldQueueEntryBinding oldBinding = new OldQueueEntryBinding(); + final NewQueueEntryBinding newBinding = new NewQueueEntryBinding(); + CursorOperation queueEntriesCursor = new CursorOperation() + { + @Override + public void processEntry(Database oldDeliveryDatabase, Database newDeliveryDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + OldQueueEntryKey oldEntryRecord = oldBinding.entryToObject(key); + UUID queueId = UUIDGenerator.generateUUID(oldEntryRecord.getQueueName().asString(), virtualHostName); + + NewQueueEntryKey newEntryRecord = new NewQueueEntryKey(queueId, oldEntryRecord.getMessageId()); + DatabaseEntry newKey = new DatabaseEntry(); + newBinding.objectToEntry(newEntryRecord, newKey); + put(newDeliveryDatabase, transaction, newKey, value); + } + }; + new DatabaseTemplate(environment, OLD_DELIVERY_DB_NAME, NEW_DELIVERY_DB_NAME, transaction) + .run(queueEntriesCursor); + environment.removeDatabase(transaction, OLD_DELIVERY_DB_NAME); + _logger.info(queueEntriesCursor.getRowCount() + " Queue Delivery Record Entries"); + } + } + + private void upgradeQueueBindings(Environment environment, Transaction transaction, final UpgradeInteractionHandler handler, final String virtualHostName) + { + _logger.info("Queue Bindings"); + if (environment.getDatabaseNames().contains(OLD_QUEUE_BINDINGS_DB_NAME)) + { + final QueueBindingBinding binding = new QueueBindingBinding(); + CursorOperation bindingCursor = new CursorOperation() + { + @Override + public void processEntry(Database exchangeDatabase, Database configuredObjectsDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + // TODO: check and remove orphaned bindings + BindingRecord bindingRecord = binding.entryToObject(key); + String exchangeName = bindingRecord.getExchangeName() == null ? ExchangeDefaults.DEFAULT_EXCHANGE_NAME + .asString() : bindingRecord.getExchangeName().asString(); + String queueName = bindingRecord.getQueueName().asString(); + String routingKey = bindingRecord.getRoutingKey().asString(); + FieldTable arguments = bindingRecord.getArguments(); + + UUID bindingId = UUIDGenerator.generateUUID(); + UpgradeConfiguredObjectRecord configuredObject = createBindingConfiguredObjectRecord(exchangeName, queueName, + routingKey, arguments, virtualHostName); + storeConfiguredObjectEntry(configuredObjectsDatabase, bindingId, configuredObject, transaction); + } + + }; + new DatabaseTemplate(environment, OLD_QUEUE_BINDINGS_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction) + .run(bindingCursor); + environment.removeDatabase(transaction, OLD_QUEUE_BINDINGS_DB_NAME); + _logger.info(bindingCursor.getRowCount() + " Queue Binding Entries"); + } + } + + private List<String> upgradeExchanges(Environment environment, Transaction transaction, final String virtualHostName) + { + final List<String> exchangeNames = new ArrayList<String>(); + _logger.info("Exchanges"); + if (environment.getDatabaseNames().contains(OLD_EXCHANGE_DB_NAME)) + { + final ExchangeBinding exchangeBinding = new ExchangeBinding(); + CursorOperation exchangeCursor = new CursorOperation() + { + @Override + public void processEntry(Database exchangeDatabase, Database configuredObjectsDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + ExchangeRecord exchangeRecord = exchangeBinding.entryToObject(value); + String exchangeName = exchangeRecord.getNameShortString().asString(); + if (!DEFAULT_EXCHANGES_SET.contains(exchangeName)) + { + String exchangeType = exchangeRecord.getType().asString(); + boolean autoDelete = exchangeRecord.isAutoDelete(); + + UUID exchangeId = UUIDGenerator.generateUUID(exchangeName, virtualHostName); + + UpgradeConfiguredObjectRecord configuredObject = createExchangeConfiguredObjectRecord(exchangeName, + exchangeType, autoDelete); + storeConfiguredObjectEntry(configuredObjectsDatabase, exchangeId, configuredObject, transaction); + exchangeNames.add(exchangeName); + } + } + }; + new DatabaseTemplate(environment, OLD_EXCHANGE_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction) + .run(exchangeCursor); + environment.removeDatabase(transaction, OLD_EXCHANGE_DB_NAME); + _logger.info(exchangeCursor.getRowCount() + " Exchange Entries"); + } + return exchangeNames; + } + + private List<String> upgradeQueues(Environment environment, Transaction transaction, final String virtualHostName) + { + final List<String> queueNames = new ArrayList<String>(); + _logger.info("Queues"); + if (environment.getDatabaseNames().contains(OLD_QUEUE_DB_NAME)) + { + final UpgradeQueueBinding queueBinding = new UpgradeQueueBinding(); + CursorOperation queueCursor = new CursorOperation() + { + @Override + public void processEntry(Database queueDatabase, Database configuredObjectsDatabase, + Transaction transaction, DatabaseEntry key, DatabaseEntry value) + { + OldQueueRecord queueRecord = queueBinding.entryToObject(value); + String queueName = queueRecord.getNameShortString().asString(); + queueNames.add(queueName); + String owner = queueRecord.getOwner() == null ? null : queueRecord.getOwner().asString(); + boolean exclusive = queueRecord.isExclusive(); + FieldTable arguments = queueRecord.getArguments(); + + UUID queueId = UUIDGenerator.generateUUID(queueName, virtualHostName); + UpgradeConfiguredObjectRecord configuredObject = createQueueConfiguredObjectRecord(queueName, owner, exclusive, + arguments); + storeConfiguredObjectEntry(configuredObjectsDatabase, queueId, configuredObject, transaction); + } + }; + new DatabaseTemplate(environment, OLD_QUEUE_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction).run(queueCursor); + environment.removeDatabase(transaction, OLD_QUEUE_DB_NAME); + _logger.info(queueCursor.getRowCount() + " Queue Entries"); + } + return queueNames; + } + + private void storeConfiguredObjectEntry(Database configuredObjectsDatabase, UUID id, + UpgradeConfiguredObjectRecord configuredObject, Transaction transaction) + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding(); + uuidBinding.objectToEntry(id, key); + ConfiguredObjectBinding configuredBinding = new ConfiguredObjectBinding(); + configuredBinding.objectToEntry(configuredObject, value); + put(configuredObjectsDatabase, transaction, key, value); + } + + private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive, + FieldTable arguments) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Queue.NAME, queueName); + attributesMap.put(Queue.OWNER, owner); + attributesMap.put(Queue.EXCLUSIVE, exclusive); + if (arguments != null) + { + attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments)); + } + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json); + return configuredObject; + } + + private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType, + boolean autoDelete) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Exchange.NAME, exchangeName); + attributesMap.put(Exchange.TYPE, exchangeType); + attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? LifetimePolicy.AUTO_DELETE.name() + : LifetimePolicy.PERMANENT.name()); + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Exchange.class.getName(), json); + return configuredObject; + } + + private UpgradeConfiguredObjectRecord createBindingConfiguredObjectRecord(String exchangeName, String queueName, + String routingKey, FieldTable arguments, String virtualHostName) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Binding.NAME, routingKey); + attributesMap.put(Binding.EXCHANGE, UUIDGenerator.generateUUID(exchangeName, virtualHostName)); + attributesMap.put(Binding.QUEUE, UUIDGenerator.generateUUID(queueName, virtualHostName)); + if (arguments != null) + { + attributesMap.put(Binding.ARGUMENTS, FieldTable.convertToMap(arguments)); + } + String json = _serializer.serialize(attributesMap); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Binding.class.getName(), json); + return configuredObject; + } + + private void put(final Database database, Transaction txn, DatabaseEntry key, DatabaseEntry value) + { + OperationStatus status = database.put(txn, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new RuntimeException("Cannot add record into " + database.getDatabaseName() + ":" + status); + } + } + static final class CompoundKey { public final long _messageId; @@ -367,4 +704,504 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade } } + static class OldQueueRecord extends Object + { + private final AMQShortString _queueName; + private final AMQShortString _owner; + private final FieldTable _arguments; + private boolean _exclusive; + + public OldQueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments) + { + _queueName = queueName; + _owner = owner; + _exclusive = exclusive; + _arguments = arguments; + } + + public AMQShortString getNameShortString() + { + return _queueName; + } + + public AMQShortString getOwner() + { + return _owner; + } + + public boolean isExclusive() + { + return _exclusive; + } + + public void setExclusive(boolean exclusive) + { + _exclusive = exclusive; + } + + public FieldTable getArguments() + { + return _arguments; + } + + } + + static class UpgradeConfiguredObjectRecord + { + private String _attributes; + private String _type; + + public UpgradeConfiguredObjectRecord(String type, String attributes) + { + super(); + _attributes = attributes; + _type = type; + } + + public String getAttributes() + { + return _attributes; + } + + public String getType() + { + return _type; + } + + } + + static class UpgradeQueueBinding extends TupleBinding<OldQueueRecord> + { + public OldQueueRecord entryToObject(TupleInput tupleInput) + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + boolean exclusive = tupleInput.readBoolean(); + return new OldQueueRecord(name, owner, exclusive, arguments); + } + + public void objectToEntry(OldQueueRecord queue, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); + FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); + tupleOutput.writeBoolean(queue.isExclusive()); + } + } + + static class UpgradeUUIDBinding extends TupleBinding<UUID> + { + public UUID entryToObject(final TupleInput tupleInput) + { + return new UUID(tupleInput.readLong(), tupleInput.readLong()); + } + + public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput) + { + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + } + } + + static class ConfiguredObjectBinding extends TupleBinding<UpgradeConfiguredObjectRecord> + { + + public UpgradeConfiguredObjectRecord entryToObject(TupleInput tupleInput) + { + String type = tupleInput.readString(); + String json = tupleInput.readString(); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(type, json); + return configuredObject; + } + + public void objectToEntry(UpgradeConfiguredObjectRecord object, TupleOutput tupleOutput) + { + tupleOutput.writeString(object.getType()); + tupleOutput.writeString(object.getAttributes()); + } + + } + + static class ExchangeRecord extends Object + { + private final AMQShortString _exchangeName; + private final AMQShortString _exchangeType; + private final boolean _autoDelete; + + public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete) + { + _exchangeName = exchangeName; + _exchangeType = exchangeType; + _autoDelete = autoDelete; + } + + public AMQShortString getNameShortString() + { + return _exchangeName; + } + + public AMQShortString getType() + { + return _exchangeType; + } + + public boolean isAutoDelete() + { + return _autoDelete; + } + + } + + static class ExchangeBinding extends TupleBinding<ExchangeRecord> + { + + public ExchangeRecord entryToObject(TupleInput tupleInput) + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput); + + boolean autoDelete = tupleInput.readBoolean(); + + return new ExchangeRecord(name, typeName, autoDelete); + } + + public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput); + + tupleOutput.writeBoolean(exchange.isAutoDelete()); + } + } + + static class BindingRecord extends Object + { + private final AMQShortString _exchangeName; + private final AMQShortString _queueName; + private final AMQShortString _routingKey; + private final FieldTable _arguments; + + public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, + FieldTable arguments) + { + _exchangeName = exchangeName; + _queueName = queueName; + _routingKey = routingKey; + _arguments = arguments; + } + + public AMQShortString getExchangeName() + { + return _exchangeName; + } + + public AMQShortString getQueueName() + { + return _queueName; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public FieldTable getArguments() + { + return _arguments; + } + + } + + static class QueueBindingBinding extends TupleBinding<BindingRecord> + { + + public BindingRecord entryToObject(TupleInput tupleInput) + { + AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); + + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + + return new BindingRecord(exchangeName, queueName, routingKey, arguments); + } + + public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); + AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); + AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput); + + FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput); + } + } + + static class OldQueueEntryKey + { + private AMQShortString _queueName; + private long _messageId; + + public OldQueueEntryKey(AMQShortString queueName, long messageId) + { + _queueName = queueName; + _messageId = messageId; + } + + public AMQShortString getQueueName() + { + return _queueName; + } + + public long getMessageId() + { + return _messageId; + } + } + + static class OldQueueEntryBinding extends TupleBinding<OldQueueEntryKey> + { + + public OldQueueEntryKey entryToObject(TupleInput tupleInput) + { + AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + long messageId = tupleInput.readLong(); + + return new OldQueueEntryKey(queueName, messageId); + } + + public void objectToEntry(OldQueueEntryKey mk, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(mk.getQueueName(), tupleOutput); + tupleOutput.writeLong(mk.getMessageId()); + } + } + + static class NewQueueEntryKey + { + private UUID _queueId; + private long _messageId; + + public NewQueueEntryKey(UUID queueId, long messageId) + { + _queueId = queueId; + _messageId = messageId; + } + + public UUID getQueueId() + { + return _queueId; + } + + public long getMessageId() + { + return _messageId; + } + } + + static class NewQueueEntryBinding extends TupleBinding<NewQueueEntryKey> + { + + public NewQueueEntryKey entryToObject(TupleInput tupleInput) + { + UUID queueId = new UUID(tupleInput.readLong(), tupleInput.readLong()); + long messageId = tupleInput.readLong(); + + return new NewQueueEntryKey(queueId, messageId); + } + + public void objectToEntry(NewQueueEntryKey mk, TupleOutput tupleOutput) + { + UUID uuid = mk.getQueueId(); + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + tupleOutput.writeLong(mk.getMessageId()); + } + } + + static class NewPreparedTransaction + { + private final NewRecordImpl[] _enqueues; + private final NewRecordImpl[] _dequeues; + + public NewPreparedTransaction(NewRecordImpl[] enqueues, NewRecordImpl[] dequeues) + { + _enqueues = enqueues; + _dequeues = dequeues; + } + + public NewRecordImpl[] getEnqueues() + { + return _enqueues; + } + + public NewRecordImpl[] getDequeues() + { + return _dequeues; + } + } + + static class NewRecordImpl + { + + private long _messageNumber; + private UUID _queueId; + + public NewRecordImpl(UUID queueId, long messageNumber) + { + _messageNumber = messageNumber; + _queueId = queueId; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public UUID getId() + { + return _queueId; + } + } + + static class NewPreparedTransactionBinding extends TupleBinding<NewPreparedTransaction> + { + @Override + public NewPreparedTransaction entryToObject(TupleInput input) + { + NewRecordImpl[] enqueues = readRecords(input); + + NewRecordImpl[] dequeues = readRecords(input); + + return new NewPreparedTransaction(enqueues, dequeues); + } + + private NewRecordImpl[] readRecords(TupleInput input) + { + NewRecordImpl[] records = new NewRecordImpl[input.readInt()]; + for (int i = 0; i < records.length; i++) + { + records[i] = new NewRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); + } + return records; + } + + @Override + public void objectToEntry(NewPreparedTransaction preparedTransaction, TupleOutput output) + { + writeRecords(preparedTransaction.getEnqueues(), output); + writeRecords(preparedTransaction.getDequeues(), output); + } + + private void writeRecords(NewRecordImpl[] records, TupleOutput output) + { + if (records == null) + { + output.writeInt(0); + } + else + { + output.writeInt(records.length); + for (NewRecordImpl record : records) + { + UUID id = record.getId(); + output.writeLong(id.getMostSignificantBits()); + output.writeLong(id.getLeastSignificantBits()); + output.writeLong(record.getMessageNumber()); + } + } + } + } + + static class OldRecordImpl + { + + private long _messageNumber; + private String _queueName; + + public OldRecordImpl(String queueName, long messageNumber) + { + _messageNumber = messageNumber; + _queueName = queueName; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public String getQueueName() + { + return _queueName; + } + } + + static class OldPreparedTransaction + { + private final OldRecordImpl[] _enqueues; + private final OldRecordImpl[] _dequeues; + + public OldPreparedTransaction(OldRecordImpl[] enqueues, OldRecordImpl[] dequeues) + { + _enqueues = enqueues; + _dequeues = dequeues; + } + + public OldRecordImpl[] getEnqueues() + { + return _enqueues; + } + + public OldRecordImpl[] getDequeues() + { + return _dequeues; + } + } + + static class OldPreparedTransactionBinding extends TupleBinding<OldPreparedTransaction> + { + @Override + public OldPreparedTransaction entryToObject(TupleInput input) + { + OldRecordImpl[] enqueues = readRecords(input); + + OldRecordImpl[] dequeues = readRecords(input); + + return new OldPreparedTransaction(enqueues, dequeues); + } + + private OldRecordImpl[] readRecords(TupleInput input) + { + OldRecordImpl[] records = new OldRecordImpl[input.readInt()]; + for (int i = 0; i < records.length; i++) + { + records[i] = new OldRecordImpl(input.readString(), input.readLong()); + } + return records; + } + + @Override + public void objectToEntry(OldPreparedTransaction preparedTransaction, TupleOutput output) + { + writeRecords(preparedTransaction.getEnqueues(), output); + writeRecords(preparedTransaction.getDequeues(), output); + } + + private void writeRecords(OldRecordImpl[] records, TupleOutput output) + { + if (records == null) + { + output.writeInt(0); + } + else + { + output.writeInt(records.length); + for (OldRecordImpl record : records) + { + output.writeString(record.getQueueName()); + output.writeLong(record.getMessageNumber()); + } + } + } + } }
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index 77455c2ea1..e71e39cbb8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -38,13 +38,15 @@ import com.sleepycat.je.OperationStatus; public class Upgrader { - static final String VERSION_DB_NAME = "VERSION"; + static final String VERSION_DB_NAME = "DB_VERSION"; private Environment _environment; + private String _virtualHostName; - public Upgrader(Environment environment) + public Upgrader(Environment environment, String virtualHostName) { _environment = environment; + _virtualHostName = virtualHostName; } public void upgradeIfNecessary() throws AMQStoreException @@ -125,7 +127,7 @@ public class Upgrader + "UpgradeFrom"+fromVersion+"To"+toVersion); Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor(); StoreUpgrade upgrade = ctr.newInstance(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHostName); } catch (ClassNotFoundException e) { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java new file mode 100644 index 0000000000..687c671566 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -0,0 +1,14 @@ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.DurableConfigurationStoreTest; +import org.apache.qpid.server.store.MessageStore; + +public class BDBMessageStoreConfigurationTest extends DurableConfigurationStoreTest +{ + @Override + protected MessageStore createStore() throws Exception + { + return new BDBMessageStore(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 18e20f2ee8..a318187f13 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.UUID; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -36,6 +37,7 @@ import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -405,13 +407,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); - final AMQShortString mockQueueName = new AMQShortString("queueName"); - + final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { - public String getResourceName() + @Override + public UUID getId() { - return mockQueueName.asString(); + return mockQueueId; } }; @@ -421,7 +423,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto txn.enqueueMessage(mockQueue, new MockMessage(5L)); txn.commitTran(); - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); @@ -443,13 +445,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); - final AMQShortString mockQueueName = new AMQShortString("queueName"); - + final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { - public String getResourceName() + @Override + public UUID getId() { - return mockQueueName.asString(); + return mockQueueId; } }; @@ -463,7 +465,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto txn.enqueueMessage(mockQueue, new MockMessage(23L)); txn.commitTran(); - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); @@ -484,13 +486,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto BDBMessageStore bdbStore = assertBDBStore(log); - final AMQShortString mockQueueName = new AMQShortString("queueName"); - + final UUID mockQueueId = UUIDGenerator.generateUUID(); TransactionLogResource mockQueue = new TransactionLogResource() { - public String getResourceName() + @Override + public UUID getId() { - return mockQueueName.asString(); + return mockQueueId; } }; @@ -507,7 +509,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto txn.enqueueMessage(mockQueue, new MockMessage(32L)); txn.commitTran(); - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); Long val = enqueuedIds.get(0); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java new file mode 100644 index 0000000000..f8aeb7f7b0 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuple; + +import junit.framework.TestCase; + +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.ConfiguredObjectRecord; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class ConfiguredObjectBindingTest extends TestCase +{ + + private ConfiguredObjectRecord _object; + + private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes"; + private static final String DUMMY_TYPE_STRING = "dummyType"; + private ConfiguredObjectBinding _configuredObjectBinding; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); + _object = new ConfiguredObjectRecord(UUIDGenerator.generateUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING); + } + + public void testObjectToEntryAndEntryToObject() + { + TupleOutput tupleOutput = new TupleOutput(); + + _configuredObjectBinding.objectToEntry(_object, tupleOutput); + + byte[] entryAsBytes = tupleOutput.getBufferBytes(); + TupleInput tupleInput = new TupleInput(entryAsBytes); + + ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput); + assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes()); + assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType()); + } +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java index 6df2f8a8db..36991b90d0 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -22,10 +22,9 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import java.io.File; -import junit.framework.TestCase; - import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; import com.sleepycat.je.Database; @@ -33,7 +32,7 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Transaction; -public abstract class AbstractUpgradeTestCase extends TestCase +public abstract class AbstractUpgradeTestCase extends QpidTestCase { protected static final class StaticAnswerHandler implements UpgradeInteractionHandler { @@ -57,7 +56,6 @@ public abstract class AbstractUpgradeTestCase extends TestCase public static int[] QUEUE_SIZES = { 1, 1, 10, 3 }; public static int TOTAL_MESSAGE_NUMBER = 15; protected static final LogSubject LOG_SUBJECT = new TestBlankSubject(); - protected static final String TMP_FOLDER = System.getProperty("java.io.tmpdir"); // one binding per exchange protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2; @@ -148,4 +146,8 @@ public abstract class AbstractUpgradeTestCase extends TestCase return count.longValue(); } + public String getVirtualHostName() + { + return getName(); + } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java index c9103b1997..3f9e4e4aa1 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -66,7 +66,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES)); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES))); @@ -93,7 +93,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO)); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName()); assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE))); assertDatabaseRecordCount(DELIVERY_DB_NAME, 12); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index d73a777ca6..5297692820 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -20,14 +20,50 @@ */ package org.apache.qpid.server.store.berkeleydb.upgrade; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CONFIGURED_OBJECTS_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_CONTENT_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_DELIVERY_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_METADATA_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_XID_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_CONTENT_DB_NAME; +import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME; + +import java.io.File; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + import org.apache.log4j.Logger; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.berkeleydb.entry.Xid; +import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.ConfiguredObjectBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransaction; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransactionBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryKey; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewRecordImpl; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransaction; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransactionBinding; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldRecordImpl; +import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeUUIDBinding; +import org.apache.qpid.server.util.MapJsonSerializer; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Environment; +import com.sleepycat.je.LockMode; import com.sleepycat.je.Transaction; public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase @@ -43,10 +79,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase public void testPerformUpgrade() throws Exception { UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); assertDatabaseRecordCounts(); assertContent(); + + assertConfiguredObjects(); + assertQueueEntries(); } public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception @@ -54,9 +93,12 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase corruptDatabase(); UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES)); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); assertDatabaseRecordCounts(); + + assertConfiguredObjects(); + assertQueueEntries(); } public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception @@ -67,10 +109,117 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO); - upgrade.performUpgrade(_environment, discardMessageInteractionHandler); + upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName()); + + assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 11); + assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 11); + + assertConfiguredObjects(); + assertQueueEntries(); + } + + public void testPerformXidUpgrade() throws Exception + { + File storeLocation = new File(TMP_FOLDER, getName()); + storeLocation.mkdirs(); + Environment environment = createEnvironment(storeLocation); + try + { + populateOldXidEntries(environment); + UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); + upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); + assertXidEntries(environment); + } + finally + { + try + { + environment.close(); + } + finally + { + deleteDirectoryIfExists(storeLocation); + } + + } + } + + private void assertXidEntries(Environment environment) + { + final DatabaseEntry value = new DatabaseEntry(); + final DatabaseEntry key = getXidKey(); + new DatabaseTemplate(environment, NEW_XID_DB_NAME, null).run(new DatabaseRunnable() + { + + @Override + public void run(Database xidDatabase, Database nullDatabase, Transaction transaction) + { + xidDatabase.get(null, key, value, LockMode.DEFAULT); + } + }); + NewPreparedTransactionBinding newBinding = new NewPreparedTransactionBinding(); + NewPreparedTransaction newTransaction = newBinding.entryToObject(value); + NewRecordImpl[] newEnqueues = newTransaction.getEnqueues(); + NewRecordImpl[] newDequeues = newTransaction.getDequeues(); + assertEquals("Unxpected new enqueus number", 1, newEnqueues.length); + NewRecordImpl enqueue = newEnqueues[0]; + assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST1", getVirtualHostName()), enqueue.getId()); + assertEquals("Unxpected message id", 1, enqueue.getMessageNumber()); + assertEquals("Unxpected new dequeues number", 1, newDequeues.length); + NewRecordImpl dequeue = newDequeues[0]; + assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST2", getVirtualHostName()), dequeue.getId()); + assertEquals("Unxpected message id", 2, dequeue.getMessageNumber()); + } + + private void populateOldXidEntries(Environment environment) + { - assertDatabaseRecordCount("MESSAGE_METADATA", 11); - assertDatabaseRecordCount("MESSAGE_CONTENT", 11); + final DatabaseEntry value = new DatabaseEntry(); + OldRecordImpl[] enqueues = { new OldRecordImpl("TEST1", 1) }; + OldRecordImpl[] dequeues = { new OldRecordImpl("TEST2", 2) }; + OldPreparedTransaction oldPreparedTransaction = new OldPreparedTransaction(enqueues, dequeues); + OldPreparedTransactionBinding oldPreparedTransactionBinding = new OldPreparedTransactionBinding(); + oldPreparedTransactionBinding.objectToEntry(oldPreparedTransaction, value); + + final DatabaseEntry key = getXidKey(); + new DatabaseTemplate(environment, OLD_XID_DB_NAME, null).run(new DatabaseRunnable() + { + + @Override + public void run(Database xidDatabase, Database nullDatabase, Transaction transaction) + { + xidDatabase.put(null, key, value); + } + }); + } + + protected DatabaseEntry getXidKey() + { + final DatabaseEntry value = new DatabaseEntry(); + byte[] globalId = { 1 }; + byte[] branchId = { 2 }; + Xid xid = new Xid(1l, globalId, branchId); + XidBinding xidBinding = XidBinding.getInstance(); + xidBinding.objectToEntry(xid, value); + return value; + } + + private void assertQueueEntries() + { + final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); + final NewQueueEntryBinding newBinding = new NewQueueEntryBinding(); + CursorOperation cursorOperation = new CursorOperation() + { + + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + NewQueueEntryKey newEntryRecord = newBinding.entryToObject(key); + assertTrue("Unexpected queue id", configuredObjects.containsKey(newEntryRecord.getQueueId())); + } + }; + new DatabaseTemplate(_environment, NEW_DELIVERY_DB_NAME, null).run(cursorOperation); } /** @@ -105,19 +254,124 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase }; Transaction transaction = _environment.beginTransaction(null, null); - new DatabaseTemplate(_environment, "messageContentDb_v5", transaction).run(cursorOperation); + new DatabaseTemplate(_environment, OLD_CONTENT_DB_NAME, transaction).run(cursorOperation); transaction.commit(); } private void assertDatabaseRecordCounts() { - assertDatabaseRecordCount("EXCHANGES", 5); - assertDatabaseRecordCount("QUEUES", 3); - assertDatabaseRecordCount("QUEUE_BINDINGS", 6); - assertDatabaseRecordCount("DELIVERIES", 12); + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 9); + assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12); - assertDatabaseRecordCount("MESSAGE_METADATA", 12); - assertDatabaseRecordCount("MESSAGE_CONTENT", 12); + assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); + assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12); + } + + private void assertConfiguredObjects() + { + Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); + assertEquals("Unexpected number of configured objects", 9, configuredObjects.size()); + + Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(9); + Map<String, Object> queue1 = new HashMap<String, Object>(); + queue1.put("exclusive", Boolean.FALSE); + queue1.put("name", "myUpgradeQueue"); + queue1.put("owner", null); + expected.add(queue1); + Map<String, Object> queue2 = new HashMap<String, Object>(); + queue2.put("exclusive", Boolean.TRUE); + queue2.put("name", "clientid:mySelectorDurSubName"); + queue2.put("owner", "clientid"); + expected.add(queue2); + Map<String, Object> queue3 = new HashMap<String, Object>(); + queue3.put("exclusive", Boolean.TRUE); + queue3.put("name", "clientid:myDurSubName"); + queue3.put("owner", "clientid"); + expected.add(queue3); + + Map<String, Object> queueBinding1 = new HashMap<String, Object>(); + queueBinding1.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); + queueBinding1.put("name", "myUpgradeQueue"); + queueBinding1.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); + expected.add(queueBinding1); + Map<String, Object> queueBinding2 = new HashMap<String, Object>(); + queueBinding2.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString()); + queueBinding2.put("name", "myUpgradeQueue"); + queueBinding2.put("exchange", UUIDGenerator.generateUUID("amq.direct", getVirtualHostName()).toString()); + Map<String, Object> arguments2 = new HashMap<String, Object>(); + arguments2.put("x-filter-jms-selector", ""); + queueBinding2.put("arguments", arguments2); + expected.add(queueBinding2); + Map<String, Object> queueBinding3 = new HashMap<String, Object>(); + queueBinding3.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); + queueBinding3.put("name", "myUpgradeTopic"); + queueBinding3.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); + Map<String, Object> arguments3 = new HashMap<String, Object>(); + arguments3.put("x-filter-jms-selector", ""); + queueBinding3.put("arguments", arguments3); + expected.add(queueBinding3); + Map<String, Object> queueBinding4 = new HashMap<String, Object>(); + queueBinding4.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); + queueBinding4.put("name", "mySelectorUpgradeTopic"); + queueBinding4.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString()); + Map<String, Object> arguments4 = new HashMap<String, Object>(); + arguments4.put("x-filter-jms-selector", "testprop='true'"); + queueBinding4.put("arguments", arguments4); + expected.add(queueBinding4); + Map<String, Object> queueBinding5 = new HashMap<String, Object>(); + queueBinding5.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString()); + queueBinding5.put("name", "clientid:myDurSubName"); + queueBinding5.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); + expected.add(queueBinding5); + Map<String, Object> queueBinding6 = new HashMap<String, Object>(); + queueBinding6.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString()); + queueBinding6.put("name", "clientid:mySelectorDurSubName"); + queueBinding6.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString()); + expected.add(queueBinding6); + + Set<String> expectedTypes = new HashSet<String>(); + expectedTypes.add(Queue.class.getName()); + expectedTypes.add(Exchange.class.getName()); + expectedTypes.add(Binding.class.getName()); + MapJsonSerializer jsonSerializer = new MapJsonSerializer(); + for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) + { + UpgradeConfiguredObjectRecord object = entry.getValue(); + UUID key = entry.getKey(); + Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes()); + assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized)); + String type = object.getType(); + assertTrue("Unexpected type:" + type, expectedTypes.contains(type)); + if (type.equals(Exchange.class.getName()) || type.equals(Queue.class.getName())) + { + assertEquals("Unexpected key", key, UUIDGenerator.generateUUID(((String) deserialized.get("name")), getVirtualHostName())); + } + else + { + assertNotNull("Key cannot be null", key); + } + } + assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty()); + } + + private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects() + { + final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>(); + final ConfiguredObjectBinding binding = new ConfiguredObjectBinding(); + final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding(); + CursorOperation configuredObjectsCursor = new CursorOperation() + { + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + UUID id = uuidBinding.entryToObject(key); + UpgradeConfiguredObjectRecord object = binding.entryToObject(value); + configuredObjectsRecords.put(id, object); + } + }; + new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor); + return configuredObjectsRecords; } private void assertContent() @@ -127,8 +381,8 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { @Override - public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key, - DatabaseEntry value) + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) { long id = LongBinding.entryToLong(key); assertTrue("Unexpected id", id > 0); @@ -136,6 +390,6 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase assertNotNull("Unexpected content", content); } }; - new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation); + new DatabaseTemplate(_environment, NEW_CONTENT_DB_NAME, null).run(contentCursorOperation); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index 99c4b7ab5b..ba5ca842bf 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -50,7 +50,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase public void setUp() throws Exception { super.setUp(); - _upgrader = new Upgrader(_environment); + _upgrader = new Upgrader(_environment, getVirtualHostName()); } private int getStoreVersion() @@ -105,12 +105,12 @@ public class UpgraderTest extends AbstractUpgradeTestCase nonExistentStoreLocation.mkdir(); _environment = createEnvironment(nonExistentStoreLocation); - _upgrader = new Upgrader(_environment); + _upgrader = new Upgrader(_environment, getVirtualHostName()); _upgrader.upgradeIfNecessary(); List<String> databaseNames = _environment.getDatabaseNames(); List<String> expectedDatabases = new ArrayList<String>(); - expectedDatabases.add("VERSION"); + expectedDatabases.add(Upgrader.VERSION_DB_NAME); assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames); assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion()); |
