summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-04-17 09:01:44 +0000
committerRobert Gemmell <robbie@apache.org>2012-04-17 09:01:44 +0000
commit3203eea7641e1b0f39de96d797db7c54423b7f02 (patch)
treef2563ba4a85ac54765d8f62663b60853846b3a89 /qpid/java/bdbstore
parentdeab61acfe5f4edaae121cf6b9fa5d4b9e42803f (diff)
downloadqpid-python-3203eea7641e1b0f39de96d797db7c54423b7f02.tar.gz
QPID-3923: Store queue, exchange and binding as configured objects in bdb store
Applied patch by Oleksandr Rudyy <orudyy@gmail.com>, Phil Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java443
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java62
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java53
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java66
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java51
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java37
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java60
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java19
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java79
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java81
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java42
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java14
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java929
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java8
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java14
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java32
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java61
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java10
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java284
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java6
26 files changed, 1492 insertions, 895 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 439ec8ac4f..fb1d7c5265 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
@@ -57,6 +57,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
@@ -73,19 +74,14 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord;
-import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
-import org.apache.qpid.server.store.berkeleydb.tuple.AMQShortStringBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.ExchangeBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueBinding;
-import org.apache.qpid.server.store.berkeleydb.tuple.QueueBindingTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
@@ -104,23 +100,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private Environment _environment;
+ private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
- private String QUEUEBINDINGSDB_NAME = "QUEUE_BINDINGS";
- private String DELIVERYDB_NAME = "DELIVERIES";
- private String EXCHANGEDB_NAME = "EXCHANGES";
- private String QUEUEDB_NAME = "QUEUES";
+ private String DELIVERYDB_NAME = "QUEUE_ENTRIES";
private String BRIDGEDB_NAME = "BRIDGES";
private String LINKDB_NAME = "LINKS";
private String XIDDB_NAME = "XIDS";
-
+ private Database _configuredObjectsDb;
private Database _messageMetaDataDb;
private Database _messageContentDb;
- private Database _queueBindingsDb;
private Database _deliveryDb;
- private Database _exchangeDb;
- private Database _queueDb;
private Database _bridgeDb;
private Database _linkDb;
private Database _xidDb;
@@ -165,6 +156,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private final EventManager _eventManager = new EventManager();
private String _storeLocation;
+ private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
+
public AbstractBDBMessageStore()
{
_stateManager = new StateManager(_eventManager);
@@ -239,7 +232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
LOGGER.info("Configuring BDB message store");
- setupStore(environmentPath);
+ setupStore(environmentPath, name);
}
/**
@@ -257,11 +250,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_stateManager.attainState(State.ACTIVE);
}
- protected void setupStore(File storePath) throws DatabaseException, AMQStoreException
+ protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
_environment = createEnvironment(storePath);
- new Upgrader(_environment).upgradeIfNecessary();
+ new Upgrader(_environment, name).upgradeIfNecessary();
openDatabases();
}
@@ -326,10 +319,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
//This is required if we are wanting read only access.
dbConfig.setReadOnly(false);
+ _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
_messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
- _queueDb = openDatabase(QUEUEDB_NAME, dbConfig);
- _exchangeDb = openDatabase(EXCHANGEDB_NAME, dbConfig);
- _queueBindingsDb = openDatabase(QUEUEBINDINGSDB_NAME, dbConfig);
_messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
_deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
_linkDb = openDatabase(LINKDB_NAME, dbConfig);
@@ -376,23 +367,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageContentDb.close();
}
- if (_exchangeDb != null)
- {
- LOGGER.info("Closing exchange database");
- _exchangeDb.close();
- }
-
- if (_queueBindingsDb != null)
- {
- LOGGER.info("Closing bindings database");
- _queueBindingsDb.close();
- }
-
- if (_queueDb != null)
- {
- LOGGER.info("Closing queue database");
- _queueDb.close();
- }
+ if (_configuredObjectsDb != null)
+ {
+ LOGGER.info("Closing configurable objects database");
+ _configuredObjectsDb.close();
+ }
if (_deliveryDb != null)
{
@@ -440,14 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
try
{
+ List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- loadQueues(qrh);
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
- loadExchanges(erh);
+ _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
BindingRecoveryHandler brh = erh.completeExchangeRecovery();
- recoverBindings(brh);
+ _configuredObjectHelper.recoverBindings(brh, configuredObjects);
ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
@@ -459,29 +439,21 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
- private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
+ private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException
{
Cursor cursor = null;
-
+ List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
try
{
- cursor = _queueDb.openCursor(null, null);
+ cursor = _configuredObjectsDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- QueueBinding binding = QueueBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- QueueRecord queueRecord = binding.entryToObject(value);
-
- String queueName = queueRecord.getNameShortString() == null ? null :
- queueRecord.getNameShortString().asString();
- String owner = queueRecord.getOwner() == null ? null :
- queueRecord.getOwner().asString();
- boolean exclusive = queueRecord.isExclusive();
-
- FieldTable arguments = queueRecord.getArguments();
-
- qrh.queue(queueName, owner, exclusive, arguments);
+ ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value);
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+ configuredObject.setId(id);
+ results.add(configuredObject);
}
}
@@ -489,6 +461,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
closeCursorSafely(cursor);
}
+ return results;
}
private void closeCursorSafely(Cursor cursor)
@@ -499,74 +472,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
-
- private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _exchangeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- ExchangeBinding binding = ExchangeBinding.getInstance();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- ExchangeRecord exchangeRec = binding.entryToObject(value);
-
- String exchangeName = exchangeRec.getNameShortString() == null ? null :
- exchangeRec.getNameShortString().asString();
- String type = exchangeRec.getType() == null ? null :
- exchangeRec.getType().asString();
- boolean autoDelete = exchangeRec.isAutoDelete();
-
- erh.exchange(exchangeName, type, autoDelete);
- }
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
- private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
- {
- Cursor cursor = null;
- try
- {
- cursor = _queueBindingsDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- QueueBindingTupleBinding binding = QueueBindingTupleBinding.getInstance();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- //yes, this is retrieving all the useful information from the key only.
- //For table compatibility it shall currently be left as is
- BindingRecord bindingRecord = binding.entryToObject(key);
-
- String exchangeName = bindingRecord.getExchangeName() == null ? null :
- bindingRecord.getExchangeName().asString();
- String queueName = bindingRecord.getQueueName() == null ? null :
- bindingRecord.getQueueName().asString();
- String routingKey = bindingRecord.getRoutingKey() == null ? null :
- bindingRecord.getRoutingKey().asString();
- ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
- java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
-
- brh.binding(exchangeName, queueName, routingKey, argumentsBB);
- }
- }
- finally
- {
- closeCursorSafely(cursor);
- }
-
- }
-
-
private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
{
Cursor cursor = null;
@@ -681,7 +586,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
DatabaseEntry value = new DatabaseEntry();
-
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
QueueEntryKey qek = keyBinding.entryToObject(key);
@@ -700,10 +604,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
for(QueueEntryKey entry : entries)
{
- AMQShortString queueName = entry.getQueueName();
+ UUID queueId = entry.getQueueId();
long messageId = entry.getMessageId();
-
- qerh.queueEntry(queueName.asString(),messageId);
+ qerh.queueEntry(queueId, messageId);
}
}
catch (DatabaseException e)
@@ -892,25 +795,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (_stateManager.isInState(State.ACTIVE))
{
- ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
- exchange.getTypeShortString(), exchange.isAutoDelete());
-
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(exchange.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- ExchangeBinding exchangeBinding = ExchangeBinding.getInstance();
- exchangeBinding.objectToEntry(exchangeRec, value);
-
- try
- {
- _exchangeDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e);
- }
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
+ storeConfiguredObjectEntry(configuredObject);
}
}
@@ -919,82 +805,47 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void removeExchange(Exchange exchange) throws AMQStoreException
{
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(exchange.getNameShortString(), key);
- try
+ UUID id = exchange.getId();
+ if (LOGGER.isDebugEnabled())
{
- OperationStatus status = _exchangeDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " not found");
- }
+ LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called");
}
- catch (DatabaseException e)
+ OperationStatus status = removeConfiguredObject(id);
+ if (status == OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e);
+ throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found");
}
}
/**
- * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
+ * @see DurableConfigurationStore#bindQueue(Binding)
*/
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- bindQueue(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args));
- }
-
- protected void bindQueue(final BindingRecord bindingRecord) throws AMQStoreException
+ public void bindQueue(Binding binding) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- DatabaseEntry key = new DatabaseEntry();
- QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
-
- keyBinding.objectToEntry(bindingRecord, key);
-
- //yes, this is writing out 0 as a value and putting all the
- //useful info into the key, don't ask me why. For table
- //compatibility it shall currently be left as is
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- _queueBindingsDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing binding for AMQQueue with name " + bindingRecord.getQueueName() + " to exchange "
- + bindingRecord.getExchangeName() + " to database: " + e.getMessage(), e);
- }
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+ storeConfiguredObjectEntry(configuredObject);
}
}
/**
- * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
+ * @see DurableConfigurationStore#unbindQueue(Binding)
*/
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ public void unbindQueue(Binding binding)
throws AMQStoreException
{
- DatabaseEntry key = new DatabaseEntry();
- QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
- keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
-
- try
+ UUID id = binding.getId();
+ if (LOGGER.isDebugEnabled())
{
- OperationStatus status = _queueBindingsDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " not found");
- }
+ LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called");
}
- catch (DatabaseException e)
+
+ OperationStatus status = removeConfiguredObject(id);
+ if (status == OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " from database: " + e.getMessage(), e);
+ throw new AMQStoreException("Binding " + binding + " not found");
}
}
@@ -1011,47 +862,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
- }
-
- QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
- queue.getOwner(), queue.isExclusive(), arguments);
-
- createQueue(queueRecord);
- }
-
- /**
- * Makes the specified queue persistent.
- *
- * Only intended for direct use during store upgrades.
- *
- * @param queueRecord Details of the queue to store.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- protected void createQueue(QueueRecord queueRecord) throws AMQStoreException
- {
if (_stateManager.isInState(State.ACTIVE))
{
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- QueueBinding queueBinding = QueueBinding.getInstance();
-
- queueBinding.objectToEntry(queueRecord, value);
- try
- {
- _queueDb.put(null, key, value);
- }
- catch (DatabaseException e)
+ if (LOGGER.isDebugEnabled())
{
- throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString()
- + " to database: " + e.getMessage(), e);
+ LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId()
+ + ", arguments=" + arguments + "): called");
}
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
+ storeConfiguredObjectEntry(configuredObject);
}
}
@@ -1059,8 +878,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
* Updates the specified queue in the persistent store, IF it is already present. If the queue
* is not present in the store, it will not be added.
*
- * NOTE: Currently only updates the exclusivity.
- *
* @param queue The queue to update the entry for.
* @throws AMQStoreException If the operation fails for any reason.
*/
@@ -1074,28 +891,30 @@ public abstract class AbstractBDBMessageStore implements MessageStore
try
{
DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(queue.getNameShortString(), key);
+ UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
+ keyBinding.objectToEntry(queue.getId(), key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
- QueueBinding queueBinding = QueueBinding.getInstance();
+ ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
- if(status == OperationStatus.SUCCESS)
+ OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
{
- //read the existing record and apply the new exclusivity setting
- QueueRecord queueRecord = queueBinding.entryToObject(value);
- queueRecord.setExclusive(queue.isExclusive());
-
- //write the updated entry to the store
- queueBinding.objectToEntry(queueRecord, newValue);
+ ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value);
+ ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord);
- _queueDb.put(null, key, newValue);
+ // write the updated entry to the store
+ configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
+ status = _configuredObjectsDb.put(null, key, newValue);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error updating queue details within the store: " + status);
+ }
}
- else if(status != OperationStatus.NOTFOUND)
+ else if (status != OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Error updating queue details within the store: " + status);
+ throw new AMQStoreException("Error finding queue details within the store: " + status);
}
}
catch (DatabaseException e)
@@ -1113,27 +932,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
- AMQShortString name = queue.getNameShortString();
-
+ UUID id = queue.getId();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+ LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called");
}
- DatabaseEntry key = new DatabaseEntry();
- AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
- keyBinding.objectToEntry(name, key);
- try
+ OperationStatus status = removeConfiguredObject(id);
+ if (status == OperationStatus.NOTFOUND)
{
- OperationStatus status = _queueDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue " + name + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e);
+ throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found");
}
}
@@ -1233,11 +1041,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws AMQStoreException
{
- AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey dd = new QueueEntryKey(name, messageId);
+ QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
@@ -1246,15 +1053,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]");
+ LOGGER.debug("Enqueuing message " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
+ + " [Transaction" + tx + "]");
}
_deliveryDb.put(tx, key, value);
}
catch (DatabaseException e)
{
LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
- + " to database", e);
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
+ + " to database", e);
}
}
@@ -1262,7 +1072,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
* Extracts a message from a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
- * @param queue The name queue to take the message from.
+ * @param queue The queue to take the message from.
* @param messageId The message to dequeue.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
@@ -1270,17 +1080,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws AMQStoreException
{
- AMQShortString name = new AMQShortString(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(name, messageId);
-
+ QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
+ UUID id = queue.getId();
keyBinding.objectToEntry(queueEntryKey, key);
-
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Dequeue message id " + messageId);
+ LOGGER.debug("Dequeue message id " + messageId + " from queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
}
try
@@ -1289,16 +1098,20 @@ public abstract class AbstractBDBMessageStore implements MessageStore
OperationStatus status = _deliveryDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
}
else if (status != OperationStatus.SUCCESS)
{
- throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name);
+ throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue"
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
}
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Removed message " + messageId + ", " + name + " from delivery db");
+ LOGGER.debug("Removed message " + messageId + " on queue "
+ + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
+ + " from delivery db");
}
}
@@ -1438,7 +1251,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*
* @return a list of message ids for messages enqueued for a particular queue
*/
- List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException
+ List<Long> getEnqueuedMessages(UUID queueId) throws AMQStoreException
{
Cursor cursor = null;
try
@@ -1447,7 +1260,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
DatabaseEntry key = new DatabaseEntry();
- QueueEntryKey dd = new QueueEntryKey(queueName, 0);
+ QueueEntryKey dd = new QueueEntryKey(queueId, 0);
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
keyBinding.objectToEntry(dd, key);
@@ -1459,7 +1272,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
dd = keyBinding.entryToObject(key);
- while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
+ while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
{
messageIds.add(dd.getMessageId());
@@ -1644,7 +1457,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
LOGGER.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
}
- Cursor cursor = null;
try
{
@@ -1706,24 +1518,59 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return _messageContentDb;
}
- Database getQueuesDb()
- {
- return _queueDb;
- }
-
Database getDeliveryDb()
{
return _deliveryDb;
}
- Database getExchangesDb()
+ /**
+ * Makes the specified configured object persistent.
+ *
+ * @param configuredObject Details of the configured object to store.
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws AMQStoreException
{
- return _exchangeDb;
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
+ keyBinding.objectToEntry(configuredObject.getId(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance();
+
+ queueBinding.objectToEntry(configuredObject, value);
+ try
+ {
+ OperationStatus status = _configuredObjectsDb.put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error writing configured object " + configuredObject + " to database: "
+ + status);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing configured object " + configuredObject
+ + " to database: " + e.getMessage(), e);
+ }
+ }
}
- Database getBindingsDb()
+ private OperationStatus removeConfiguredObject(UUID id) throws AMQStoreException
{
- return _queueBindingsDb;
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
+ uuidBinding.objectToEntry(id, key);
+ try
+ {
+ return _configuredObjectsDb.delete(null, key);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error deleting of configured object with id " + id + " from database", e);
+ }
}
protected abstract StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index e5aeed57cd..9f7eb4bfd9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -50,9 +50,9 @@ public class BDBMessageStore extends AbstractBDBMessageStore
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
@Override
- protected void setupStore(File storePath) throws DatabaseException, AMQStoreException
+ protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
- super.setupStore(storePath);
+ super.setupStore(storePath, name);
startCommitThread();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java
deleted file mode 100644
index b9d868f909..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.entry;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public class BindingRecord extends Object
-{
- private final AMQShortString _exchangeName;
- private final AMQShortString _queueName;
- private final AMQShortString _routingKey;
- private final FieldTable _arguments;
-
- public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
- {
- _exchangeName = exchangeName;
- _queueName = queueName;
- _routingKey = routingKey;
- _arguments = arguments;
- }
-
-
- public AMQShortString getExchangeName()
- {
- return _exchangeName;
- }
-
- public AMQShortString getQueueName()
- {
- return _queueName;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public FieldTable getArguments()
- {
- return _arguments;
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java
deleted file mode 100644
index 180893178d..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.entry;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public class ExchangeRecord extends Object
-{
- private final AMQShortString _exchangeName;
- private final AMQShortString _exchangeType;
- private final boolean _autoDelete;
-
- public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete)
- {
- _exchangeName = exchangeName;
- _exchangeType = exchangeType;
- _autoDelete = autoDelete;
- }
-
- public AMQShortString getNameShortString()
- {
- return _exchangeName;
- }
-
- public AMQShortString getType()
- {
- return _exchangeType;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java
index a716758da3..e7cf93ff7a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java
@@ -20,22 +20,22 @@
*/
package org.apache.qpid.server.store.berkeleydb.entry;
-import org.apache.qpid.framing.AMQShortString;
+import java.util.UUID;
public class QueueEntryKey
{
- private AMQShortString _queueName;
+ private UUID _queueId;
private long _messageId;
- public QueueEntryKey(AMQShortString queueName, long messageId)
+ public QueueEntryKey(UUID queueId, long messageId)
{
- _queueName = queueName;
+ _queueId = queueId;
_messageId = messageId;
}
- public AMQShortString getQueueName()
+ public UUID getQueueId()
{
- return _queueName;
+ return _queueId;
}
public long getMessageId()
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java
deleted file mode 100644
index 5ea82427dc..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.entry;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public class QueueRecord extends Object
-{
- private final AMQShortString _queueName;
- private final AMQShortString _owner;
- private final FieldTable _arguments;
- private boolean _exclusive;
-
- public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments)
- {
- _queueName = queueName;
- _owner = owner;
- _exclusive = exclusive;
- _arguments = arguments;
- }
-
- public AMQShortString getNameShortString()
- {
- return _queueName;
- }
-
- public AMQShortString getOwner()
- {
- return _owner;
- }
-
- public boolean isExclusive()
- {
- return _exclusive;
- }
-
- public void setExclusive(boolean exclusive)
- {
- _exclusive = exclusive;
- }
-
- public FieldTable getArguments()
- {
- return _arguments;
- }
-
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java
deleted file mode 100644
index b57ffb0169..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuple;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-public class AMQShortStringBinding extends TupleBinding<AMQShortString>
-{
- private static final AMQShortStringBinding INSTANCE = new AMQShortStringBinding();
-
- public static AMQShortStringBinding getInstance()
- {
- return INSTANCE;
- }
-
- /** private constructor forces getInstance instead */
- private AMQShortStringBinding() { }
-
- public AMQShortString entryToObject(TupleInput tupleInput)
- {
- return AMQShortStringEncoding.readShortString(tupleInput);
- }
-
- public void objectToEntry(AMQShortString object, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(object, tupleOutput);
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
new file mode 100644
index 0000000000..8b84a4c9bb
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
@@ -0,0 +1,37 @@
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord>
+{
+ private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding();
+
+ public static ConfiguredObjectBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** non-public constructor forces getInstance instead */
+ private ConfiguredObjectBinding()
+ {
+ }
+
+ public ConfiguredObjectRecord entryToObject(TupleInput tupleInput)
+ {
+ String type = tupleInput.readString();
+ String json = tupleInput.readString();
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json);
+ return configuredObject;
+ }
+
+ public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput)
+ {
+ tupleOutput.writeString(object.getType());
+ tupleOutput.writeString(object.getAttributes());
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java
deleted file mode 100644
index d4b1475ac7..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuple;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-public class ExchangeBinding extends TupleBinding<ExchangeRecord>
-{
- private static final ExchangeBinding INSTANCE = new ExchangeBinding();
-
- public static ExchangeBinding getInstance()
- {
- return INSTANCE;
- }
-
- /** private constructor forces getInstance instead */
- private ExchangeBinding() { }
-
- public ExchangeRecord entryToObject(TupleInput tupleInput)
- {
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
-
- boolean autoDelete = tupleInput.readBoolean();
-
- return new ExchangeRecord(name, typeName, autoDelete);
- }
-
- public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
-
- tupleOutput.writeBoolean(exchange.isAutoDelete());
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 33bf269880..09f2c50e2d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.util.UUID;
+
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
@@ -47,7 +49,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
Transaction.Record[] records = new Transaction.Record[input.readInt()];
for(int i = 0; i < records.length; i++)
{
- records[i] = new RecordImpl(input.readString(), input.readLong());
+ records[i] = new RecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
}
return records;
}
@@ -71,7 +73,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
output.writeInt(records.length);
for(Transaction.Record record : records)
{
- output.writeString(record.getQueue().getResourceName());
+ UUID id = record.getQueue().getId();
+ output.writeLong(id.getMostSignificantBits());
+ output.writeLong(id.getLeastSignificantBits());
output.writeLong(record.getMessage().getMessageNumber());
}
}
@@ -80,13 +84,13 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
{
- private final String _queueName;
private long _messageNumber;
+ private UUID _queueId;
- public RecordImpl(String queueName, long messageNumber)
+ public RecordImpl(UUID queueId, long messageNumber)
{
- _queueName = queueName;
_messageNumber = messageNumber;
+ _queueId = queueId;
}
public TransactionLogResource getQueue()
@@ -114,9 +118,10 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
throw new UnsupportedOperationException();
}
- public String getResourceName()
+ @Override
+ public UUID getId()
{
- return _queueName;
+ return _queueId;
}
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java
deleted file mode 100644
index 7e1c63cc28..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuple;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord;
-
-public class QueueBinding extends TupleBinding<QueueRecord>
-{
- private static final Logger _logger = Logger.getLogger(QueueBinding.class);
-
- private static final QueueBinding INSTANCE = new QueueBinding();
-
- public static QueueBinding getInstance()
- {
- return INSTANCE;
- }
-
- /** private constructor forces getInstance instead */
- private QueueBinding() { }
-
- public QueueRecord entryToObject(TupleInput tupleInput)
- {
- try
- {
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- // Addition for Version 2 of this table, read the queue arguments
- FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
- // Addition for Version 3 of this table, read the queue exclusivity
- boolean exclusive = tupleInput.readBoolean();
-
- return new QueueRecord(name, owner, exclusive, arguments);
- }
- catch (DatabaseException e)
- {
- _logger.error("Unable to create binding: " + e, e);
- return null;
- }
-
- }
-
- public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
- // Addition for Version 2 of this table, store the queue arguments
- FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
- // Addition for Version 3 of this table, store the queue exclusivity
- tupleOutput.writeBoolean(queue.isExclusive());
- }
-}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java
deleted file mode 100644
index 6ba929a541..0000000000
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuple;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-
-public class QueueBindingTupleBinding extends TupleBinding<BindingRecord>
-{
- protected static final Logger _log = Logger.getLogger(QueueBindingTupleBinding.class);
-
- private static final QueueBindingTupleBinding INSTANCE = new QueueBindingTupleBinding();
-
- public static QueueBindingTupleBinding getInstance()
- {
- return INSTANCE;
- }
-
- /** private constructor forces getInstance instead */
- private QueueBindingTupleBinding() { }
-
- public BindingRecord entryToObject(TupleInput tupleInput)
- {
- AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
-
- FieldTable arguments;
-
- // Addition for Version 2 of this table
- try
- {
- arguments = FieldTableEncoding.readFieldTable(tupleInput);
- }
- catch (DatabaseException e)
- {
- _log.error("Unable to create binding: " + e, e);
- return null;
- }
-
- return new BindingRecord(exchangeName, queueName, routingKey, arguments);
- }
-
- public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
-
- // Addition for Version 2 of this table
- FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
- }
-
-} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java
index f65df23706..22d0ede31f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.util.UUID;
+
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
public class QueueEntryBinding extends TupleBinding<QueueEntryKey>
@@ -43,15 +43,17 @@ public class QueueEntryBinding extends TupleBinding<QueueEntryKey>
public QueueEntryKey entryToObject(TupleInput tupleInput)
{
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ UUID queueId = new UUID(tupleInput.readLong(), tupleInput.readLong());
long messageId = tupleInput.readLong();
- return new QueueEntryKey(queueName, messageId);
+ return new QueueEntryKey(queueId, messageId);
}
public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
{
- AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput);
+ UUID uuid = mk.getQueueId();
+ tupleOutput.writeLong(uuid.getMostSignificantBits());
+ tupleOutput.writeLong(uuid.getLeastSignificantBits());
tupleOutput.writeLong(mk.getMessageId());
}
} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java
index c96c751694..43aa5aa2b4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java
@@ -31,39 +31,33 @@ import com.sleepycat.je.Transaction;
public abstract class AbstractStoreUpgrade implements StoreUpgrade
{
private static final Logger _logger = Logger.getLogger(AbstractStoreUpgrade.class);
- protected static final String[] USER_FRIENDLY_NAMES = new String[] { "Exchanges", "Queues", "Queue bindings",
- "Message deliveries", "Message metadata", "Message content", "Bridges", "Links", "Distributed transactions" };
- protected void reportFinished(Environment environment, String[] databaseNames, String[] userFriendlyNames)
+ protected void reportFinished(Environment environment, int version)
{
- if (_logger.isInfoEnabled())
+ _logger.info("Completed upgrade to version " + version);
+ if (_logger.isDebugEnabled())
{
- _logger.info("Upgraded:");
- List<String> databases = environment.getDatabaseNames();
- for (int i = 0; i < databaseNames.length; i++)
- {
- if (databases.contains(databaseNames[i]))
- {
- _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows in " + userFriendlyNames[i]);
- }
- }
+ _logger.debug("Upgraded:");
+ reportDatabaseRowCount(environment);
}
}
+ private void reportDatabaseRowCount(Environment environment)
+ {
+ List<String> databases = environment.getDatabaseNames();
+ for (String database : databases)
+ {
+ _logger.debug(" " + getRowCount(database, environment) + " rows in " + database);
+ }
+ }
- protected void reportStarting(Environment environment, String[] databaseNames, String[] userFriendlyNames)
+ protected void reportStarting(Environment environment, int version)
{
- if (_logger.isInfoEnabled())
+ _logger.info("Starting store upgrade from version " + version);
+ if (_logger.isDebugEnabled())
{
- _logger.info("Upgrading:");
- List<String> databases = environment.getDatabaseNames();
- for (int i = 0; i < databaseNames.length; i++)
- {
- if (databases.contains(databaseNames[i]))
- {
- _logger.info(" " + getRowCount(databaseNames[i], environment) + " rows from " + userFriendlyNames[i]);
- }
- }
+ _logger.debug("Upgrading:");
+ reportDatabaseRowCount(environment);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java
index 42a3173e21..925e40ea93 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java
@@ -48,7 +48,7 @@ public abstract class CursorOperation implements DatabaseRunnable
CursorOperation.this.processEntry(database, targetDatabase, transaction, key, value);
if (getProcessedCount() % 1000 == 0)
{
- _logger.info("Processed " + getProcessedCount() + " messages of " + getRowCount() + ".");
+ _logger.info("Processed " + getProcessedCount() + " records of " + getRowCount() + ".");
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
index df09726c8e..f73e2e5d78 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
@@ -26,6 +26,6 @@ import org.apache.qpid.AMQStoreException;
public interface StoreUpgrade
{
- void performUpgrade(Environment environment, UpgradeInteractionHandler handler)
+ void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName)
throws DatabaseException, AMQStoreException;
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
index d3e4dfce12..49e5e700c4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
@@ -71,24 +71,16 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade
private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v4";
private static final String NEW_CONTENT_DB_NAME = "messageContentDb_v5";
- private static final String[] OLD_DATABASE_NAMES = new String[] { EXCHANGE_DB_NAME, OLD_QUEUE_DB_NAME,
- OLD_BINDINGS_DB_NAME, OLD_DELIVERY_DB, OLD_METADATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v4", "links_v4",
- "xids_v4" };
- private static final String[] NEW_DATABASE_NAMES = new String[] { "exchangeDb_v5", NEW_QUEUE_DB_NAME,
- NEW_BINDINGS_DB_NAME, NEW_DELIVERY_DB, NEW_METADATA_DB_NAME, NEW_CONTENT_DB_NAME, "bridges_v5", "links_v5",
- "xids_v5" };
-
private static final byte COLON = (byte) ':';
private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class);
- public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException
+ public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) throws DatabaseException, AMQStoreException
{
- _logger.info("Starting store upgrade from version 4");
Transaction transaction = null;
try
{
- reportStarting(environment, OLD_DATABASE_NAMES, USER_FRIENDLY_NAMES);
+ reportStarting(environment, 4);
transaction = environment.beginTransaction(null, null);
@@ -103,7 +95,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade
renameRemainingDatabases(environment, handler, transaction);
transaction.commit();
- reportFinished(environment, NEW_DATABASE_NAMES, USER_FRIENDLY_NAMES);
+ reportFinished(environment, 5);
}
catch (Exception e)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
index 54f6aa6f88..3265fb6823 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
@@ -24,13 +24,30 @@ import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteraction
import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.NO;
import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeInteractionResponse.YES;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.util.MapJsonSerializer;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.bind.tuple.TupleBinding;
@@ -51,26 +68,71 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
private static final Logger _logger = Logger.getLogger(UpgradeFrom5To6.class);
- private static final String OLD_CONTENT_DB_NAME = "messageContentDb_v5";
- private static final String NEW_CONTENT_DB_NAME = "MESSAGE_CONTENT";
- private static final String META_DATA_DB_NAME = "messageMetaDataDb_v5";
+ static final String OLD_CONTENT_DB_NAME = "messageContentDb_v5";
+ static final String NEW_CONTENT_DB_NAME = "MESSAGE_CONTENT";
+ static final String NEW_METADATA_DB_NAME = "MESSAGE_METADATA";
+ static final String OLD_META_DATA_DB_NAME = "messageMetaDataDb_v5";
+ static final String OLD_EXCHANGE_DB_NAME = "exchangeDb_v5";
+ static final String OLD_QUEUE_DB_NAME = "queueDb_v5";
+ static final String OLD_DELIVERY_DB_NAME = "deliveryDb_v5";
+ static final String OLD_QUEUE_BINDINGS_DB_NAME = "queueBindingsDb_v5";
+ static final String OLD_XID_DB_NAME = "xids_v5";
+ static final String NEW_XID_DB_NAME = "XIDS";
+ static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
+ static final String NEW_DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+ static final String NEW_BRIDGES_DB_NAME = "BRIDGES";
+ static final String NEW_LINKS_DB_NAME = "LINKS";
+ static final String OLD_BRIDGES_DB_NAME = "bridges_v5";
+ static final String OLD_LINKS_DB_NAME = "links_v5";
- private static final String NEW_DB_NAMES[] = { "EXCHANGES", "QUEUES", "QUEUE_BINDINGS", "DELIVERIES",
- "MESSAGE_METADATA", NEW_CONTENT_DB_NAME, "BRIDGES", "LINKS", "XIDS" };
- private static final String OLD_DB_NAMES[] = { "exchangeDb_v5", "queueDb_v5", "queueBindingsDb_v5", "deliveryDb_v5",
- META_DATA_DB_NAME, OLD_CONTENT_DB_NAME, "bridges_v5", "links_v5", "xids_v5" };
+ static final String[] DEFAULT_EXCHANGES = { ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(),
+ ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), ExchangeDefaults.FANOUT_EXCHANGE_NAME.asString(),
+ ExchangeDefaults.HEADERS_EXCHANGE_NAME.asString(), ExchangeDefaults.TOPIC_EXCHANGE_NAME.asString(),
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString() };
+ private static final Set<String> DEFAULT_EXCHANGES_SET = new HashSet<String>(Arrays.asList(DEFAULT_EXCHANGES));
- public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler) throws DatabaseException, AMQStoreException
+ private MapJsonSerializer _serializer = new MapJsonSerializer();
+
+ /**
+ * Upgrades from a v5 database to a v6 database
+ *
+ * v6 is the first "new style" schema where we don't version every table,
+ * and the upgrade is re-runnable
+ *
+ * Change in this version:
+ *
+ * Message content is moved from the database messageContentDb_v5 to
+ * MESSAGE_CONTENT. The structure of the database changes from ( message-id:
+ * long, chunk-id: int ) -> ( size: int, byte[] data ) to ( message-id:
+ * long) -> ( byte[] data )
+ *
+ * That is we keep only one record per message, which contains all the
+ * message content
+ *
+ * Queue, Exchange, Bindings entries are stored now as configurable objects
+ * in "CONFIGURED_OBJECTS" table.
+ */
+ public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName)
+ throws DatabaseException, AMQStoreException
+ {
+ reportStarting(environment, 5);
+ upgradeMessages(environment, handler);
+ upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHostName);
+ renameDatabases(environment, null);
+ reportFinished(environment, 6);
+ }
+
+ private void upgradeConfiguredObjectsAndDependencies(Environment environment, UpgradeInteractionHandler handler, String virtualHostName)
+ throws AMQStoreException
{
- _logger.info("Starting store upgrade from version 5");
Transaction transaction = null;
try
{
- reportStarting(environment, OLD_DB_NAMES, USER_FRIENDLY_NAMES);
transaction = environment.beginTransaction(null, null);
- performUpgradeInternal(environment, handler, transaction);
+ upgradeConfiguredObjects(environment, handler, transaction, virtualHostName);
+ upgradeQueueEntries(environment, transaction, virtualHostName);
+ upgradeXidEntries(environment, transaction, virtualHostName);
transaction.commit();
- reportFinished(environment, NEW_DB_NAMES, USER_FRIENDLY_NAMES);
}
catch (Exception e)
{
@@ -90,22 +152,53 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
}
}
- /**
- * Upgrades from a v5 database to a v6 database
- *
- * v6 is the first "new style" schema where we don't version every table, and the upgrade is re-runnable
- *
- * Change in this version:
- *
- * Message content is moved from the database messageContentDb_v5 to MESSAGE_CONTENT.
- * The structure of the database changes from
- * ( message-id: long, chunk-id: int ) -> ( size: int, byte[] data )
- * to
- * ( message-id: long) -> ( byte[] data )
- *
- * That is we keep only one record per message, which contains all the message content
- */
- public void performUpgradeInternal(final Environment environment, final UpgradeInteractionHandler handler,
+ private void upgradeMessages(final Environment environment, final UpgradeInteractionHandler handler)
+ throws AMQStoreException
+ {
+ Transaction transaction = null;
+ try
+ {
+ transaction = environment.beginTransaction(null, null);
+ upgradeMessages(environment, handler, transaction);
+ transaction.commit();
+ }
+ catch (Exception e)
+ {
+ transaction.abort();
+ if (e instanceof DatabaseException)
+ {
+ throw (DatabaseException) e;
+ }
+ else if (e instanceof AMQStoreException)
+ {
+ throw (AMQStoreException) e;
+ }
+ else
+ {
+ throw new AMQStoreException("Unexpected exception", e);
+ }
+ }
+ }
+
+ private void renameDatabases(Environment environment, Transaction transaction)
+ {
+ List<String> databases = environment.getDatabaseNames();
+ String[] oldDatabases = { OLD_META_DATA_DB_NAME, OLD_BRIDGES_DB_NAME, OLD_LINKS_DB_NAME };
+ String[] newDatabases = { NEW_METADATA_DB_NAME, NEW_BRIDGES_DB_NAME, NEW_LINKS_DB_NAME };
+
+ for (int i = 0; i < oldDatabases.length; i++)
+ {
+ String oldName = oldDatabases[i];
+ String newName = newDatabases[i];
+ if (databases.contains(oldName))
+ {
+ _logger.info("Renaming " + oldName + " into " + newName);
+ environment.renameDatabase(transaction, oldName, newName);
+ }
+ }
+ }
+
+ private void upgradeMessages(final Environment environment, final UpgradeInteractionHandler handler,
final Transaction transaction) throws AMQStoreException
{
_logger.info("Message Contents");
@@ -129,27 +222,13 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
metadataDatabase);
}
};
- new DatabaseTemplate(environment, META_DATA_DB_NAME, contentTransaction).run(metaDataDatabaseOperation);
+ new DatabaseTemplate(environment, OLD_META_DATA_DB_NAME, contentTransaction)
+ .run(metaDataDatabaseOperation);
_logger.info(metaDataDatabaseOperation.getRowCount() + " Message Content Entries");
}
};
new DatabaseTemplate(environment, OLD_CONTENT_DB_NAME, NEW_CONTENT_DB_NAME, transaction).run(contentOperation);
- }
- renameDatabases(environment, transaction);
- }
-
- private void renameDatabases(Environment environment, Transaction transaction)
- {
- List<String> databases = environment.getDatabaseNames();
- for (int i = 0; i < OLD_DB_NAMES.length; i++)
- {
- String oldName = OLD_DB_NAMES[i];
- String newName = NEW_DB_NAMES[i];
- if (databases.contains(oldName))
- {
- _logger.info("Renaming " + oldName + " into " + newName);
- environment.renameDatabase(transaction, oldName, newName);
- }
+ environment.removeDatabase(transaction, OLD_CONTENT_DB_NAME);
}
}
@@ -221,7 +300,7 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
DatabaseEntry value = new DatabaseEntry();
dataBinding.objectToEntry(consolidatedData, value);
- newDatabase.put(txn, key, value);
+ put(newDatabase, txn, key, value);
}
/**
@@ -268,6 +347,264 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
return data;
}
+ private void upgradeConfiguredObjects(Environment environment, UpgradeInteractionHandler handler, Transaction transaction, String virtualHostName)
+ throws AMQStoreException
+ {
+ upgradeQueues(environment, transaction, virtualHostName);
+ upgradeExchanges(environment, transaction, virtualHostName);
+ upgradeQueueBindings(environment, transaction, handler, virtualHostName);
+ }
+
+ private void upgradeXidEntries(Environment environment, Transaction transaction, final String virtualHostName)
+ {
+ if (environment.getDatabaseNames().contains(OLD_XID_DB_NAME))
+ {
+ _logger.info("Xid Records");
+ final OldPreparedTransactionBinding oldTransactionBinding = new OldPreparedTransactionBinding();
+ final NewPreparedTransactionBinding newTransactionBinding = new NewPreparedTransactionBinding();
+ CursorOperation xidEntriesCursor = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database oldXidDatabase, Database newXidDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ OldPreparedTransaction oldPreparedTransaction = oldTransactionBinding.entryToObject(value);
+ OldRecordImpl[] oldDequeues = oldPreparedTransaction.getDequeues();
+ OldRecordImpl[] oldEnqueues = oldPreparedTransaction.getEnqueues();
+
+ NewRecordImpl[] newEnqueues = null;
+ NewRecordImpl[] newDequeues = null;
+ if (oldDequeues != null)
+ {
+ newDequeues = new NewRecordImpl[oldDequeues.length];
+ for (int i = 0; i < newDequeues.length; i++)
+ {
+ OldRecordImpl dequeue = oldDequeues[i];
+ UUID id = UUIDGenerator.generateUUID(dequeue.getQueueName(), virtualHostName);
+ newDequeues[i] = new NewRecordImpl(id, dequeue.getMessageNumber());
+ }
+ }
+ if (oldEnqueues != null)
+ {
+ newEnqueues = new NewRecordImpl[oldEnqueues.length];
+ for (int i = 0; i < newEnqueues.length; i++)
+ {
+ OldRecordImpl enqueue = oldEnqueues[i];
+ UUID id = UUIDGenerator.generateUUID(enqueue.getQueueName(), virtualHostName);
+ newEnqueues[i] = new NewRecordImpl(id, enqueue.getMessageNumber());
+ }
+ }
+ NewPreparedTransaction newPreparedTransaction = new NewPreparedTransaction(newEnqueues, newDequeues);
+ DatabaseEntry newValue = new DatabaseEntry();
+ newTransactionBinding.objectToEntry(newPreparedTransaction, newValue);
+ put(newXidDatabase, transaction, key, newValue);
+ }
+ };
+ new DatabaseTemplate(environment, OLD_XID_DB_NAME, NEW_XID_DB_NAME, transaction).run(xidEntriesCursor);
+ environment.removeDatabase(transaction, OLD_XID_DB_NAME);
+ _logger.info(xidEntriesCursor.getRowCount() + " Xid Entries");
+ }
+ }
+
+ private void upgradeQueueEntries(Environment environment, Transaction transaction, final String virtualHostName)
+ {
+ _logger.info("Queue Delivery Records");
+ if (environment.getDatabaseNames().contains(OLD_DELIVERY_DB_NAME))
+ {
+ final OldQueueEntryBinding oldBinding = new OldQueueEntryBinding();
+ final NewQueueEntryBinding newBinding = new NewQueueEntryBinding();
+ CursorOperation queueEntriesCursor = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database oldDeliveryDatabase, Database newDeliveryDatabase,
+ Transaction transaction, DatabaseEntry key, DatabaseEntry value)
+ {
+ OldQueueEntryKey oldEntryRecord = oldBinding.entryToObject(key);
+ UUID queueId = UUIDGenerator.generateUUID(oldEntryRecord.getQueueName().asString(), virtualHostName);
+
+ NewQueueEntryKey newEntryRecord = new NewQueueEntryKey(queueId, oldEntryRecord.getMessageId());
+ DatabaseEntry newKey = new DatabaseEntry();
+ newBinding.objectToEntry(newEntryRecord, newKey);
+ put(newDeliveryDatabase, transaction, newKey, value);
+ }
+ };
+ new DatabaseTemplate(environment, OLD_DELIVERY_DB_NAME, NEW_DELIVERY_DB_NAME, transaction)
+ .run(queueEntriesCursor);
+ environment.removeDatabase(transaction, OLD_DELIVERY_DB_NAME);
+ _logger.info(queueEntriesCursor.getRowCount() + " Queue Delivery Record Entries");
+ }
+ }
+
+ private void upgradeQueueBindings(Environment environment, Transaction transaction, final UpgradeInteractionHandler handler, final String virtualHostName)
+ {
+ _logger.info("Queue Bindings");
+ if (environment.getDatabaseNames().contains(OLD_QUEUE_BINDINGS_DB_NAME))
+ {
+ final QueueBindingBinding binding = new QueueBindingBinding();
+ CursorOperation bindingCursor = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database exchangeDatabase, Database configuredObjectsDatabase,
+ Transaction transaction, DatabaseEntry key, DatabaseEntry value)
+ {
+ // TODO: check and remove orphaned bindings
+ BindingRecord bindingRecord = binding.entryToObject(key);
+ String exchangeName = bindingRecord.getExchangeName() == null ? ExchangeDefaults.DEFAULT_EXCHANGE_NAME
+ .asString() : bindingRecord.getExchangeName().asString();
+ String queueName = bindingRecord.getQueueName().asString();
+ String routingKey = bindingRecord.getRoutingKey().asString();
+ FieldTable arguments = bindingRecord.getArguments();
+
+ UUID bindingId = UUIDGenerator.generateUUID();
+ UpgradeConfiguredObjectRecord configuredObject = createBindingConfiguredObjectRecord(exchangeName, queueName,
+ routingKey, arguments, virtualHostName);
+ storeConfiguredObjectEntry(configuredObjectsDatabase, bindingId, configuredObject, transaction);
+ }
+
+ };
+ new DatabaseTemplate(environment, OLD_QUEUE_BINDINGS_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction)
+ .run(bindingCursor);
+ environment.removeDatabase(transaction, OLD_QUEUE_BINDINGS_DB_NAME);
+ _logger.info(bindingCursor.getRowCount() + " Queue Binding Entries");
+ }
+ }
+
+ private List<String> upgradeExchanges(Environment environment, Transaction transaction, final String virtualHostName)
+ {
+ final List<String> exchangeNames = new ArrayList<String>();
+ _logger.info("Exchanges");
+ if (environment.getDatabaseNames().contains(OLD_EXCHANGE_DB_NAME))
+ {
+ final ExchangeBinding exchangeBinding = new ExchangeBinding();
+ CursorOperation exchangeCursor = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database exchangeDatabase, Database configuredObjectsDatabase,
+ Transaction transaction, DatabaseEntry key, DatabaseEntry value)
+ {
+ ExchangeRecord exchangeRecord = exchangeBinding.entryToObject(value);
+ String exchangeName = exchangeRecord.getNameShortString().asString();
+ if (!DEFAULT_EXCHANGES_SET.contains(exchangeName))
+ {
+ String exchangeType = exchangeRecord.getType().asString();
+ boolean autoDelete = exchangeRecord.isAutoDelete();
+
+ UUID exchangeId = UUIDGenerator.generateUUID(exchangeName, virtualHostName);
+
+ UpgradeConfiguredObjectRecord configuredObject = createExchangeConfiguredObjectRecord(exchangeName,
+ exchangeType, autoDelete);
+ storeConfiguredObjectEntry(configuredObjectsDatabase, exchangeId, configuredObject, transaction);
+ exchangeNames.add(exchangeName);
+ }
+ }
+ };
+ new DatabaseTemplate(environment, OLD_EXCHANGE_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction)
+ .run(exchangeCursor);
+ environment.removeDatabase(transaction, OLD_EXCHANGE_DB_NAME);
+ _logger.info(exchangeCursor.getRowCount() + " Exchange Entries");
+ }
+ return exchangeNames;
+ }
+
+ private List<String> upgradeQueues(Environment environment, Transaction transaction, final String virtualHostName)
+ {
+ final List<String> queueNames = new ArrayList<String>();
+ _logger.info("Queues");
+ if (environment.getDatabaseNames().contains(OLD_QUEUE_DB_NAME))
+ {
+ final UpgradeQueueBinding queueBinding = new UpgradeQueueBinding();
+ CursorOperation queueCursor = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database queueDatabase, Database configuredObjectsDatabase,
+ Transaction transaction, DatabaseEntry key, DatabaseEntry value)
+ {
+ OldQueueRecord queueRecord = queueBinding.entryToObject(value);
+ String queueName = queueRecord.getNameShortString().asString();
+ queueNames.add(queueName);
+ String owner = queueRecord.getOwner() == null ? null : queueRecord.getOwner().asString();
+ boolean exclusive = queueRecord.isExclusive();
+ FieldTable arguments = queueRecord.getArguments();
+
+ UUID queueId = UUIDGenerator.generateUUID(queueName, virtualHostName);
+ UpgradeConfiguredObjectRecord configuredObject = createQueueConfiguredObjectRecord(queueName, owner, exclusive,
+ arguments);
+ storeConfiguredObjectEntry(configuredObjectsDatabase, queueId, configuredObject, transaction);
+ }
+ };
+ new DatabaseTemplate(environment, OLD_QUEUE_DB_NAME, CONFIGURED_OBJECTS_DB_NAME, transaction).run(queueCursor);
+ environment.removeDatabase(transaction, OLD_QUEUE_DB_NAME);
+ _logger.info(queueCursor.getRowCount() + " Queue Entries");
+ }
+ return queueNames;
+ }
+
+ private void storeConfiguredObjectEntry(Database configuredObjectsDatabase, UUID id,
+ UpgradeConfiguredObjectRecord configuredObject, Transaction transaction)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding();
+ uuidBinding.objectToEntry(id, key);
+ ConfiguredObjectBinding configuredBinding = new ConfiguredObjectBinding();
+ configuredBinding.objectToEntry(configuredObject, value);
+ put(configuredObjectsDatabase, transaction, key, value);
+ }
+
+ private UpgradeConfiguredObjectRecord createQueueConfiguredObjectRecord(String queueName, String owner, boolean exclusive,
+ FieldTable arguments)
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Queue.NAME, queueName);
+ attributesMap.put(Queue.OWNER, owner);
+ attributesMap.put(Queue.EXCLUSIVE, exclusive);
+ if (arguments != null)
+ {
+ attributesMap.put("ARGUMENTS", FieldTable.convertToMap(arguments));
+ }
+ String json = _serializer.serialize(attributesMap);
+ UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Queue.class.getName(), json);
+ return configuredObject;
+ }
+
+ private UpgradeConfiguredObjectRecord createExchangeConfiguredObjectRecord(String exchangeName, String exchangeType,
+ boolean autoDelete)
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Exchange.NAME, exchangeName);
+ attributesMap.put(Exchange.TYPE, exchangeType);
+ attributesMap.put(Exchange.LIFETIME_POLICY, autoDelete ? LifetimePolicy.AUTO_DELETE.name()
+ : LifetimePolicy.PERMANENT.name());
+ String json = _serializer.serialize(attributesMap);
+ UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Exchange.class.getName(), json);
+ return configuredObject;
+ }
+
+ private UpgradeConfiguredObjectRecord createBindingConfiguredObjectRecord(String exchangeName, String queueName,
+ String routingKey, FieldTable arguments, String virtualHostName)
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Binding.NAME, routingKey);
+ attributesMap.put(Binding.EXCHANGE, UUIDGenerator.generateUUID(exchangeName, virtualHostName));
+ attributesMap.put(Binding.QUEUE, UUIDGenerator.generateUUID(queueName, virtualHostName));
+ if (arguments != null)
+ {
+ attributesMap.put(Binding.ARGUMENTS, FieldTable.convertToMap(arguments));
+ }
+ String json = _serializer.serialize(attributesMap);
+ UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(Binding.class.getName(), json);
+ return configuredObject;
+ }
+
+ private void put(final Database database, Transaction txn, DatabaseEntry key, DatabaseEntry value)
+ {
+ OperationStatus status = database.put(txn, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new RuntimeException("Cannot add record into " + database.getDatabaseName() + ":" + status);
+ }
+ }
+
static final class CompoundKey
{
public final long _messageId;
@@ -367,4 +704,504 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade
}
}
+ static class OldQueueRecord extends Object
+ {
+ private final AMQShortString _queueName;
+ private final AMQShortString _owner;
+ private final FieldTable _arguments;
+ private boolean _exclusive;
+
+ public OldQueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments)
+ {
+ _queueName = queueName;
+ _owner = owner;
+ _exclusive = exclusive;
+ _arguments = arguments;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getOwner()
+ {
+ return _owner;
+ }
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ }
+
+ static class UpgradeConfiguredObjectRecord
+ {
+ private String _attributes;
+ private String _type;
+
+ public UpgradeConfiguredObjectRecord(String type, String attributes)
+ {
+ super();
+ _attributes = attributes;
+ _type = type;
+ }
+
+ public String getAttributes()
+ {
+ return _attributes;
+ }
+
+ public String getType()
+ {
+ return _type;
+ }
+
+ }
+
+ static class UpgradeQueueBinding extends TupleBinding<OldQueueRecord>
+ {
+ public OldQueueRecord entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+ boolean exclusive = tupleInput.readBoolean();
+ return new OldQueueRecord(name, owner, exclusive, arguments);
+ }
+
+ public void objectToEntry(OldQueueRecord queue, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+ tupleOutput.writeBoolean(queue.isExclusive());
+ }
+ }
+
+ static class UpgradeUUIDBinding extends TupleBinding<UUID>
+ {
+ public UUID entryToObject(final TupleInput tupleInput)
+ {
+ return new UUID(tupleInput.readLong(), tupleInput.readLong());
+ }
+
+ public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput)
+ {
+ tupleOutput.writeLong(uuid.getMostSignificantBits());
+ tupleOutput.writeLong(uuid.getLeastSignificantBits());
+ }
+ }
+
+ static class ConfiguredObjectBinding extends TupleBinding<UpgradeConfiguredObjectRecord>
+ {
+
+ public UpgradeConfiguredObjectRecord entryToObject(TupleInput tupleInput)
+ {
+ String type = tupleInput.readString();
+ String json = tupleInput.readString();
+ UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(type, json);
+ return configuredObject;
+ }
+
+ public void objectToEntry(UpgradeConfiguredObjectRecord object, TupleOutput tupleOutput)
+ {
+ tupleOutput.writeString(object.getType());
+ tupleOutput.writeString(object.getAttributes());
+ }
+
+ }
+
+ static class ExchangeRecord extends Object
+ {
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _exchangeType;
+ private final boolean _autoDelete;
+
+ public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete)
+ {
+ _exchangeName = exchangeName;
+ _exchangeType = exchangeType;
+ _autoDelete = autoDelete;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getType()
+ {
+ return _exchangeType;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+ }
+
+ static class ExchangeBinding extends TupleBinding<ExchangeRecord>
+ {
+
+ public ExchangeRecord entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
+
+ boolean autoDelete = tupleInput.readBoolean();
+
+ return new ExchangeRecord(name, typeName, autoDelete);
+ }
+
+ public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
+
+ tupleOutput.writeBoolean(exchange.isAutoDelete());
+ }
+ }
+
+ static class BindingRecord extends Object
+ {
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _queueName;
+ private final AMQShortString _routingKey;
+ private final FieldTable _arguments;
+
+ public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey,
+ FieldTable arguments)
+ {
+ _exchangeName = exchangeName;
+ _queueName = queueName;
+ _routingKey = routingKey;
+ _arguments = arguments;
+ }
+
+ public AMQShortString getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ }
+
+ static class QueueBindingBinding extends TupleBinding<BindingRecord>
+ {
+
+ public BindingRecord entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+ return new BindingRecord(exchangeName, queueName, routingKey, arguments);
+ }
+
+ public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+
+ FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
+ }
+ }
+
+ static class OldQueueEntryKey
+ {
+ private AMQShortString _queueName;
+ private long _messageId;
+
+ public OldQueueEntryKey(AMQShortString queueName, long messageId)
+ {
+ _queueName = queueName;
+ _messageId = messageId;
+ }
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+ }
+
+ static class OldQueueEntryBinding extends TupleBinding<OldQueueEntryKey>
+ {
+
+ public OldQueueEntryKey entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ long messageId = tupleInput.readLong();
+
+ return new OldQueueEntryKey(queueName, messageId);
+ }
+
+ public void objectToEntry(OldQueueEntryKey mk, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString(mk.getQueueName(), tupleOutput);
+ tupleOutput.writeLong(mk.getMessageId());
+ }
+ }
+
+ static class NewQueueEntryKey
+ {
+ private UUID _queueId;
+ private long _messageId;
+
+ public NewQueueEntryKey(UUID queueId, long messageId)
+ {
+ _queueId = queueId;
+ _messageId = messageId;
+ }
+
+ public UUID getQueueId()
+ {
+ return _queueId;
+ }
+
+ public long getMessageId()
+ {
+ return _messageId;
+ }
+ }
+
+ static class NewQueueEntryBinding extends TupleBinding<NewQueueEntryKey>
+ {
+
+ public NewQueueEntryKey entryToObject(TupleInput tupleInput)
+ {
+ UUID queueId = new UUID(tupleInput.readLong(), tupleInput.readLong());
+ long messageId = tupleInput.readLong();
+
+ return new NewQueueEntryKey(queueId, messageId);
+ }
+
+ public void objectToEntry(NewQueueEntryKey mk, TupleOutput tupleOutput)
+ {
+ UUID uuid = mk.getQueueId();
+ tupleOutput.writeLong(uuid.getMostSignificantBits());
+ tupleOutput.writeLong(uuid.getLeastSignificantBits());
+ tupleOutput.writeLong(mk.getMessageId());
+ }
+ }
+
+ static class NewPreparedTransaction
+ {
+ private final NewRecordImpl[] _enqueues;
+ private final NewRecordImpl[] _dequeues;
+
+ public NewPreparedTransaction(NewRecordImpl[] enqueues, NewRecordImpl[] dequeues)
+ {
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public NewRecordImpl[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public NewRecordImpl[] getDequeues()
+ {
+ return _dequeues;
+ }
+ }
+
+ static class NewRecordImpl
+ {
+
+ private long _messageNumber;
+ private UUID _queueId;
+
+ public NewRecordImpl(UUID queueId, long messageNumber)
+ {
+ _messageNumber = messageNumber;
+ _queueId = queueId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public UUID getId()
+ {
+ return _queueId;
+ }
+ }
+
+ static class NewPreparedTransactionBinding extends TupleBinding<NewPreparedTransaction>
+ {
+ @Override
+ public NewPreparedTransaction entryToObject(TupleInput input)
+ {
+ NewRecordImpl[] enqueues = readRecords(input);
+
+ NewRecordImpl[] dequeues = readRecords(input);
+
+ return new NewPreparedTransaction(enqueues, dequeues);
+ }
+
+ private NewRecordImpl[] readRecords(TupleInput input)
+ {
+ NewRecordImpl[] records = new NewRecordImpl[input.readInt()];
+ for (int i = 0; i < records.length; i++)
+ {
+ records[i] = new NewRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
+ }
+ return records;
+ }
+
+ @Override
+ public void objectToEntry(NewPreparedTransaction preparedTransaction, TupleOutput output)
+ {
+ writeRecords(preparedTransaction.getEnqueues(), output);
+ writeRecords(preparedTransaction.getDequeues(), output);
+ }
+
+ private void writeRecords(NewRecordImpl[] records, TupleOutput output)
+ {
+ if (records == null)
+ {
+ output.writeInt(0);
+ }
+ else
+ {
+ output.writeInt(records.length);
+ for (NewRecordImpl record : records)
+ {
+ UUID id = record.getId();
+ output.writeLong(id.getMostSignificantBits());
+ output.writeLong(id.getLeastSignificantBits());
+ output.writeLong(record.getMessageNumber());
+ }
+ }
+ }
+ }
+
+ static class OldRecordImpl
+ {
+
+ private long _messageNumber;
+ private String _queueName;
+
+ public OldRecordImpl(String queueName, long messageNumber)
+ {
+ _messageNumber = messageNumber;
+ _queueName = queueName;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public String getQueueName()
+ {
+ return _queueName;
+ }
+ }
+
+ static class OldPreparedTransaction
+ {
+ private final OldRecordImpl[] _enqueues;
+ private final OldRecordImpl[] _dequeues;
+
+ public OldPreparedTransaction(OldRecordImpl[] enqueues, OldRecordImpl[] dequeues)
+ {
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public OldRecordImpl[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public OldRecordImpl[] getDequeues()
+ {
+ return _dequeues;
+ }
+ }
+
+ static class OldPreparedTransactionBinding extends TupleBinding<OldPreparedTransaction>
+ {
+ @Override
+ public OldPreparedTransaction entryToObject(TupleInput input)
+ {
+ OldRecordImpl[] enqueues = readRecords(input);
+
+ OldRecordImpl[] dequeues = readRecords(input);
+
+ return new OldPreparedTransaction(enqueues, dequeues);
+ }
+
+ private OldRecordImpl[] readRecords(TupleInput input)
+ {
+ OldRecordImpl[] records = new OldRecordImpl[input.readInt()];
+ for (int i = 0; i < records.length; i++)
+ {
+ records[i] = new OldRecordImpl(input.readString(), input.readLong());
+ }
+ return records;
+ }
+
+ @Override
+ public void objectToEntry(OldPreparedTransaction preparedTransaction, TupleOutput output)
+ {
+ writeRecords(preparedTransaction.getEnqueues(), output);
+ writeRecords(preparedTransaction.getDequeues(), output);
+ }
+
+ private void writeRecords(OldRecordImpl[] records, TupleOutput output)
+ {
+ if (records == null)
+ {
+ output.writeInt(0);
+ }
+ else
+ {
+ output.writeInt(records.length);
+ for (OldRecordImpl record : records)
+ {
+ output.writeString(record.getQueueName());
+ output.writeLong(record.getMessageNumber());
+ }
+ }
+ }
+ }
} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index 77455c2ea1..e71e39cbb8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -38,13 +38,15 @@ import com.sleepycat.je.OperationStatus;
public class Upgrader
{
- static final String VERSION_DB_NAME = "VERSION";
+ static final String VERSION_DB_NAME = "DB_VERSION";
private Environment _environment;
+ private String _virtualHostName;
- public Upgrader(Environment environment)
+ public Upgrader(Environment environment, String virtualHostName)
{
_environment = environment;
+ _virtualHostName = virtualHostName;
}
public void upgradeIfNecessary() throws AMQStoreException
@@ -125,7 +127,7 @@ public class Upgrader
+ "UpgradeFrom"+fromVersion+"To"+toVersion);
Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor();
StoreUpgrade upgrade = ctr.newInstance();
- upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+ upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHostName);
}
catch (ClassNotFoundException e)
{
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
new file mode 100644
index 0000000000..687c671566
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.DurableConfigurationStoreTest;
+import org.apache.qpid.server.store.MessageStore;
+
+public class BDBMessageStoreConfigurationTest extends DurableConfigurationStoreTest
+{
+ @Override
+ protected MessageStore createStore() throws Exception
+ {
+ return new BDBMessageStore();
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 18e20f2ee8..a318187f13 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -36,6 +37,7 @@ import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -405,13 +407,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
BDBMessageStore bdbStore = assertBDBStore(log);
- final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+ final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
{
- public String getResourceName()
+ @Override
+ public UUID getId()
{
- return mockQueueName.asString();
+ return mockQueueId;
}
};
@@ -421,7 +423,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
txn.enqueueMessage(mockQueue, new MockMessage(5L));
txn.commitTran();
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
@@ -443,13 +445,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
BDBMessageStore bdbStore = assertBDBStore(log);
- final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+ final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
{
- public String getResourceName()
+ @Override
+ public UUID getId()
{
- return mockQueueName.asString();
+ return mockQueueId;
}
};
@@ -463,7 +465,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
txn.enqueueMessage(mockQueue, new MockMessage(23L));
txn.commitTran();
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
@@ -484,13 +486,13 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
BDBMessageStore bdbStore = assertBDBStore(log);
- final AMQShortString mockQueueName = new AMQShortString("queueName");
-
+ final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
{
- public String getResourceName()
+ @Override
+ public UUID getId()
{
- return mockQueueName.asString();
+ return mockQueueId;
}
};
@@ -507,7 +509,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
txn.enqueueMessage(mockQueue, new MockMessage(32L));
txn.commitTran();
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
new file mode 100644
index 0000000000..f8aeb7f7b0
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class ConfiguredObjectBindingTest extends TestCase
+{
+
+ private ConfiguredObjectRecord _object;
+
+ private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes";
+ private static final String DUMMY_TYPE_STRING = "dummyType";
+ private ConfiguredObjectBinding _configuredObjectBinding;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _configuredObjectBinding = ConfiguredObjectBinding.getInstance();
+ _object = new ConfiguredObjectRecord(UUIDGenerator.generateUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING);
+ }
+
+ public void testObjectToEntryAndEntryToObject()
+ {
+ TupleOutput tupleOutput = new TupleOutput();
+
+ _configuredObjectBinding.objectToEntry(_object, tupleOutput);
+
+ byte[] entryAsBytes = tupleOutput.getBufferBytes();
+ TupleInput tupleInput = new TupleInput(entryAsBytes);
+
+ ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput);
+ assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes());
+ assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType());
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
index 6df2f8a8db..36991b90d0 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
@@ -22,10 +22,9 @@ package org.apache.qpid.server.store.berkeleydb.upgrade;
import java.io.File;
-import junit.framework.TestCase;
-
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.subjects.TestBlankSubject;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
import com.sleepycat.je.Database;
@@ -33,7 +32,7 @@ import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Transaction;
-public abstract class AbstractUpgradeTestCase extends TestCase
+public abstract class AbstractUpgradeTestCase extends QpidTestCase
{
protected static final class StaticAnswerHandler implements UpgradeInteractionHandler
{
@@ -57,7 +56,6 @@ public abstract class AbstractUpgradeTestCase extends TestCase
public static int[] QUEUE_SIZES = { 1, 1, 10, 3 };
public static int TOTAL_MESSAGE_NUMBER = 15;
protected static final LogSubject LOG_SUBJECT = new TestBlankSubject();
- protected static final String TMP_FOLDER = System.getProperty("java.io.tmpdir");
// one binding per exchange
protected static final int TOTAL_BINDINGS = QUEUE_NAMES.length * 2;
@@ -148,4 +146,8 @@ public abstract class AbstractUpgradeTestCase extends TestCase
return count.longValue();
}
+ public String getVirtualHostName()
+ {
+ return getName();
+ }
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
index c9103b1997..3f9e4e4aa1 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
@@ -66,7 +66,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception
{
UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES));
+ upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName());
assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES)));
@@ -93,7 +93,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase
public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception
{
UpgradeFrom4To5 upgrade = new UpgradeFrom4To5();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO));
+ upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName());
assertQueues(new HashSet<String>(Arrays.asList(DURABLE_SUBSCRIPTION_QUEUE, DURABLE_SUBSCRIPTION_QUEUE_WITH_SELECTOR, DURABLE_QUEUE)));
assertDatabaseRecordCount(DELIVERY_DB_NAME, 12);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
index d73a777ca6..5297692820 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
@@ -20,14 +20,50 @@
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CONFIGURED_OBJECTS_DB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_CONTENT_DB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_DELIVERY_DB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_METADATA_DB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NEW_XID_DB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_CONTENT_DB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OLD_XID_DB_NAME;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.ConfiguredObjectBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeConfiguredObjectRecord;
import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewDataBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewPreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewQueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.NewRecordImpl;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldPreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.OldRecordImpl;
+import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.UpgradeUUIDBinding;
+import org.apache.qpid.server.util.MapJsonSerializer;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.LockMode;
import com.sleepycat.je.Transaction;
public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
@@ -43,10 +79,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
public void testPerformUpgrade() throws Exception
{
UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER);
+ upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName());
assertDatabaseRecordCounts();
assertContent();
+
+ assertConfiguredObjects();
+ assertQueueEntries();
}
public void testPerformUpgradeWithMissingMessageChunkKeepsIncompleteMessage() throws Exception
@@ -54,9 +93,12 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
corruptDatabase();
UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
- upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES));
+ upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName());
assertDatabaseRecordCounts();
+
+ assertConfiguredObjects();
+ assertQueueEntries();
}
public void testPerformUpgradeWithMissingMessageChunkDiscardsIncompleteMessage() throws Exception
@@ -67,10 +109,117 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO);
- upgrade.performUpgrade(_environment, discardMessageInteractionHandler);
+ upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName());
+
+ assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 11);
+ assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 11);
+
+ assertConfiguredObjects();
+ assertQueueEntries();
+ }
+
+ public void testPerformXidUpgrade() throws Exception
+ {
+ File storeLocation = new File(TMP_FOLDER, getName());
+ storeLocation.mkdirs();
+ Environment environment = createEnvironment(storeLocation);
+ try
+ {
+ populateOldXidEntries(environment);
+ UpgradeFrom5To6 upgrade = new UpgradeFrom5To6();
+ upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName());
+ assertXidEntries(environment);
+ }
+ finally
+ {
+ try
+ {
+ environment.close();
+ }
+ finally
+ {
+ deleteDirectoryIfExists(storeLocation);
+ }
+
+ }
+ }
+
+ private void assertXidEntries(Environment environment)
+ {
+ final DatabaseEntry value = new DatabaseEntry();
+ final DatabaseEntry key = getXidKey();
+ new DatabaseTemplate(environment, NEW_XID_DB_NAME, null).run(new DatabaseRunnable()
+ {
+
+ @Override
+ public void run(Database xidDatabase, Database nullDatabase, Transaction transaction)
+ {
+ xidDatabase.get(null, key, value, LockMode.DEFAULT);
+ }
+ });
+ NewPreparedTransactionBinding newBinding = new NewPreparedTransactionBinding();
+ NewPreparedTransaction newTransaction = newBinding.entryToObject(value);
+ NewRecordImpl[] newEnqueues = newTransaction.getEnqueues();
+ NewRecordImpl[] newDequeues = newTransaction.getDequeues();
+ assertEquals("Unxpected new enqueus number", 1, newEnqueues.length);
+ NewRecordImpl enqueue = newEnqueues[0];
+ assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST1", getVirtualHostName()), enqueue.getId());
+ assertEquals("Unxpected message id", 1, enqueue.getMessageNumber());
+ assertEquals("Unxpected new dequeues number", 1, newDequeues.length);
+ NewRecordImpl dequeue = newDequeues[0];
+ assertEquals("Unxpected queue id", UUIDGenerator.generateUUID("TEST2", getVirtualHostName()), dequeue.getId());
+ assertEquals("Unxpected message id", 2, dequeue.getMessageNumber());
+ }
+
+ private void populateOldXidEntries(Environment environment)
+ {
- assertDatabaseRecordCount("MESSAGE_METADATA", 11);
- assertDatabaseRecordCount("MESSAGE_CONTENT", 11);
+ final DatabaseEntry value = new DatabaseEntry();
+ OldRecordImpl[] enqueues = { new OldRecordImpl("TEST1", 1) };
+ OldRecordImpl[] dequeues = { new OldRecordImpl("TEST2", 2) };
+ OldPreparedTransaction oldPreparedTransaction = new OldPreparedTransaction(enqueues, dequeues);
+ OldPreparedTransactionBinding oldPreparedTransactionBinding = new OldPreparedTransactionBinding();
+ oldPreparedTransactionBinding.objectToEntry(oldPreparedTransaction, value);
+
+ final DatabaseEntry key = getXidKey();
+ new DatabaseTemplate(environment, OLD_XID_DB_NAME, null).run(new DatabaseRunnable()
+ {
+
+ @Override
+ public void run(Database xidDatabase, Database nullDatabase, Transaction transaction)
+ {
+ xidDatabase.put(null, key, value);
+ }
+ });
+ }
+
+ protected DatabaseEntry getXidKey()
+ {
+ final DatabaseEntry value = new DatabaseEntry();
+ byte[] globalId = { 1 };
+ byte[] branchId = { 2 };
+ Xid xid = new Xid(1l, globalId, branchId);
+ XidBinding xidBinding = XidBinding.getInstance();
+ xidBinding.objectToEntry(xid, value);
+ return value;
+ }
+
+ private void assertQueueEntries()
+ {
+ final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
+ final NewQueueEntryBinding newBinding = new NewQueueEntryBinding();
+ CursorOperation cursorOperation = new CursorOperation()
+ {
+
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ NewQueueEntryKey newEntryRecord = newBinding.entryToObject(key);
+ assertTrue("Unexpected queue id", configuredObjects.containsKey(newEntryRecord.getQueueId()));
+ }
+ };
+ new DatabaseTemplate(_environment, NEW_DELIVERY_DB_NAME, null).run(cursorOperation);
}
/**
@@ -105,19 +254,124 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
};
Transaction transaction = _environment.beginTransaction(null, null);
- new DatabaseTemplate(_environment, "messageContentDb_v5", transaction).run(cursorOperation);
+ new DatabaseTemplate(_environment, OLD_CONTENT_DB_NAME, transaction).run(cursorOperation);
transaction.commit();
}
private void assertDatabaseRecordCounts()
{
- assertDatabaseRecordCount("EXCHANGES", 5);
- assertDatabaseRecordCount("QUEUES", 3);
- assertDatabaseRecordCount("QUEUE_BINDINGS", 6);
- assertDatabaseRecordCount("DELIVERIES", 12);
+ assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 9);
+ assertDatabaseRecordCount(NEW_DELIVERY_DB_NAME, 12);
- assertDatabaseRecordCount("MESSAGE_METADATA", 12);
- assertDatabaseRecordCount("MESSAGE_CONTENT", 12);
+ assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12);
+ assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12);
+ }
+
+ private void assertConfiguredObjects()
+ {
+ Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
+ assertEquals("Unexpected number of configured objects", 9, configuredObjects.size());
+
+ Set<Map<String, Object>> expected = new HashSet<Map<String, Object>>(9);
+ Map<String, Object> queue1 = new HashMap<String, Object>();
+ queue1.put("exclusive", Boolean.FALSE);
+ queue1.put("name", "myUpgradeQueue");
+ queue1.put("owner", null);
+ expected.add(queue1);
+ Map<String, Object> queue2 = new HashMap<String, Object>();
+ queue2.put("exclusive", Boolean.TRUE);
+ queue2.put("name", "clientid:mySelectorDurSubName");
+ queue2.put("owner", "clientid");
+ expected.add(queue2);
+ Map<String, Object> queue3 = new HashMap<String, Object>();
+ queue3.put("exclusive", Boolean.TRUE);
+ queue3.put("name", "clientid:myDurSubName");
+ queue3.put("owner", "clientid");
+ expected.add(queue3);
+
+ Map<String, Object> queueBinding1 = new HashMap<String, Object>();
+ queueBinding1.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString());
+ queueBinding1.put("name", "myUpgradeQueue");
+ queueBinding1.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString());
+ expected.add(queueBinding1);
+ Map<String, Object> queueBinding2 = new HashMap<String, Object>();
+ queueBinding2.put("queue", UUIDGenerator.generateUUID("myUpgradeQueue", getVirtualHostName()).toString());
+ queueBinding2.put("name", "myUpgradeQueue");
+ queueBinding2.put("exchange", UUIDGenerator.generateUUID("amq.direct", getVirtualHostName()).toString());
+ Map<String, Object> arguments2 = new HashMap<String, Object>();
+ arguments2.put("x-filter-jms-selector", "");
+ queueBinding2.put("arguments", arguments2);
+ expected.add(queueBinding2);
+ Map<String, Object> queueBinding3 = new HashMap<String, Object>();
+ queueBinding3.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString());
+ queueBinding3.put("name", "myUpgradeTopic");
+ queueBinding3.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString());
+ Map<String, Object> arguments3 = new HashMap<String, Object>();
+ arguments3.put("x-filter-jms-selector", "");
+ queueBinding3.put("arguments", arguments3);
+ expected.add(queueBinding3);
+ Map<String, Object> queueBinding4 = new HashMap<String, Object>();
+ queueBinding4.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString());
+ queueBinding4.put("name", "mySelectorUpgradeTopic");
+ queueBinding4.put("exchange", UUIDGenerator.generateUUID("amq.topic", getVirtualHostName()).toString());
+ Map<String, Object> arguments4 = new HashMap<String, Object>();
+ arguments4.put("x-filter-jms-selector", "testprop='true'");
+ queueBinding4.put("arguments", arguments4);
+ expected.add(queueBinding4);
+ Map<String, Object> queueBinding5 = new HashMap<String, Object>();
+ queueBinding5.put("queue", UUIDGenerator.generateUUID("clientid:myDurSubName", getVirtualHostName()).toString());
+ queueBinding5.put("name", "clientid:myDurSubName");
+ queueBinding5.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString());
+ expected.add(queueBinding5);
+ Map<String, Object> queueBinding6 = new HashMap<String, Object>();
+ queueBinding6.put("queue", UUIDGenerator.generateUUID("clientid:mySelectorDurSubName", getVirtualHostName()).toString());
+ queueBinding6.put("name", "clientid:mySelectorDurSubName");
+ queueBinding6.put("exchange", UUIDGenerator.generateUUID("<<default>>", getVirtualHostName()).toString());
+ expected.add(queueBinding6);
+
+ Set<String> expectedTypes = new HashSet<String>();
+ expectedTypes.add(Queue.class.getName());
+ expectedTypes.add(Exchange.class.getName());
+ expectedTypes.add(Binding.class.getName());
+ MapJsonSerializer jsonSerializer = new MapJsonSerializer();
+ for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet())
+ {
+ UpgradeConfiguredObjectRecord object = entry.getValue();
+ UUID key = entry.getKey();
+ Map<String, Object> deserialized = jsonSerializer.deserialize(object.getAttributes());
+ assertTrue("Unexpected entry:" + object.getAttributes(), expected.remove(deserialized));
+ String type = object.getType();
+ assertTrue("Unexpected type:" + type, expectedTypes.contains(type));
+ if (type.equals(Exchange.class.getName()) || type.equals(Queue.class.getName()))
+ {
+ assertEquals("Unexpected key", key, UUIDGenerator.generateUUID(((String) deserialized.get("name")), getVirtualHostName()));
+ }
+ else
+ {
+ assertNotNull("Key cannot be null", key);
+ }
+ }
+ assertTrue("Not all expected configured objects found:" + expected, expected.isEmpty());
+ }
+
+ private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects()
+ {
+ final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>();
+ final ConfiguredObjectBinding binding = new ConfiguredObjectBinding();
+ final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding();
+ CursorOperation configuredObjectsCursor = new CursorOperation()
+ {
+ @Override
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
+ {
+ UUID id = uuidBinding.entryToObject(key);
+ UpgradeConfiguredObjectRecord object = binding.entryToObject(value);
+ configuredObjectsRecords.put(id, object);
+ }
+ };
+ new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor);
+ return configuredObjectsRecords;
}
private void assertContent()
@@ -127,8 +381,8 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
{
@Override
- public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, DatabaseEntry key,
- DatabaseEntry value)
+ public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction,
+ DatabaseEntry key, DatabaseEntry value)
{
long id = LongBinding.entryToLong(key);
assertTrue("Unexpected id", id > 0);
@@ -136,6 +390,6 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase
assertNotNull("Unexpected content", content);
}
};
- new DatabaseTemplate(_environment, "MESSAGE_CONTENT", null).run(contentCursorOperation);
+ new DatabaseTemplate(_environment, NEW_CONTENT_DB_NAME, null).run(contentCursorOperation);
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
index 99c4b7ab5b..ba5ca842bf 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
@@ -50,7 +50,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
public void setUp() throws Exception
{
super.setUp();
- _upgrader = new Upgrader(_environment);
+ _upgrader = new Upgrader(_environment, getVirtualHostName());
}
private int getStoreVersion()
@@ -105,12 +105,12 @@ public class UpgraderTest extends AbstractUpgradeTestCase
nonExistentStoreLocation.mkdir();
_environment = createEnvironment(nonExistentStoreLocation);
- _upgrader = new Upgrader(_environment);
+ _upgrader = new Upgrader(_environment, getVirtualHostName());
_upgrader.upgradeIfNecessary();
List<String> databaseNames = _environment.getDatabaseNames();
List<String> expectedDatabases = new ArrayList<String>();
- expectedDatabases.add("VERSION");
+ expectedDatabases.add(Upgrader.VERSION_DB_NAME);
assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());