summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-04-18 16:03:11 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-04-18 16:03:11 +0000
commit178745059c1265f8cae71f8b19caf448b580afb0 (patch)
tree8f427b369bed4fa9646cfe2c52b23bcdd38f3230 /qpid/java
parentef4adc559cc4b81a0c681807986c62fc0b9a13e4 (diff)
downloadqpid-python-178745059c1265f8cae71f8b19caf448b580afb0.tar.gz
QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Bindings
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588501 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java28
-rw-r--r--qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java32
8 files changed, 145 insertions, 23 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
new file mode 100644
index 0000000000..4050f7675e
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.binding;
+
+import java.util.Map;
+
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class BindingFactory extends AbstractConfiguredObjectTypeFactory<BindingImpl>
+{
+ public BindingFactory()
+ {
+ super(BindingImpl.class);
+ }
+
+ @Override
+ protected BindingImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents)
+ {
+ ExchangeImpl<?> exchange = (ExchangeImpl<?>) getParent(Exchange.class, parents);
+ AMQQueue<?> queue = (AMQQueue<?>) getParent(Queue.class, parents);
+ BindingImpl binding = new BindingImpl(attributes, queue, exchange);
+ exchange.addBinding(binding);
+ return binding;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index 634b250d31..a533c0bc75 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -81,7 +82,11 @@ public class BindingImpl
public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
{
- super(parentsMap(queue,exchange),enhanceWithDurable(combineIdWithAttributes(id, attributes), queue, exchange),queue.getVirtualHost().getTaskExecutor());
+ this(enhanceWithDurable(combineIdWithAttributes(id,attributes), queue, exchange), queue, exchange);
+ }
+ public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
+ {
+ super(parentsMap(queue,exchange),attributes,queue.getVirtualHost().getTaskExecutor());
_bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME);
_queue = queue;
_exchange = exchange;
@@ -99,6 +104,17 @@ public class BindingImpl
}
+ @Override
+ protected void onCreate()
+ {
+ super.onCreate();
+ if (isDurable())
+ {
+ DurableConfigurationStoreHelper.createBinding(_queue.getVirtualHost().getDurableConfigurationStore(), this);
+ }
+
+ }
+
private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes,
final AMQQueue queue,
final ExchangeImpl exchange)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index b1dd6a3721..4f1643967e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -626,12 +626,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
true);
}
- @Override
- public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue,
- final Map<String, Object> argumentMap)
- {
- makeBinding(id, bindingKey,queue, argumentMap,true, false);
- }
private void removeBinding(final BindingImpl binding)
{
@@ -713,18 +707,10 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
if (existingMapping == null)
{
BindingImpl b = new BindingImpl(id, attributes, queue, this);
- b.addStateChangeListener(_bindingListener);
- b.open();
+ b.create();
- if (b.isDurable() && !restore)
- {
- DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
- }
- _bindingsMap.put(bindingIdentifier, b);
- queue.addBinding(b);
- childAdded(b);
+ addBinding(b);
- doAddBinding(b);
return true;
}
@@ -742,6 +728,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
}
+ @Override
+ public void addBinding(final BindingImpl b)
+ {
+ b.addStateChangeListener(_bindingListener);
+
+ BindingIdentifier identifier = new BindingIdentifier(b.getName(), b.getAMQQueue());
+
+ _bindingsMap.put(identifier, b);
+ b.getAMQQueue().addBinding(b);
+ childAdded(b);
+
+ doAddBinding(b);
+ }
+
protected abstract void onBindingUpdated(final BindingImpl binding,
final Map<String, Object> oldArguments);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
index 57929b7306..38913762d8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
@@ -59,9 +59,6 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex
AMQQueue queue,
Map<String, Object> arguments);
- void restoreBinding(UUID id, String bindingKey, AMQQueue queue,
- Map<String, Object> argumentMap);
-
void delete();
/**
@@ -114,6 +111,8 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex
EventLogger getEventLogger();
+ void addBinding(BindingImpl binding);
+
public interface BindingListener
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
index 57062cb7a2..54a23a0389 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
@@ -63,6 +63,10 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
{
_defaultTypes.put(categoryName, annotation.defaultType());
}
+ else
+ {
+ _defaultTypes.put(categoryName, categoryName);
+ }
}
if(categoryFactories.put(factory.getType(),factory) != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
index 6e399d950e..526760aea7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhost;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -29,11 +30,16 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
@@ -41,11 +47,14 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
{
private static final Logger _logger = Logger.getLogger(BindingRecoverer.class);
- private final VirtualHostImpl _virtualHost;
+ private final VirtualHostImpl<?,?,?> _virtualHost;
+ private final ConfiguredObjectFactory _objectFactory;
- public BindingRecoverer(final VirtualHostImpl virtualHost)
+ public BindingRecoverer(final VirtualHostImpl<?,?,?> virtualHost)
{
_virtualHost = virtualHost;
+ Broker<?> broker = _virtualHost.getParent(Broker.class);
+ _objectFactory = broker.getObjectFactory();
}
@Override
@@ -67,6 +76,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
private final UUID _queueId;
private final UUID _exchangeId;
private final UUID _bindingId;
+ private final ConfiguredObjectRecord _record;
private List<UnresolvedDependency> _unresolvedDependencies =
new ArrayList<UnresolvedDependency>();
@@ -76,6 +86,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
public UnresolvedBinding(final ConfiguredObjectRecord record)
{
+ _record = record;
_bindingId = record.getId();
_exchangeId = record.getParents().get(Exchange.class.getSimpleName()).getId();
_queueId = record.getParents().get(Queue.class.getSimpleName()).getId();
@@ -90,6 +101,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
_unresolvedDependencies.add(new QueueDependency());
}
+
_bindingName = (String) record.getAttributes().get(org.apache.qpid.server.model.Binding.NAME);
_bindingArgumentsMap = (Map<String, Object>) record.getAttributes().get(org.apache.qpid.server.model.Binding.ARGUMENTS);
}
@@ -108,7 +120,17 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
_logger.info("Restoring binding: (Exchange: " + _exchange.getName() + ", Queue: " + _queue.getName()
+ ", Routing Key: " + _bindingName + ", Arguments: " + _bindingArgumentsMap + ")");
- _exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap);
+
+ Map<String,Object> attributesWithId = new HashMap<String,Object>(_record.getAttributes());
+ attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,_record.getId());
+ attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true);
+
+ ConfiguredObjectTypeFactory<? extends Binding> configuredObjectTypeFactory =
+ _objectFactory.getConfiguredObjectTypeFactory(Binding.class, attributesWithId);
+ UnresolvedConfiguredObject<? extends Binding> unresolvedConfiguredObject =
+ configuredObjectTypeFactory.recover(_record, _exchange, _queue);
+ Binding binding = (Binding<?>) unresolvedConfiguredObject.resolve();
+
}
return (_exchange).getBinding(_bindingName, _queue);
}
diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
index cc000d49e6..3ec2b900e3 100644
--- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
+++ b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
@@ -44,6 +44,7 @@ org.apache.qpid.server.exchange.DirectExchangeFactory
org.apache.qpid.server.exchange.FanoutExchangeFactory
org.apache.qpid.server.exchange.HeadersExchangeFactory
org.apache.qpid.server.exchange.TopicExchangeFactory
+org.apache.qpid.server.binding.BindingFactory
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 2822476b1c..49b2a61965 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -84,6 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private ConfiguredObjectFactory _configuredObjectFactory;
private ConfiguredObjectTypeFactory _exchangeFactory;
private ConfiguredObjectTypeFactory _queueFactory;
+ private ConfiguredObjectTypeFactory _bindingFactory;
@Override
public void setUp() throws Exception
@@ -92,6 +93,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_configuredObjectFactory = mock(ConfiguredObjectFactory.class);
_exchangeFactory = mock(ConfiguredObjectTypeFactory.class);
_queueFactory = mock(ConfiguredObjectTypeFactory.class);
+ _bindingFactory = mock(ConfiguredObjectTypeFactory.class);
+
AMQQueue<?> queue = mock(AMQQueue.class);
@@ -109,6 +112,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory);
when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Queue.class), anyMap())).thenReturn(_queueFactory);
+ when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Binding.class), anyMap())).thenReturn(_bindingFactory);
+
final ArgumentCaptor<ConfiguredObjectRecord> recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class);
@@ -169,6 +174,33 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
}).when(_queueFactory).recover(recoveredQueue.capture(), any(ConfiguredObject.class));
+ final ArgumentCaptor<ConfiguredObjectRecord> recoveredBinding = ArgumentCaptor.forClass(ConfiguredObjectRecord.class);
+ final ArgumentCaptor<ConfiguredObject> parent1 = ArgumentCaptor.forClass(ConfiguredObject.class);
+ final ArgumentCaptor<ConfiguredObject> parent2 = ArgumentCaptor.forClass(ConfiguredObject.class);
+
+ doAnswer(new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ ConfiguredObjectRecord queueRecord = recoveredBinding.getValue();
+ Binding binding = mock(Binding.class);
+ UUID id = queueRecord.getId();
+ String name = (String) queueRecord.getAttributes().get("name");
+ when(binding.getId()).thenReturn(id);
+ when(binding.getName()).thenReturn(name);
+
+ UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class);
+ when(unresolved.resolve()).thenReturn(binding);
+
+
+ return unresolved;
+ }
+ }).when(_bindingFactory).recover(recoveredBinding.capture(), parent1.capture(), parent2.capture());
+
+
+
DurableConfiguredObjectRecoverer[] recoverers = {
new QueueRecoverer(_vhost),
new ExchangeRecoverer(_vhost),