summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-21 20:15:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-21 20:15:20 +0000
commit07bcf1792bd28910baaa870f590e2d36b303aa1b (patch)
treee31117964a590508e9ce8a9a0b60fc8859b88087 /qpid/java
parent7744a78e8f5c120c4eac13b0fa2f780de542ca26 (diff)
downloadqpid-python-07bcf1792bd28910baaa870f590e2d36b303aa1b.tar.gz
QPID-5577 : [Java Broker] create exchanges using an attribute map rather than multiple construction parameters
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1570697 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java84
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java40
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java61
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java60
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java15
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java14
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java14
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java12
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java49
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java11
33 files changed, 385 insertions, 302 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index a6aad93b27..21d8da0b94 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -42,6 +43,8 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -55,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public abstract class AbstractExchange implements Exchange
+public abstract class AbstractExchange<T extends Exchange> implements Exchange<T>
{
private static final Logger _logger = Logger.getLogger(AbstractExchange.class);
private String _name;
@@ -79,7 +82,6 @@ public abstract class AbstractExchange implements Exchange
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
- private final ExchangeType<? extends Exchange> _type;
private UUID _id;
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
private final AtomicLong _receivedMessageCount = new AtomicLong();
@@ -94,36 +96,63 @@ public abstract class AbstractExchange implements Exchange
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
- public AbstractExchange(final ExchangeType<? extends Exchange> type)
+ public AbstractExchange(VirtualHost vhost, Map<String, Object> attributes) throws UnknownExchangeException
{
- _type = type;
- }
+ _virtualHost = vhost;
- @Override
- public String getTypeName()
- {
- return _type.getType();
- }
-
- public void initialise(UUID id,
- VirtualHost host,
- String name,
- boolean durable,
- boolean autoDelete)
- {
- _virtualHost = host;
- _name = name;
- _durable = durable;
- _autoDelete = autoDelete;
-
- _id = id;
+ _id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributes);
+ _name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributes);
+ _durable = MapValueConverter.getBooleanAttribute(org.apache.qpid.server.model.Exchange.DURABLE, attributes);
+ _autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
// check ACL
- host.getSecurityManager().authoriseCreateExchange(this);
+ _virtualHost.getSecurityManager().authoriseCreateExchange(this);
+
+ Object alternateExchangeAttr = attributes.get(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE);
+ if(alternateExchangeAttr != null)
+ {
+ if(alternateExchangeAttr instanceof Exchange)
+ {
+ _alternateExchange = (Exchange) alternateExchangeAttr;
+ }
+ else if(alternateExchangeAttr instanceof UUID)
+ {
+ _alternateExchange = vhost.getExchange((UUID)alternateExchangeAttr);
+ }
+ else if(alternateExchangeAttr instanceof String)
+ {
+ _alternateExchange = vhost.getExchange((String)alternateExchangeAttr);
+ if(_alternateExchange == null)
+ {
+ try
+ {
+ UUID altExcAsUUID = UUID.fromString((String)alternateExchangeAttr);
+ _alternateExchange = vhost.getExchange(altExcAsUUID);
+ }
+ catch (IllegalArgumentException e)
+ {
+ // ignore - we'll throw an exception shortly because _alternateExchange will be null
+ }
+ }
+ }
+ if(_alternateExchange == null)
+ {
+ throw new UnknownExchangeException(alternateExchangeAttr.toString());
+ }
+
+ }
// Log Exchange creation
- CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), name, durable));
+ CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), _name, _durable));
+ }
+
+ public abstract ExchangeType<T> getType();
+
+ @Override
+ public String getTypeName()
+ {
+ return getType().getType();
}
public boolean isDurable()
@@ -365,11 +394,6 @@ public abstract class AbstractExchange implements Exchange
return _name.toString();
}
- public ExchangeType getType()
- {
- return _type;
- }
-
public Map<String, Object> getArguments()
{
return Collections.emptyMap();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index cc6131f6b5..3c001f8b31 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -47,7 +47,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class DefaultExchange implements Exchange
+public class DefaultExchange implements Exchange<DirectExchange>
{
private final QueueRegistry _queueRegistry;
@@ -56,24 +56,13 @@ public class DefaultExchange implements Exchange
private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
private final AtomicBoolean _closed = new AtomicBoolean();
- private LogSubject _logSubject;
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
- public DefaultExchange(QueueRegistry queueRegistry)
+ public DefaultExchange(VirtualHost virtualHost, QueueRegistry queueRegistry, UUID id)
{
+ _virtualHost = virtualHost;
_queueRegistry = queueRegistry;
- }
-
-
- @Override
- public void initialise(UUID id,
- VirtualHost host,
- String name,
- boolean durable,
- boolean autoDelete)
- {
_id = id;
- _virtualHost = host;
}
@Override
@@ -83,7 +72,7 @@ public class DefaultExchange implements Exchange
}
@Override
- public ExchangeType getType()
+ public ExchangeType<DirectExchange> getType()
{
return DirectExchange.TYPE;
}
@@ -195,12 +184,7 @@ public class DefaultExchange implements Exchange
@Override
public void close()
{
- if(_closed.compareAndSet(false,true))
- {
-
- CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
-
- }
+ throw new AccessControlException("Cannot close the default exchange");
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 80aa4fa49c..daf85e6d55 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -23,12 +23,13 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -96,41 +97,24 @@ public class DefaultExchangeFactory implements ExchangeFactory
return _exchangeClassMap.values();
}
- public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes()
- {
- Collection<ExchangeType<? extends Exchange>> publicTypes =
- new ArrayList<ExchangeType<? extends Exchange>>();
- publicTypes.addAll(_exchangeClassMap.values());
-
- return publicTypes;
- }
-
- public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQUnknownExchangeType
- {
-
- UUID id = UUIDGenerator.generateExchangeUUID(exchange, _host.getName());
- return createExchange(id, exchange, type, durable, autoDelete);
- }
-
- public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQUnknownExchangeType
+ @Override
+ public Exchange createExchange(final Map<String, Object> attributes)
+ throws AMQUnknownExchangeType, UnknownExchangeException
{
-
+ String type = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributes);
ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
if (exchType == null)
{
throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
}
-
- Exchange e = exchType.newInstance(id, _host, exchange, durable, autoDelete);
- return e;
+ return exchType.newInstance(_host, attributes);
}
@Override
- public Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete)
- throws AMQUnknownExchangeType
+ public Exchange restoreExchange(Map<String,Object> attributes)
+ throws AMQUnknownExchangeType, UnknownExchangeException
{
- return createExchange(id, exchange, type, true, autoDelete);
+ return createExchange(attributes);
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index ffd515e385..1dfa23f657 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -26,11 +26,16 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -60,17 +65,58 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void initialise(ExchangeFactory exchangeFactory)
{
//create 'standard' exchanges:
- new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore());
+ initialiseExchanges(exchangeFactory, getDurableConfigurationStore());
- _defaultExchange = new DefaultExchange(_queueRegistry);
+ _defaultExchange =
+ new DefaultExchange(_host, _queueRegistry,
+ UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME,
+ _host.getName()));
- UUID defaultExchangeId =
- UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, _host.getName());
- _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false, false);
+ }
+
+ private void initialiseExchanges(ExchangeFactory factory, DurableConfigurationStore store)
+ {
+ for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
+ {
+ defineExchange(factory, type.getDefaultExchangeName(), type.getType(), store);
+ }
}
+ private void defineExchange(ExchangeFactory f, String name, String type, DurableConfigurationStore store)
+ {
+ try
+ {
+ if(getExchange(name) == null)
+ {
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(org.apache.qpid.server.model.Exchange.ID,
+ UUIDGenerator.generateExchangeUUID(name, _host.getName()));
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
+ Exchange exchange = f.createExchange(attributes);
+ registerExchange(exchange);
+ if(exchange.isDurable())
+ {
+ DurableConfigurationStoreHelper.createExchange(store, exchange);
+ }
+ }
+ }
+ catch (AMQUnknownExchangeType e)
+ {
+ throw new ServerScopedRuntimeException("Unknown exchange type while attempting to initialise exchanges - " +
+ "this is because necessary jar files are not on the classpath", e);
+ }
+ catch (UnknownExchangeException e)
+ {
+ throw new ServerScopedRuntimeException("Unknown alternate exchange type while attempting to initialise " +
+ "a mandatory exchange which should not have an alternate: '" +
+ name + "'");
+ }
+ }
+
public DurableConfigurationStore getDurableConfigurationStore()
{
return _host.getDurableConfigurationStore();
@@ -89,11 +135,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
}
- public void setDefaultExchange(Exchange exchange)
- {
- _defaultExchange = exchange;
- }
-
public Exchange getDefaultExchange()
{
return _defaultExchange;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index e2db7611eb..f9e654f1f2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -35,6 +35,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,7 +44,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
-public class DirectExchange extends AbstractExchange
+public class DirectExchange extends AbstractExchange<DirectExchange>
{
private static final Logger _logger = Logger.getLogger(DirectExchange.class);
@@ -128,9 +130,16 @@ public class DirectExchange extends AbstractExchange
public static final ExchangeType<DirectExchange> TYPE = new DirectExchangeType();
- public DirectExchange()
+ public DirectExchange(final VirtualHost vhost,
+ final Map<String, Object> attributes) throws UnknownExchangeException
{
- super(TYPE);
+ super(vhost, attributes);
+ }
+
+ @Override
+ public ExchangeType<DirectExchange> getType()
+ {
+ return TYPE;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
index 3a4f7a9a3b..8e7b392fe9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DirectExchangeType implements ExchangeType<DirectExchange>
@@ -34,14 +36,11 @@ public class DirectExchangeType implements ExchangeType<DirectExchange>
return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
}
- public DirectExchange newInstance(UUID id, VirtualHost host,
- String name,
- boolean durable,
- boolean autoDelete)
+ @Override
+ public DirectExchange newInstance(final VirtualHost virtualHost,
+ final Map<String, Object> attributes) throws UnknownExchangeException
{
- DirectExchange exch = new DirectExchange();
- exch.initialise(id, host,name,durable, autoDelete);
- return exch;
+ return new DirectExchange(virtualHost, attributes);
}
public String getDefaultExchangeName()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 5625a0aca4..7cce4fde88 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -30,16 +30,14 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-public interface Exchange extends ExchangeReferrer, MessageDestination
+public interface Exchange<T extends Exchange> extends ExchangeReferrer, MessageDestination
{
- void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete);
-
UUID getId();
String getName();
- ExchangeType getType();
+ ExchangeType<T> getType();
String getTypeName();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
index a8839d2dfd..de30b7a244 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
@@ -21,8 +21,10 @@
package org.apache.qpid.server.exchange;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
@@ -31,13 +33,8 @@ public interface ExchangeFactory
Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
- Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
+ Exchange createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
- Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQUnknownExchangeType;
-
- Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQUnknownExchangeType;
- Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete)
- throws AMQUnknownExchangeType;
+ Exchange restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
deleted file mode 100644
index 1443074e18..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-
-public class ExchangeInitialiser
-{
- public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store)
- {
- for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
- {
- define (registry, factory, type.getDefaultExchangeName(), type.getType(), store);
- }
-
- }
-
- private void define(ExchangeRegistry r, ExchangeFactory f,
- String name, String type, DurableConfigurationStore store)
- {
- try
- {
- if(r.getExchange(name)== null)
- {
- Exchange exchange = f.createExchange(name, type, true, false);
- r.registerExchange(exchange);
- if(exchange.isDurable())
- {
- DurableConfigurationStoreHelper.createExchange(store, exchange);
- }
- }
- }
- catch (AMQUnknownExchangeType e)
- {
- throw new ServerScopedRuntimeException("Unknown exchange type while attempting to initialise exchanges - " +
- "this is because necessary jar files are not on the classpath", e);
- }
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index fc1a5ea3de..a4d73ed4c4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -37,10 +37,12 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
-public class FanoutExchange extends AbstractExchange
+public class FanoutExchange extends AbstractExchange<FanoutExchange>
{
private static final Logger _logger = Logger.getLogger(FanoutExchange.class);
@@ -64,9 +66,16 @@ public class FanoutExchange extends AbstractExchange
public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType();
- public FanoutExchange()
+ public FanoutExchange(final VirtualHost vhost,
+ final Map<String, Object> attributes) throws UnknownExchangeException
{
- super(TYPE);
+ super(vhost, attributes);
+ }
+
+ @Override
+ public ExchangeType<FanoutExchange> getType()
+ {
+ return TYPE;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
index 5e1f6784bd..25e6afeeca 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class FanoutExchangeType implements ExchangeType<FanoutExchange>
@@ -34,12 +36,11 @@ public class FanoutExchangeType implements ExchangeType<FanoutExchange>
return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
}
- public FanoutExchange newInstance(UUID id, VirtualHost host, String name,
- boolean durable, boolean autoDelete)
+ @Override
+ public FanoutExchange newInstance(final VirtualHost virtualHost, final Map<String, Object> attributes)
+ throws UnknownExchangeException
{
- FanoutExchange exch = new FanoutExchange();
- exch.initialise(id, host, name, durable, autoDelete);
- return exch;
+ return new FanoutExchange(virtualHost, attributes);
}
public String getDefaultExchangeName()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index a8b0ae601c..6017e15446 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -29,6 +29,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.LinkedHashSet;
@@ -64,7 +66,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* amq.match - pub/sub on field content/value
* </pre>
*/
-public class HeadersExchange extends AbstractExchange
+public class HeadersExchange extends AbstractExchange<HeadersExchange>
{
private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
@@ -78,9 +80,16 @@ public class HeadersExchange extends AbstractExchange
public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType();
- public HeadersExchange()
+ public HeadersExchange(final VirtualHost vhost,
+ final Map<String, Object> attributes) throws UnknownExchangeException
{
- super(TYPE);
+ super(vhost, attributes);
+ }
+
+ @Override
+ public ExchangeType<HeadersExchange> getType()
+ {
+ return TYPE;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
index 19b830b6b2..3afc6f71c1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class HeadersExchangeType implements ExchangeType<HeadersExchange>
@@ -34,13 +36,11 @@ public class HeadersExchangeType implements ExchangeType<HeadersExchange>
return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
}
- public HeadersExchange newInstance(UUID id, VirtualHost host, String name, boolean durable,
- boolean autoDelete)
+ @Override
+ public HeadersExchange newInstance(final VirtualHost virtualHost, final Map<String, Object> attributes)
+ throws UnknownExchangeException
{
- HeadersExchange exch = new HeadersExchange();
-
- exch.initialise(id, host, name, durable, autoDelete);
- return exch;
+ return new HeadersExchange(virtualHost, attributes);
}
public String getDefaultExchangeName()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 0389638d4f..2efeb4f2ff 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -42,8 +42,10 @@ import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
-public class TopicExchange extends AbstractExchange
+public class TopicExchange extends AbstractExchange<TopicExchange>
{
public static final ExchangeType<TopicExchange> TYPE = new TopicExchangeType();
@@ -57,9 +59,15 @@ public class TopicExchange extends AbstractExchange
private final Map<Binding, Map<String,Object>> _bindings = new HashMap<Binding, Map<String,Object>>();
- public TopicExchange()
+ public TopicExchange(final VirtualHost vhost, final Map attributes) throws UnknownExchangeException
{
- super(TYPE);
+ super(vhost, attributes);
+ }
+
+ @Override
+ public ExchangeType<TopicExchange> getType()
+ {
+ return TYPE;
}
protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
index f76d556ded..27778ff502 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class TopicExchangeType implements ExchangeType<TopicExchange>
@@ -34,14 +36,11 @@ public class TopicExchangeType implements ExchangeType<TopicExchange>
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
}
- public TopicExchange newInstance(UUID id, VirtualHost host,
- String name,
- boolean durable,
- boolean autoDelete)
+ @Override
+ public TopicExchange newInstance(final VirtualHost virtualHost, final Map<String, Object> attributes)
+ throws UnknownExchangeException
{
- TopicExchange exch = new TopicExchange();
- exch.initialise(id, host, name, durable, autoDelete);
- return exch;
+ return new TopicExchange(virtualHost, attributes);
}
public String getDefaultExchangeName()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index d3a42bddc4..4c694657a8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -73,6 +73,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
@@ -348,12 +349,17 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
}
}
- org.apache.qpid.server.exchange.Exchange exchange = _virtualHost.createExchange(null,
- name,
- type,
- durable,
- lifetime != null && lifetime != LifetimePolicy.PERMANENT,
- alternateExchange);
+ Map<String,Object> attributes1 = new HashMap<String, Object>();
+
+ attributes1.put(ID, null);
+ attributes1.put(NAME, name);
+ attributes1.put(Exchange.TYPE, type);
+ attributes1.put(Exchange.DURABLE, durable);
+ attributes1.put(Exchange.LIFETIME_POLICY,
+ lifetime != null && lifetime != LifetimePolicy.PERMANENT
+ ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes1.put(Exchange.ALTERNATE_EXCHANGE, alternateExchange);
+ org.apache.qpid.server.exchange.Exchange exchange = _virtualHost.createExchange(attributes1);
synchronized (_exchangeAdapters)
{
return _exchangeAdapters.get(exchange);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
index 910dfbf33d..a1d49aa49a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.server.plugin;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public interface ExchangeType<T extends Exchange> extends Pluggable
{
public String getType();
- public T newInstance(UUID id, VirtualHost host, String name,
- boolean durable, boolean autoDelete);
+ public T newInstance(final VirtualHost virtualHost, Map<String, Object> attributes) throws UnknownExchangeException;
public String getDefaultExchangeName();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 11d2bccb1e..42052ba3be 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
@@ -187,10 +188,16 @@ public class AMQQueueFactory implements QueueFactory
try
{
- dlExchange = _virtualHost.createExchange(dlExchangeId,
- dlExchangeName,
- ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
- true, false, null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ dlExchange = _virtualHost.createExchange(attributes);
}
catch(ExchangeExistsException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index 15a5513e11..ec3b6f69fb 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -132,7 +132,8 @@ public class AutoCommitTransaction implements ServerTransaction
}
postTransactionAction.postCommit();
postTransactionAction = null;
- }finally
+ }
+ finally
{
rollbackIfNecessary(postTransactionAction, txn);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 190226f33c..b5fdad027d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -349,8 +349,16 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
boolean autodelete = exchangeConfiguration.getAutoDelete();
try
{
- Exchange newExchange = createExchange(null, exchangeConfiguration.getName(),
- exchangeConfiguration.getType(), durable, autodelete, null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, exchangeConfiguration.getName());
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, exchangeConfiguration.getType());
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ autodelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ Exchange newExchange = createExchange(attributes);
}
catch(ExchangeExistsException e)
{
@@ -620,15 +628,15 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
@Override
- public Exchange createExchange(UUID id,
- String name,
- String type,
- boolean durable,
- boolean autoDelete,
- String alternateExchangeName)
+ public Exchange createExchange(Map<String,Object> attributes)
throws ExchangeExistsException, ReservedExchangeNameException,
UnknownExchangeException, AMQUnknownExchangeType
{
+ String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributes);
+ boolean durable =
+ MapValueConverter.getBooleanAttribute(org.apache.qpid.server.model.Exchange.DURABLE, attributes);
+
+
synchronized (_exchangeRegistry)
{
Exchange existing;
@@ -641,28 +649,16 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
throw new ReservedExchangeNameException(name);
}
- Exchange alternateExchange;
- if(alternateExchangeName != null)
+ if(attributes.get(org.apache.qpid.server.model.Exchange.ID) == null)
{
- alternateExchange = _exchangeRegistry.getExchange(alternateExchangeName);
- if(alternateExchange == null)
- {
- throw new UnknownExchangeException(alternateExchangeName);
- }
- }
- else
- {
- alternateExchange = null;
+ attributes = new LinkedHashMap<String, Object>(attributes);
+ attributes.put(org.apache.qpid.server.model.Exchange.ID,
+ UUIDGenerator.generateExchangeUUID(name, getName()));
}
- if(id == null)
- {
- id = UUIDGenerator.generateExchangeUUID(name, getName());
- }
+ Exchange exchange = _exchangeFactory.createExchange(attributes);
- Exchange exchange = _exchangeFactory.createExchange(id, name, type, durable, autoDelete);
- exchange.setAlternateExchange(alternateExchange);
_exchangeRegistry.registerExchange(exchange);
if(durable)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
index ce91efacc3..9857e7da72 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
@@ -65,10 +66,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
final Map<String, Object> attributeMap)
{
String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME);
- String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
- boolean autoDelete = lifeTimePolicy == null
- || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT;
try
{
_exchange = _exchangeRegistry.getExchange(id);
@@ -78,7 +76,10 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
}
if (_exchange == null)
{
- _exchange = _exchangeFactory.restoreExchange(id, exchangeName, exchangeType, autoDelete);
+ Map<String,Object> attributesWithId = new HashMap<String,Object>(attributeMap);
+ attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,id);
+ attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true);
+ _exchange = _exchangeFactory.restoreExchange(attributesWithId);
_exchangeRegistry.registerExchange(_exchange);
}
}
@@ -87,6 +88,11 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
throw new ServerScopedRuntimeException("Unknown exchange type found when attempting to restore " +
"exchanges, check classpath", e);
}
+ catch (UnknownExchangeException e)
+ {
+ throw new ServerScopedRuntimeException("Unknown alternate exchange type found when attempting to restore " +
+ "exchanges: ", e);
+ }
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index ab5a406cff..304001cdd0 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -61,13 +61,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
AMQQueue createQueue(Map<String, Object> arguments) throws QueueExistsException;
-
- Exchange createExchange(UUID id,
- String exchange,
- String type,
- boolean durable,
- boolean autoDelete,
- String alternateExchange)
+ Exchange createExchange(Map<String,Object> attributes)
throws ExchangeExistsException, ReservedExchangeNameException,
UnknownExchangeException, AMQUnknownExchangeType;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
index c5ff2a8900..276e098efe 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -177,8 +178,7 @@ public class DefaultExchangeFactoryTest extends QpidTestCase
}
@Override
- public Exchange newInstance(UUID id, VirtualHost host, String name, boolean durable,
- boolean autoDelete)
+ public Exchange newInstance(VirtualHost host, Map<String,Object> attributes)
{
return null;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index f42c22c753..014279ddf0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,9 +38,12 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -49,16 +53,18 @@ public class FanoutExchangeTest extends TestCase
private FanoutExchange _exchange;
private VirtualHost _virtualHost;
- public void setUp()
+ public void setUp() throws UnknownExchangeException
{
CurrentActor.setDefault(mock(LogActor.class));
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Exchange.ID, UUID.randomUUID());
+ attributes.put(Exchange.NAME, "test");
+ attributes.put(Exchange.DURABLE, false);
- _exchange = new FanoutExchange();
_virtualHost = mock(VirtualHost.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
-
- _exchange.initialise(UUID.randomUUID(), _virtualHost, "test", false, false);
+ _exchange = new FanoutExchange(_virtualHost, attributes);
}
public void testIsBoundStringMapAMQQueueWhenQueueIsNull()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 11342ee0ae..58ba6c9140 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -35,6 +35,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
@@ -58,12 +60,15 @@ public class HeadersExchangeTest extends TestCase
super.setUp();
CurrentActor.setDefault(mock(LogActor.class));
- _exchange = new HeadersExchange();
_virtualHost = mock(VirtualHost.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Exchange.ID, UUID.randomUUID());
+ attributes.put(Exchange.NAME, "test");
+ attributes.put(Exchange.DURABLE, false);
- _exchange.initialise(UUID.randomUUID(), _virtualHost, "test", false, false);
+ _exchange = new HeadersExchange(_virtualHost, attributes);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index c900b72ae5..2faefa1525 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import junit.framework.Assert;
@@ -30,8 +31,8 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -54,8 +55,13 @@ public class TopicExchangeTest extends QpidTestCase
{
super.setUp();
BrokerTestHelper.setUp();
- _exchange = new TopicExchange();
_vhost = BrokerTestHelper.createVirtualHost(getName());
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Exchange.ID, UUID.randomUUID());
+ attributes.put(Exchange.NAME, "test");
+ attributes.put(Exchange.DURABLE, false);
+
+ _exchange = new TopicExchange(_vhost, attributes);
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 57f7cf9729..4e517609bc 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.security.*;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.ArgumentCaptor;
@@ -136,19 +137,18 @@ public class AMQQueueFactoryTest extends QpidTestCase
private void mockExchangeCreation() throws Exception
{
- final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class);
- final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class);
- final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class);
+
- when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(),
- anyBoolean(), anyBoolean(), anyString())).then(
+ when(_virtualHost.createExchange(attributes.capture())).then(
new Answer<Exchange>()
{
@Override
public Exchange answer(InvocationOnMock invocation) throws Throwable
{
- final String name = exchangeNameCapture.getValue();
- final UUID id = idCapture.getValue();
+ Map attributeValues = attributes.getValue();
+ final String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributeValues);
+ final UUID id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributeValues);
final Exchange exchange = mock(Exchange.class);
ExchangeType exType = mock(ExchangeType.class);
@@ -157,7 +157,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
when(exchange.getId()).thenReturn(id);
when(exchange.getType()).thenReturn(exType);
- final String typeName = type.getValue();
+ final String typeName = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributeValues);
when(exType.getType()).thenReturn(typeName);
when(exchange.getTypeName()).thenReturn(typeName);
@@ -166,7 +166,8 @@ public class AMQQueueFactoryTest extends QpidTestCase
final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class);
- when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() {
+ when(exchange.addBinding(anyString(), queue.capture(), anyMap())).then(new Answer<Boolean>()
+ {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable
@@ -179,7 +180,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
return exchange;
}
}
- );
+ );
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 8f57c52299..1a7dfe9ca0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.util;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.only;
import static org.mockito.Mockito.when;
import java.net.SocketAddress;
@@ -177,7 +178,12 @@ public class BrokerTestHelper
when(virtualHost.getName()).thenReturn(hostName);
when(virtualHost.getSecurityManager()).thenReturn(securityManager);
DefaultExchangeFactory factory = new DefaultExchangeFactory(virtualHost);
- return factory.createExchange("amp.direct", "direct", false, false);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName()));
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct");
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, "direct");
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, false);
+ return factory.createExchange(attributes);
}
public static AMQQueue createQueue(String queueName, VirtualHost virtualHost)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 1e25aac197..6801c50722 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -269,10 +269,27 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final Exchange customExchange = mock(Exchange.class);
- when(_exchangeFactory.restoreExchange(eq(customExchangeId),
- eq(CUSTOM_EXCHANGE_NAME),
- eq(HeadersExchange.TYPE.getType()),
- anyBoolean())).thenReturn(customExchange);
+ final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);
+ when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<Exchange>()
+ {
+ @Override
+ public Exchange answer(final InvocationOnMock invocation) throws Throwable
+ {
+ Map arguments = attributesCaptor.getValue();
+ if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME))
+ && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE))
+ && customExchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID)))
+ {
+ return customExchange;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ });
+
+
final ConfiguredObjectRecord[] expected = {
new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
@@ -385,10 +402,26 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(customExchange.getId()).thenReturn(exchangeId);
when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME);
- when(_exchangeFactory.restoreExchange(eq(exchangeId),
- eq(CUSTOM_EXCHANGE_NAME),
- eq(HeadersExchange.TYPE.getType()),
- anyBoolean())).thenReturn(customExchange);
+ final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);
+
+ when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<Exchange>()
+ {
+ @Override
+ public Exchange answer(final InvocationOnMock invocation) throws Throwable
+ {
+ Map arguments = attributesCaptor.getValue();
+ if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME))
+ && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE))
+ && exchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID)))
+ {
+ return customExchange;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ });
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index e68d5bb2ba..8c6725ea14 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -158,12 +158,7 @@ public class MockVirtualHost implements VirtualHost
}
@Override
- public Exchange createExchange(UUID id,
- String exchange,
- String type,
- boolean durable,
- boolean autoDelete,
- String alternateExchange)
+ public Exchange createExchange(Map<String,Object> attributes)
{
return null;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 5d36aa5321..c03daf20b3 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import java.security.AccessControlException;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
@@ -61,13 +62,7 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.ExchangeExistsException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
-import org.apache.qpid.server.virtualhost.RequiredExchangeException;
-import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.server.virtualhost.UnknownExchangeException;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.*;
import org.apache.qpid.transport.*;
import java.nio.ByteBuffer;
@@ -712,12 +707,16 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
- virtualHost.createExchange(null,
- method.getExchange(),
- method.getType(),
- method.getDurable(),
- method.getAutoDelete(),
- method.getAlternateExchange());
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
+ virtualHost.createExchange(attributes);
}
catch(ReservedExchangeNameException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
index 87622b88e7..9446f53188 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
@@ -30,17 +30,21 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.AccessControlException;
+import java.util.HashMap;
+import java.util.Map;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
{
@@ -95,12 +99,18 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
try
{
- exchange = virtualHost.createExchange(null,
- exchangeName == null ? null : exchangeName.intern().toString(),
- body.getType() == null ? null : body.getType().intern().toString(),
- body.getDurable(),
- body.getAutoDelete(),
- null);
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String type = body.getType() == null ? null : body.getType().intern().toString();
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ exchange = virtualHost.createExchange(attributes);
}
catch(ReservedExchangeNameException e)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index e19b15461a..c44216afac 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.queue.StandardQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -726,7 +727,15 @@ public class MessageStoreTest extends QpidTestCase
{
Exchange exchange = null;
- exchange = getVirtualHost().createExchange(null, name, type.getType(), durable, false, null);
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ exchange = getVirtualHost().createExchange(attributes);
return exchange;
}