diff options
Diffstat (limited to 'qpid/java')
6 files changed, 44 insertions, 38 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 8e81e9a7b0..3eeac71ebf 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -60,6 +60,7 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; @@ -413,16 +414,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore try { List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - QueueRecoveryHandler qrh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - - ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); + ExchangeRecoveryHandler erh = recoveryHandler.begin(this); _configuredObjectHelper.recoverExchanges(erh, configuredObjects); - BindingRecoveryHandler brh = erh.completeExchangeRecovery(); + QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); + + BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); + BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); } catch (DatabaseException e) diff --git a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index ad6777d0ea..8ae4fec975 100644 --- a/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -261,13 +261,18 @@ public class QueueManagementTest extends QpidBrokerTestCase */ public void testAlternateExchangeSurvivesRestart() throws Exception { + String nonMandatoryExchangeName = "exch" + getName(); + + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + managedBroker.createNewExchange(nonMandatoryExchangeName, "fanout", true); + String queueName1 = getTestQueueName() + "1"; String altExchange1 = "amq.fanout"; String addr1WithAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName1, altExchange1); Queue queue1 = _session.createQueue(addr1WithAltExch); String queueName2 = getTestQueueName() + "2"; - String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,}}", queueName2); + String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue}}", queueName2); Queue queue2 = _session.createQueue(addr2WithoutAltExch); createQueueOnBroker(queue1); @@ -279,7 +284,7 @@ public class QueueManagementTest extends QpidBrokerTestCase ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2); assertNull("Newly created queue2 does not have expected alternate exchange", managedQueue2.getAlternateExchange()); - String altExchange2 = "amq.fanout"; + String altExchange2 = nonMandatoryExchangeName; managedQueue2.setAlternateExchange(altExchange2); restartBroker(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index f1053f60ad..ede01d247e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -28,20 +28,21 @@ import java.util.UUID; public interface ConfigurationRecoveryHandler { - QueueRecoveryHandler begin(MessageStore store); + ExchangeRecoveryHandler begin(MessageStore store); - public static interface QueueRecoveryHandler + public static interface ExchangeRecoveryHandler { - void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId); - ExchangeRecoveryHandler completeQueueRecovery(); + void exchange(UUID id, String exchangeName, String type, boolean autoDelete); + QueueRecoveryHandler completeExchangeRecovery(); } - public static interface ExchangeRecoveryHandler + public static interface QueueRecoveryHandler { - void exchange(UUID id, String exchangeName, String type, boolean autoDelete); - BindingRecoveryHandler completeExchangeRecovery(); + void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId); + BindingRecoveryHandler completeQueueRecovery(); } + public static interface BindingRecoveryHandler { void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index ab374b4917..281522c0ef 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -60,6 +60,7 @@ import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectHelper; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.Event; @@ -78,6 +79,9 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; /** * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence @@ -558,16 +562,17 @@ public class DerbyMessageStore implements MessageStore try { List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); + ExchangeRecoveryHandler erh = recoveryHandler.begin(this); _configuredObjectHelper.recoverExchanges(erh, configuredObjects); - ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery(); + QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); + + BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); + BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); } catch (SQLException e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index acd6101ff8..ea2f0f15e4 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -76,15 +76,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private final VirtualHost _virtualHost; - private MessageStoreLogSubject _logSubject; - - private MessageStore _store; - private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); - private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); - + private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); + private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); + private MessageStoreLogSubject _logSubject; + private MessageStore _store; public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost) { @@ -131,12 +128,12 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e); } } - public ExchangeRecoveryHandler completeQueueRecovery() + @Override + public BindingRecoveryHandler completeQueueRecovery() { return this; } @@ -156,19 +153,17 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e); } } - public BindingRecoveryHandler completeExchangeRecovery() + public QueueRecoveryHandler completeExchangeRecovery() { return this; } public StoredMessageRecoveryHandler begin() { - // TODO - log begin return this; } @@ -193,7 +188,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void completeMessageRecovery() { - //TODO - log end } public BridgeRecoveryHandler brokerLink(final UUID id, diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java index 6dd1aa7dbf..9c8f525120 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -113,9 +113,9 @@ public class DurableConfigurationStoreTest extends QpidTestCase _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_queueRecoveryHandler); - when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_exchangeRecoveryHandler); - when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_bindingRecoveryHandler); + when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); + when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler); + when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler); when(_bindingRecoveryHandler.completeBindingRecovery()).thenReturn(_linkRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); |
