summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-02 15:26:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-02 15:26:42 +0000
commit706370f431afa3e812b94878fd9bc6a09a0920d5 (patch)
treefc3ab91d71d6a12f56de333eee696ea9d902c735 /qpid/java/bdbstore
parent296e94464d89b8d9affa0e73ba2d015a475dc88d (diff)
downloadqpid-python-706370f431afa3e812b94878fd9bc6a09a0920d5.tar.gz
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java147
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java51
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java11
3 files changed, 71 insertions, 138 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index d7c8102c0e..f6b7e1790f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -51,16 +51,10 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
@@ -150,8 +144,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private final EventManager _eventManager = new EventManager();
private String _storeLocation;
- private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
private Map<String, String> _envConfigMap;
public AbstractBDBMessageStore()
@@ -434,17 +426,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
try
{
- List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
-
- QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
- BindingRecoveryHandler brh = qrh.completeQueueRecovery();
- _configuredObjectHelper.recoverBindings(brh, configuredObjects);
+ recoveryHandler.beginConfigurationRecovery(this);
+ loadConfiguredObjects(recoveryHandler);
- brh.completeBindingRecovery();
+ recoveryHandler.completeConfigurationRecovery();
}
catch (DatabaseException e)
{
@@ -453,10 +438,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
- private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException
+ private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
{
Cursor cursor = null;
- List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
try
{
cursor = _configuredObjectsDb.openCursor(null, null);
@@ -464,10 +448,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value);
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
- configuredObject.setId(id);
- results.add(configuredObject);
+
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
+ crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
}
}
@@ -475,7 +459,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
closeCursorSafely(cursor);
}
- return results;
}
private void closeCursorSafely(Cursor cursor)
@@ -743,111 +726,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
- /**
- * @see DurableConfigurationStore#createExchange(Exchange)
- */
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
- storeConfiguredObjectEntry(configuredObject);
- }
- }
-
- /**
- * @see DurableConfigurationStore#removeExchange(Exchange)
- */
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- UUID id = exchange.getId();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called");
- }
- OperationStatus status = removeConfiguredObject(id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found");
- }
- }
-
-
- /**
- * @see DurableConfigurationStore#bindQueue(Binding)
- */
- public void bindQueue(Binding binding) throws AMQStoreException
+ @Override
+ public void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
storeConfiguredObjectEntry(configuredObject);
}
}
- /**
- * @see DurableConfigurationStore#unbindQueue(Binding)
- */
- public void unbindQueue(Binding binding)
- throws AMQStoreException
+ @Override
+ public void remove(UUID id, String type) throws AMQStoreException
{
- UUID id = binding.getId();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called");
+ LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
}
-
OperationStatus status = removeConfiguredObject(id);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Binding " + binding + " not found");
- }
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue)
- */
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
- */
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- if (_stateManager.isInState(State.ACTIVE))
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId()
- + ", arguments=" + arguments + "): called");
- }
- ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
- storeConfiguredObjectEntry(configuredObject);
+ throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found");
}
}
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
+ @Override
+ public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Updating queue: " + queue.getName());
+ LOGGER.debug("Updating " +type + ", id: " + id);
}
try
{
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
- keyBinding.objectToEntry(queue.getId(), key);
+ keyBinding.objectToEntry(id, key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
@@ -856,8 +771,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value);
- ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord);
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
// write the updated entry to the store
configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
@@ -879,29 +793,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
/**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- UUID id = queue.getId();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called");
- }
-
- OperationStatus status = removeConfiguredObject(id);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found");
- }
- }
-
-
- /**
* Places a message onto a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
index 945bcf1d28..31cafbe74d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
@@ -20,38 +20,75 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord>
{
- private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding();
+ private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(null);
+ private final UUID _uuid;
+
public static ConfiguredObjectBinding getInstance()
{
return INSTANCE;
}
- /** non-public constructor forces getInstance instead */
- private ConfiguredObjectBinding()
+ public ConfiguredObjectBinding(UUID uuid)
{
+ _uuid = uuid;
}
public ConfiguredObjectRecord entryToObject(TupleInput tupleInput)
{
String type = tupleInput.readString();
String json = tupleInput.readString();
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json);
- return configuredObject;
+ ObjectMapper mapper = new ObjectMapper();
+ try
+ {
+ Map<String,Object> value = mapper.readValue(json, Map.class);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value);
+ return configuredObject;
+ }
+ catch (IOException e)
+ {
+ //should never happen
+ throw new RuntimeException(e);
+ }
+
}
public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput)
{
- tupleOutput.writeString(object.getType());
- tupleOutput.writeString(object.getAttributes());
+ try
+ {
+ StringWriter writer = new StringWriter();
+ new ObjectMapper().writeValue(writer, object.getAttributes());
+ tupleOutput.writeString(object.getType());
+ tupleOutput.writeString(writer.toString());
+ }
+ catch (JsonMappingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
index 7f93f5691e..5a5d39081c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.util.Collections;
+import java.util.Map;
import junit.framework.TestCase;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -33,7 +35,9 @@ public class ConfiguredObjectBindingTest extends TestCase
private ConfiguredObjectRecord _object;
- private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes";
+ private static final Map<String, Object> DUMMY_ATTRIBUTES_MAP =
+ Collections.singletonMap("dummy",(Object) "attributes");
+
private static final String DUMMY_TYPE_STRING = "dummyType";
private ConfiguredObjectBinding _configuredObjectBinding;
@@ -42,7 +46,8 @@ public class ConfiguredObjectBindingTest extends TestCase
{
super.setUp();
_configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING);
+ _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING,
+ DUMMY_ATTRIBUTES_MAP);
}
public void testObjectToEntryAndEntryToObject()
@@ -55,7 +60,7 @@ public class ConfiguredObjectBindingTest extends TestCase
TupleInput tupleInput = new TupleInput(entryAsBytes);
ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput);
- assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes());
+ assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_MAP, storedObject.getAttributes());
assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType());
}
}