diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-02 15:26:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-02 15:26:42 +0000 |
| commit | 706370f431afa3e812b94878fd9bc6a09a0920d5 (patch) | |
| tree | fc3ab91d71d6a12f56de333eee696ea9d902c735 /qpid/java/bdbstore | |
| parent | 296e94464d89b8d9affa0e73ba2d015a475dc88d (diff) | |
| download | qpid-python-706370f431afa3e812b94878fd9bc6a09a0920d5.tar.gz | |
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
3 files changed, 71 insertions, 138 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index d7c8102c0e..f6b7e1790f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -51,16 +51,10 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; @@ -150,8 +144,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private final EventManager _eventManager = new EventManager(); private String _storeLocation; - private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); - private Map<String, String> _envConfigMap; public AbstractBDBMessageStore() @@ -434,17 +426,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { try { - List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - ExchangeRecoveryHandler erh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverExchanges(erh, configuredObjects); - - QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - - BindingRecoveryHandler brh = qrh.completeQueueRecovery(); - _configuredObjectHelper.recoverBindings(brh, configuredObjects); + recoveryHandler.beginConfigurationRecovery(this); + loadConfiguredObjects(recoveryHandler); - brh.completeBindingRecovery(); + recoveryHandler.completeConfigurationRecovery(); } catch (DatabaseException e) { @@ -453,10 +438,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } - private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException + private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException { Cursor cursor = null; - List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>(); try { cursor = _configuredObjectsDb.openCursor(null, null); @@ -464,10 +448,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo DatabaseEntry value = new DatabaseEntry(); while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { - ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value); UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - configuredObject.setId(id); - results.add(configuredObject); + + ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); + crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); } } @@ -475,7 +459,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { closeCursorSafely(cursor); } - return results; } private void closeCursorSafely(Cursor cursor) @@ -743,111 +726,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } - /** - * @see DurableConfigurationStore#createExchange(Exchange) - */ - public void createExchange(Exchange exchange) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); - storeConfiguredObjectEntry(configuredObject); - } - } - - /** - * @see DurableConfigurationStore#removeExchange(Exchange) - */ - public void removeExchange(Exchange exchange) throws AMQStoreException - { - UUID id = exchange.getId(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called"); - } - OperationStatus status = removeConfiguredObject(id); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found"); - } - } - - - /** - * @see DurableConfigurationStore#bindQueue(Binding) - */ - public void bindQueue(Binding binding) throws AMQStoreException + @Override + public void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); storeConfiguredObjectEntry(configuredObject); } } - /** - * @see DurableConfigurationStore#unbindQueue(Binding) - */ - public void unbindQueue(Binding binding) - throws AMQStoreException + @Override + public void remove(UUID id, String type) throws AMQStoreException { - UUID id = binding.getId(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called"); + LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); } - OperationStatus status = removeConfiguredObject(id); if (status == OperationStatus.NOTFOUND) { - throw new AMQStoreException("Binding " + binding + " not found"); - } - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue) - */ - public void createQueue(AMQQueue queue) throws AMQStoreException - { - createQueue(queue, null); - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable) - */ - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - if (_stateManager.isInState(State.ACTIVE)) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId() - + ", arguments=" + arguments + "): called"); - } - ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); - storeConfiguredObjectEntry(configuredObject); + throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found"); } } - /** - * Updates the specified queue in the persistent store, IF it is already present. If the queue - * is not present in the store, it will not be added. - * - * @param queue The queue to update the entry for. - * @throws AMQStoreException If the operation fails for any reason. - */ - public void updateQueue(final AMQQueue queue) throws AMQStoreException + @Override + public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating queue: " + queue.getName()); + LOGGER.debug("Updating " +type + ", id: " + id); } try { DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(queue.getId(), key); + keyBinding.objectToEntry(id, key); DatabaseEntry value = new DatabaseEntry(); DatabaseEntry newValue = new DatabaseEntry(); @@ -856,8 +771,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { - ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value); - ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord); + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); // write the updated entry to the store configuredObjectBinding.objectToEntry(newQueueRecord, newValue); @@ -879,29 +793,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } /** - * Removes the specified queue from the persistent store. - * - * @param queue The queue to remove. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - UUID id = queue.getId(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called"); - } - - OperationStatus status = removeConfiguredObject(id); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found"); - } - } - - - /** * Places a message onto a specified queue, in a given transaction. * * @param tx The transaction for the operation. diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java index 945bcf1d28..31cafbe74d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java @@ -20,38 +20,75 @@ */ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.store.ConfiguredObjectRecord; import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord> { - private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(); + private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(null); + private final UUID _uuid; + public static ConfiguredObjectBinding getInstance() { return INSTANCE; } - /** non-public constructor forces getInstance instead */ - private ConfiguredObjectBinding() + public ConfiguredObjectBinding(UUID uuid) { + _uuid = uuid; } public ConfiguredObjectRecord entryToObject(TupleInput tupleInput) { String type = tupleInput.readString(); String json = tupleInput.readString(); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json); - return configuredObject; + ObjectMapper mapper = new ObjectMapper(); + try + { + Map<String,Object> value = mapper.readValue(json, Map.class); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value); + return configuredObject; + } + catch (IOException e) + { + //should never happen + throw new RuntimeException(e); + } + } public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput) { - tupleOutput.writeString(object.getType()); - tupleOutput.writeString(object.getAttributes()); + try + { + StringWriter writer = new StringWriter(); + new ObjectMapper().writeValue(writer, object.getAttributes()); + tupleOutput.writeString(object.getType()); + tupleOutput.writeString(writer.toString()); + } + catch (JsonMappingException e) + { + throw new RuntimeException(e); + } + catch (JsonGenerationException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java index 7f93f5691e..5a5d39081c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.store.berkeleydb.tuple; +import java.util.Collections; +import java.util.Map; import junit.framework.TestCase; import org.apache.qpid.server.model.UUIDGenerator; @@ -33,7 +35,9 @@ public class ConfiguredObjectBindingTest extends TestCase private ConfiguredObjectRecord _object; - private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes"; + private static final Map<String, Object> DUMMY_ATTRIBUTES_MAP = + Collections.singletonMap("dummy",(Object) "attributes"); + private static final String DUMMY_TYPE_STRING = "dummyType"; private ConfiguredObjectBinding _configuredObjectBinding; @@ -42,7 +46,8 @@ public class ConfiguredObjectBindingTest extends TestCase { super.setUp(); _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING); + _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, + DUMMY_ATTRIBUTES_MAP); } public void testObjectToEntryAndEntryToObject() @@ -55,7 +60,7 @@ public class ConfiguredObjectBindingTest extends TestCase TupleInput tupleInput = new TupleInput(entryAsBytes); ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput); - assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes()); + assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_MAP, storedObject.getAttributes()); assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType()); } } |
