diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-04-18 16:03:11 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-04-18 16:03:11 +0000 |
| commit | 178745059c1265f8cae71f8b19caf448b580afb0 (patch) | |
| tree | 8f427b369bed4fa9646cfe2c52b23bcdd38f3230 /qpid/java | |
| parent | ef4adc559cc4b81a0c681807986c62fc0b9a13e4 (diff) | |
| download | qpid-python-178745059c1265f8cae71f8b19caf448b580afb0.tar.gz | |
QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Bindings
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588501 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
8 files changed, 145 insertions, 23 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java new file mode 100644 index 0000000000..4050f7675e --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.binding; + +import java.util.Map; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; + +public class BindingFactory extends AbstractConfiguredObjectTypeFactory<BindingImpl> +{ + public BindingFactory() + { + super(BindingImpl.class); + } + + @Override + protected BindingImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents) + { + ExchangeImpl<?> exchange = (ExchangeImpl<?>) getParent(Exchange.class, parents); + AMQQueue<?> queue = (AMQQueue<?>) getParent(Queue.class, parents); + BindingImpl binding = new BindingImpl(attributes, queue, exchange); + exchange.addBinding(binding); + return binding; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 634b250d31..a533c0bc75 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -81,7 +82,11 @@ public class BindingImpl public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange) { - super(parentsMap(queue,exchange),enhanceWithDurable(combineIdWithAttributes(id, attributes), queue, exchange),queue.getVirtualHost().getTaskExecutor()); + this(enhanceWithDurable(combineIdWithAttributes(id,attributes), queue, exchange), queue, exchange); + } + public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange) + { + super(parentsMap(queue,exchange),attributes,queue.getVirtualHost().getTaskExecutor()); _bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME); _queue = queue; _exchange = exchange; @@ -99,6 +104,17 @@ public class BindingImpl } + @Override + protected void onCreate() + { + super.onCreate(); + if (isDurable()) + { + DurableConfigurationStoreHelper.createBinding(_queue.getVirtualHost().getDurableConfigurationStore(), this); + } + + } + private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes, final AMQQueue queue, final ExchangeImpl exchange) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index b1dd6a3721..4f1643967e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -626,12 +626,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> true); } - @Override - public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, - final Map<String, Object> argumentMap) - { - makeBinding(id, bindingKey,queue, argumentMap,true, false); - } private void removeBinding(final BindingImpl binding) { @@ -713,18 +707,10 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> if (existingMapping == null) { BindingImpl b = new BindingImpl(id, attributes, queue, this); - b.addStateChangeListener(_bindingListener); - b.open(); + b.create(); - if (b.isDurable() && !restore) - { - DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b); - } - _bindingsMap.put(bindingIdentifier, b); - queue.addBinding(b); - childAdded(b); + addBinding(b); - doAddBinding(b); return true; } @@ -742,6 +728,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } } + @Override + public void addBinding(final BindingImpl b) + { + b.addStateChangeListener(_bindingListener); + + BindingIdentifier identifier = new BindingIdentifier(b.getName(), b.getAMQQueue()); + + _bindingsMap.put(identifier, b); + b.getAMQQueue().addBinding(b); + childAdded(b); + + doAddBinding(b); + } + protected abstract void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java index 57929b7306..38913762d8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java @@ -59,9 +59,6 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex AMQQueue queue, Map<String, Object> arguments); - void restoreBinding(UUID id, String bindingKey, AMQQueue queue, - Map<String, Object> argumentMap); - void delete(); /** @@ -114,6 +111,8 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex EventLogger getEventLogger(); + void addBinding(BindingImpl binding); + public interface BindingListener { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java index 57062cb7a2..54a23a0389 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java @@ -63,6 +63,10 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory { _defaultTypes.put(categoryName, annotation.defaultType()); } + else + { + _defaultTypes.put(categoryName, categoryName); + } } if(categoryFactories.put(factory.getType(),factory) != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index 6e399d950e..526760aea7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhost; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,11 +30,16 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; @@ -41,11 +47,14 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B { private static final Logger _logger = Logger.getLogger(BindingRecoverer.class); - private final VirtualHostImpl _virtualHost; + private final VirtualHostImpl<?,?,?> _virtualHost; + private final ConfiguredObjectFactory _objectFactory; - public BindingRecoverer(final VirtualHostImpl virtualHost) + public BindingRecoverer(final VirtualHostImpl<?,?,?> virtualHost) { _virtualHost = virtualHost; + Broker<?> broker = _virtualHost.getParent(Broker.class); + _objectFactory = broker.getObjectFactory(); } @Override @@ -67,6 +76,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B private final UUID _queueId; private final UUID _exchangeId; private final UUID _bindingId; + private final ConfiguredObjectRecord _record; private List<UnresolvedDependency> _unresolvedDependencies = new ArrayList<UnresolvedDependency>(); @@ -76,6 +86,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B public UnresolvedBinding(final ConfiguredObjectRecord record) { + _record = record; _bindingId = record.getId(); _exchangeId = record.getParents().get(Exchange.class.getSimpleName()).getId(); _queueId = record.getParents().get(Queue.class.getSimpleName()).getId(); @@ -90,6 +101,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B _unresolvedDependencies.add(new QueueDependency()); } + _bindingName = (String) record.getAttributes().get(org.apache.qpid.server.model.Binding.NAME); _bindingArgumentsMap = (Map<String, Object>) record.getAttributes().get(org.apache.qpid.server.model.Binding.ARGUMENTS); } @@ -108,7 +120,17 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B _logger.info("Restoring binding: (Exchange: " + _exchange.getName() + ", Queue: " + _queue.getName() + ", Routing Key: " + _bindingName + ", Arguments: " + _bindingArgumentsMap + ")"); - _exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap); + + Map<String,Object> attributesWithId = new HashMap<String,Object>(_record.getAttributes()); + attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,_record.getId()); + attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true); + + ConfiguredObjectTypeFactory<? extends Binding> configuredObjectTypeFactory = + _objectFactory.getConfiguredObjectTypeFactory(Binding.class, attributesWithId); + UnresolvedConfiguredObject<? extends Binding> unresolvedConfiguredObject = + configuredObjectTypeFactory.recover(_record, _exchange, _queue); + Binding binding = (Binding<?>) unresolvedConfiguredObject.resolve(); + } return (_exchange).getBinding(_bindingName, _queue); } diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory index cc000d49e6..3ec2b900e3 100644 --- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory +++ b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory @@ -44,6 +44,7 @@ org.apache.qpid.server.exchange.DirectExchangeFactory org.apache.qpid.server.exchange.FanoutExchangeFactory org.apache.qpid.server.exchange.HeadersExchangeFactory org.apache.qpid.server.exchange.TopicExchangeFactory +org.apache.qpid.server.binding.BindingFactory diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 2822476b1c..49b2a61965 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -84,6 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private ConfiguredObjectFactory _configuredObjectFactory; private ConfiguredObjectTypeFactory _exchangeFactory; private ConfiguredObjectTypeFactory _queueFactory; + private ConfiguredObjectTypeFactory _bindingFactory; @Override public void setUp() throws Exception @@ -92,6 +93,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _configuredObjectFactory = mock(ConfiguredObjectFactory.class); _exchangeFactory = mock(ConfiguredObjectTypeFactory.class); _queueFactory = mock(ConfiguredObjectTypeFactory.class); + _bindingFactory = mock(ConfiguredObjectTypeFactory.class); + AMQQueue<?> queue = mock(AMQQueue.class); @@ -109,6 +112,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory); when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Queue.class), anyMap())).thenReturn(_queueFactory); + when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Binding.class), anyMap())).thenReturn(_bindingFactory); + final ArgumentCaptor<ConfiguredObjectRecord> recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class); @@ -169,6 +174,33 @@ public class DurableConfigurationRecovererTest extends QpidTestCase }).when(_queueFactory).recover(recoveredQueue.capture(), any(ConfiguredObject.class)); + final ArgumentCaptor<ConfiguredObjectRecord> recoveredBinding = ArgumentCaptor.forClass(ConfiguredObjectRecord.class); + final ArgumentCaptor<ConfiguredObject> parent1 = ArgumentCaptor.forClass(ConfiguredObject.class); + final ArgumentCaptor<ConfiguredObject> parent2 = ArgumentCaptor.forClass(ConfiguredObject.class); + + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ConfiguredObjectRecord queueRecord = recoveredBinding.getValue(); + Binding binding = mock(Binding.class); + UUID id = queueRecord.getId(); + String name = (String) queueRecord.getAttributes().get("name"); + when(binding.getId()).thenReturn(id); + when(binding.getName()).thenReturn(name); + + UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class); + when(unresolved.resolve()).thenReturn(binding); + + + return unresolved; + } + }).when(_bindingFactory).recover(recoveredBinding.capture(), parent1.capture(), parent2.capture()); + + + DurableConfiguredObjectRecoverer[] recoverers = { new QueueRecoverer(_vhost), new ExchangeRecoverer(_vhost), |
