diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-21 20:15:20 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-21 20:15:20 +0000 |
| commit | 07bcf1792bd28910baaa870f590e2d36b303aa1b (patch) | |
| tree | e31117964a590508e9ce8a9a0b60fc8859b88087 /qpid/java | |
| parent | 7744a78e8f5c120c4eac13b0fa2f780de542ca26 (diff) | |
| download | qpid-python-07bcf1792bd28910baaa870f590e2d36b303aa1b.tar.gz | |
QPID-5577 : [Java Broker] create exchanges using an attribute map rather than multiple construction parameters
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1570697 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
33 files changed, 385 insertions, 302 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index a6aad93b27..21d8da0b94 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -42,6 +43,8 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -55,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -public abstract class AbstractExchange implements Exchange +public abstract class AbstractExchange<T extends Exchange> implements Exchange<T> { private static final Logger _logger = Logger.getLogger(AbstractExchange.class); private String _name; @@ -79,7 +82,6 @@ public abstract class AbstractExchange implements Exchange private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); - private final ExchangeType<? extends Exchange> _type; private UUID _id; private final AtomicInteger _bindingCountHigh = new AtomicInteger(); private final AtomicLong _receivedMessageCount = new AtomicLong(); @@ -94,36 +96,63 @@ public abstract class AbstractExchange implements Exchange //TODO : persist creation time private long _createTime = System.currentTimeMillis(); - public AbstractExchange(final ExchangeType<? extends Exchange> type) + public AbstractExchange(VirtualHost vhost, Map<String, Object> attributes) throws UnknownExchangeException { - _type = type; - } + _virtualHost = vhost; - @Override - public String getTypeName() - { - return _type.getType(); - } - - public void initialise(UUID id, - VirtualHost host, - String name, - boolean durable, - boolean autoDelete) - { - _virtualHost = host; - _name = name; - _durable = durable; - _autoDelete = autoDelete; - - _id = id; + _id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributes); + _name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributes); + _durable = MapValueConverter.getBooleanAttribute(org.apache.qpid.server.model.Exchange.DURABLE, attributes); + _autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; _logSubject = new ExchangeLogSubject(this, this.getVirtualHost()); // check ACL - host.getSecurityManager().authoriseCreateExchange(this); + _virtualHost.getSecurityManager().authoriseCreateExchange(this); + + Object alternateExchangeAttr = attributes.get(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE); + if(alternateExchangeAttr != null) + { + if(alternateExchangeAttr instanceof Exchange) + { + _alternateExchange = (Exchange) alternateExchangeAttr; + } + else if(alternateExchangeAttr instanceof UUID) + { + _alternateExchange = vhost.getExchange((UUID)alternateExchangeAttr); + } + else if(alternateExchangeAttr instanceof String) + { + _alternateExchange = vhost.getExchange((String)alternateExchangeAttr); + if(_alternateExchange == null) + { + try + { + UUID altExcAsUUID = UUID.fromString((String)alternateExchangeAttr); + _alternateExchange = vhost.getExchange(altExcAsUUID); + } + catch (IllegalArgumentException e) + { + // ignore - we'll throw an exception shortly because _alternateExchange will be null + } + } + } + if(_alternateExchange == null) + { + throw new UnknownExchangeException(alternateExchangeAttr.toString()); + } + + } // Log Exchange creation - CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), name, durable)); + CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), _name, _durable)); + } + + public abstract ExchangeType<T> getType(); + + @Override + public String getTypeName() + { + return getType().getType(); } public boolean isDurable() @@ -365,11 +394,6 @@ public abstract class AbstractExchange implements Exchange return _name.toString(); } - public ExchangeType getType() - { - return _type; - } - public Map<String, Object> getArguments() { return Collections.emptyMap(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index cc6131f6b5..3c001f8b31 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -47,7 +47,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; -public class DefaultExchange implements Exchange +public class DefaultExchange implements Exchange<DirectExchange> { private final QueueRegistry _queueRegistry; @@ -56,24 +56,13 @@ public class DefaultExchange implements Exchange private static final Logger _logger = Logger.getLogger(DefaultExchange.class); private final AtomicBoolean _closed = new AtomicBoolean(); - private LogSubject _logSubject; private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); - public DefaultExchange(QueueRegistry queueRegistry) + public DefaultExchange(VirtualHost virtualHost, QueueRegistry queueRegistry, UUID id) { + _virtualHost = virtualHost; _queueRegistry = queueRegistry; - } - - - @Override - public void initialise(UUID id, - VirtualHost host, - String name, - boolean durable, - boolean autoDelete) - { _id = id; - _virtualHost = host; } @Override @@ -83,7 +72,7 @@ public class DefaultExchange implements Exchange } @Override - public ExchangeType getType() + public ExchangeType<DirectExchange> getType() { return DirectExchange.TYPE; } @@ -195,12 +184,7 @@ public class DefaultExchange implements Exchange @Override public void close() { - if(_closed.compareAndSet(false,true)) - { - - CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED()); - - } + throw new AccessControlException("Cannot close the default exchange"); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 80aa4fa49c..daf85e6d55 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -23,12 +23,13 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -96,41 +97,24 @@ public class DefaultExchangeFactory implements ExchangeFactory return _exchangeClassMap.values(); } - public Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes() - { - Collection<ExchangeType<? extends Exchange>> publicTypes = - new ArrayList<ExchangeType<? extends Exchange>>(); - publicTypes.addAll(_exchangeClassMap.values()); - - return publicTypes; - } - - public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) - throws AMQUnknownExchangeType - { - - UUID id = UUIDGenerator.generateExchangeUUID(exchange, _host.getName()); - return createExchange(id, exchange, type, durable, autoDelete); - } - - public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) - throws AMQUnknownExchangeType + @Override + public Exchange createExchange(final Map<String, Object> attributes) + throws AMQUnknownExchangeType, UnknownExchangeException { - + String type = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributes); ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type); if (exchType == null) { throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null); } - - Exchange e = exchType.newInstance(id, _host, exchange, durable, autoDelete); - return e; + return exchType.newInstance(_host, attributes); } @Override - public Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete) - throws AMQUnknownExchangeType + public Exchange restoreExchange(Map<String,Object> attributes) + throws AMQUnknownExchangeType, UnknownExchangeException { - return createExchange(id, exchange, type, true, autoDelete); + return createExchange(attributes); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index ffd515e385..1dfa23f657 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -26,11 +26,16 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -60,17 +65,58 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void initialise(ExchangeFactory exchangeFactory) { //create 'standard' exchanges: - new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore()); + initialiseExchanges(exchangeFactory, getDurableConfigurationStore()); - _defaultExchange = new DefaultExchange(_queueRegistry); + _defaultExchange = + new DefaultExchange(_host, _queueRegistry, + UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, + _host.getName())); - UUID defaultExchangeId = - UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, _host.getName()); - _defaultExchange.initialise(defaultExchangeId, _host, ExchangeDefaults.DEFAULT_EXCHANGE_NAME,false, false); + } + + private void initialiseExchanges(ExchangeFactory factory, DurableConfigurationStore store) + { + for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes()) + { + defineExchange(factory, type.getDefaultExchangeName(), type.getType(), store); + } } + private void defineExchange(ExchangeFactory f, String name, String type, DurableConfigurationStore store) + { + try + { + if(getExchange(name) == null) + { + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(org.apache.qpid.server.model.Exchange.ID, + UUIDGenerator.generateExchangeUUID(name, _host.getName())); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); + Exchange exchange = f.createExchange(attributes); + registerExchange(exchange); + if(exchange.isDurable()) + { + DurableConfigurationStoreHelper.createExchange(store, exchange); + } + } + } + catch (AMQUnknownExchangeType e) + { + throw new ServerScopedRuntimeException("Unknown exchange type while attempting to initialise exchanges - " + + "this is because necessary jar files are not on the classpath", e); + } + catch (UnknownExchangeException e) + { + throw new ServerScopedRuntimeException("Unknown alternate exchange type while attempting to initialise " + + "a mandatory exchange which should not have an alternate: '" + + name + "'"); + } + } + public DurableConfigurationStore getDurableConfigurationStore() { return _host.getDurableConfigurationStore(); @@ -89,11 +135,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } } - public void setDefaultExchange(Exchange exchange) - { - _defaultExchange = exchange; - } - public Exchange getDefaultExchange() { return _defaultExchange; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index e2db7611eb..f9e654f1f2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -35,6 +35,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; @@ -42,7 +44,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; -public class DirectExchange extends AbstractExchange +public class DirectExchange extends AbstractExchange<DirectExchange> { private static final Logger _logger = Logger.getLogger(DirectExchange.class); @@ -128,9 +130,16 @@ public class DirectExchange extends AbstractExchange public static final ExchangeType<DirectExchange> TYPE = new DirectExchangeType(); - public DirectExchange() + public DirectExchange(final VirtualHost vhost, + final Map<String, Object> attributes) throws UnknownExchangeException { - super(TYPE); + super(vhost, attributes); + } + + @Override + public ExchangeType<DirectExchange> getType() + { + return TYPE; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java index 3a4f7a9a3b..8e7b392fe9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.exchange; +import java.util.Map; import java.util.UUID; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; public class DirectExchangeType implements ExchangeType<DirectExchange> @@ -34,14 +36,11 @@ public class DirectExchangeType implements ExchangeType<DirectExchange> return ExchangeDefaults.DIRECT_EXCHANGE_CLASS; } - public DirectExchange newInstance(UUID id, VirtualHost host, - String name, - boolean durable, - boolean autoDelete) + @Override + public DirectExchange newInstance(final VirtualHost virtualHost, + final Map<String, Object> attributes) throws UnknownExchangeException { - DirectExchange exch = new DirectExchange(); - exch.initialise(id, host,name,durable, autoDelete); - return exch; + return new DirectExchange(virtualHost, attributes); } public String getDefaultExchangeName() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 5625a0aca4..7cce4fde88 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -30,16 +30,14 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; -public interface Exchange extends ExchangeReferrer, MessageDestination +public interface Exchange<T extends Exchange> extends ExchangeReferrer, MessageDestination { - void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete); - UUID getId(); String getName(); - ExchangeType getType(); + ExchangeType<T> getType(); String getTypeName(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index a8839d2dfd..de30b7a244 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -21,8 +21,10 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import java.util.Collection; +import java.util.Map; import java.util.UUID; @@ -31,13 +33,8 @@ public interface ExchangeFactory Collection<ExchangeType<? extends Exchange>> getRegisteredTypes(); - Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes(); + Exchange createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; - Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) - throws AMQUnknownExchangeType; - - Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQUnknownExchangeType; - Exchange restoreExchange(UUID id, String exchange, String type, boolean autoDelete) - throws AMQUnknownExchangeType; + Exchange restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java deleted file mode 100644 index 1443074e18..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.util.ServerScopedRuntimeException; - -public class ExchangeInitialiser -{ - public void initialise(ExchangeFactory factory, ExchangeRegistry registry, DurableConfigurationStore store) - { - for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes()) - { - define (registry, factory, type.getDefaultExchangeName(), type.getType(), store); - } - - } - - private void define(ExchangeRegistry r, ExchangeFactory f, - String name, String type, DurableConfigurationStore store) - { - try - { - if(r.getExchange(name)== null) - { - Exchange exchange = f.createExchange(name, type, true, false); - r.registerExchange(exchange); - if(exchange.isDurable()) - { - DurableConfigurationStoreHelper.createExchange(store, exchange); - } - } - } - catch (AMQUnknownExchangeType e) - { - throw new ServerScopedRuntimeException("Unknown exchange type while attempting to initialise exchanges - " + - "this is because necessary jar files are not on the classpath", e); - } - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index fc1a5ea3de..a4d73ed4c4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -37,10 +37,12 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; -public class FanoutExchange extends AbstractExchange +public class FanoutExchange extends AbstractExchange<FanoutExchange> { private static final Logger _logger = Logger.getLogger(FanoutExchange.class); @@ -64,9 +66,16 @@ public class FanoutExchange extends AbstractExchange public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType(); - public FanoutExchange() + public FanoutExchange(final VirtualHost vhost, + final Map<String, Object> attributes) throws UnknownExchangeException { - super(TYPE); + super(vhost, attributes); + } + + @Override + public ExchangeType<FanoutExchange> getType() + { + return TYPE; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java index 5e1f6784bd..25e6afeeca 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.exchange; +import java.util.Map; import java.util.UUID; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; public class FanoutExchangeType implements ExchangeType<FanoutExchange> @@ -34,12 +36,11 @@ public class FanoutExchangeType implements ExchangeType<FanoutExchange> return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; } - public FanoutExchange newInstance(UUID id, VirtualHost host, String name, - boolean durable, boolean autoDelete) + @Override + public FanoutExchange newInstance(final VirtualHost virtualHost, final Map<String, Object> attributes) + throws UnknownExchangeException { - FanoutExchange exch = new FanoutExchange(); - exch.initialise(id, host, name, durable, autoDelete); - return exch; + return new FanoutExchange(virtualHost, attributes); } public String getDefaultExchangeName() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index a8b0ae601c..6017e15446 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -29,6 +29,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -64,7 +66,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * amq.match - pub/sub on field content/value * </pre> */ -public class HeadersExchange extends AbstractExchange +public class HeadersExchange extends AbstractExchange<HeadersExchange> { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); @@ -78,9 +80,16 @@ public class HeadersExchange extends AbstractExchange public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType(); - public HeadersExchange() + public HeadersExchange(final VirtualHost vhost, + final Map<String, Object> attributes) throws UnknownExchangeException { - super(TYPE); + super(vhost, attributes); + } + + @Override + public ExchangeType<HeadersExchange> getType() + { + return TYPE; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java index 19b830b6b2..3afc6f71c1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.exchange; +import java.util.Map; import java.util.UUID; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; public class HeadersExchangeType implements ExchangeType<HeadersExchange> @@ -34,13 +36,11 @@ public class HeadersExchangeType implements ExchangeType<HeadersExchange> return ExchangeDefaults.HEADERS_EXCHANGE_CLASS; } - public HeadersExchange newInstance(UUID id, VirtualHost host, String name, boolean durable, - boolean autoDelete) + @Override + public HeadersExchange newInstance(final VirtualHost virtualHost, final Map<String, Object> attributes) + throws UnknownExchangeException { - HeadersExchange exch = new HeadersExchange(); - - exch.initialise(id, host, name, durable, autoDelete); - return exch; + return new HeadersExchange(virtualHost, attributes); } public String getDefaultExchangeName() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 0389638d4f..2efeb4f2ff 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -42,8 +42,10 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHost; -public class TopicExchange extends AbstractExchange +public class TopicExchange extends AbstractExchange<TopicExchange> { public static final ExchangeType<TopicExchange> TYPE = new TopicExchangeType(); @@ -57,9 +59,15 @@ public class TopicExchange extends AbstractExchange private final Map<Binding, Map<String,Object>> _bindings = new HashMap<Binding, Map<String,Object>>(); - public TopicExchange() + public TopicExchange(final VirtualHost vhost, final Map attributes) throws UnknownExchangeException { - super(TYPE); + super(vhost, attributes); + } + + @Override + public ExchangeType<TopicExchange> getType() + { + return TYPE; } protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java index f76d556ded..27778ff502 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.exchange; +import java.util.Map; import java.util.UUID; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; public class TopicExchangeType implements ExchangeType<TopicExchange> @@ -34,14 +36,11 @@ public class TopicExchangeType implements ExchangeType<TopicExchange> return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; } - public TopicExchange newInstance(UUID id, VirtualHost host, - String name, - boolean durable, - boolean autoDelete) + @Override + public TopicExchange newInstance(final VirtualHost virtualHost, final Map<String, Object> attributes) + throws UnknownExchangeException { - TopicExchange exch = new TopicExchange(); - exch.initialise(id, host, name, durable, autoDelete); - return exch; + return new TopicExchange(virtualHost, attributes); } public String getDefaultExchangeName() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index d3a42bddc4..4c694657a8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -73,6 +73,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; @@ -348,12 +349,17 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } } - org.apache.qpid.server.exchange.Exchange exchange = _virtualHost.createExchange(null, - name, - type, - durable, - lifetime != null && lifetime != LifetimePolicy.PERMANENT, - alternateExchange); + Map<String,Object> attributes1 = new HashMap<String, Object>(); + + attributes1.put(ID, null); + attributes1.put(NAME, name); + attributes1.put(Exchange.TYPE, type); + attributes1.put(Exchange.DURABLE, durable); + attributes1.put(Exchange.LIFETIME_POLICY, + lifetime != null && lifetime != LifetimePolicy.PERMANENT + ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes1.put(Exchange.ALTERNATE_EXCHANGE, alternateExchange); + org.apache.qpid.server.exchange.Exchange exchange = _virtualHost.createExchange(attributes1); synchronized (_exchangeAdapters) { return _exchangeAdapters.get(exchange); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java index 910dfbf33d..a1d49aa49a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java @@ -20,17 +20,18 @@ */ package org.apache.qpid.server.plugin; +import java.util.Map; import java.util.UUID; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; public interface ExchangeType<T extends Exchange> extends Pluggable { public String getType(); - public T newInstance(UUID id, VirtualHost host, String name, - boolean durable, boolean autoDelete); + public T newInstance(final VirtualHost virtualHost, Map<String, Object> attributes) throws UnknownExchangeException; public String getDefaultExchangeName(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 11d2bccb1e..42052ba3be 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; @@ -187,10 +188,16 @@ public class AMQQueueFactory implements QueueFactory try { - dlExchange = _virtualHost.createExchange(dlExchangeId, - dlExchangeName, - ExchangeDefaults.FANOUT_EXCHANGE_CLASS, - true, false, null); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + dlExchange = _virtualHost.createExchange(attributes); } catch(ExchangeExistsException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 15a5513e11..ec3b6f69fb 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -132,7 +132,8 @@ public class AutoCommitTransaction implements ServerTransaction } postTransactionAction.postCommit(); postTransactionAction = null; - }finally + } + finally { rollbackIfNecessary(postTransactionAction, txn); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 190226f33c..b5fdad027d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -349,8 +349,16 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg boolean autodelete = exchangeConfiguration.getAutoDelete(); try { - Exchange newExchange = createExchange(null, exchangeConfiguration.getName(), - exchangeConfiguration.getType(), durable, autodelete, null); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, exchangeConfiguration.getName()); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, exchangeConfiguration.getType()); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + autodelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + Exchange newExchange = createExchange(attributes); } catch(ExchangeExistsException e) { @@ -620,15 +628,15 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } @Override - public Exchange createExchange(UUID id, - String name, - String type, - boolean durable, - boolean autoDelete, - String alternateExchangeName) + public Exchange createExchange(Map<String,Object> attributes) throws ExchangeExistsException, ReservedExchangeNameException, UnknownExchangeException, AMQUnknownExchangeType { + String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributes); + boolean durable = + MapValueConverter.getBooleanAttribute(org.apache.qpid.server.model.Exchange.DURABLE, attributes); + + synchronized (_exchangeRegistry) { Exchange existing; @@ -641,28 +649,16 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg throw new ReservedExchangeNameException(name); } - Exchange alternateExchange; - if(alternateExchangeName != null) + if(attributes.get(org.apache.qpid.server.model.Exchange.ID) == null) { - alternateExchange = _exchangeRegistry.getExchange(alternateExchangeName); - if(alternateExchange == null) - { - throw new UnknownExchangeException(alternateExchangeName); - } - } - else - { - alternateExchange = null; + attributes = new LinkedHashMap<String, Object>(attributes); + attributes.put(org.apache.qpid.server.model.Exchange.ID, + UUIDGenerator.generateExchangeUUID(name, getName())); } - if(id == null) - { - id = UUIDGenerator.generateExchangeUUID(name, getName()); - } + Exchange exchange = _exchangeFactory.createExchange(attributes); - Exchange exchange = _exchangeFactory.createExchange(id, name, type, durable, autoDelete); - exchange.setAlternateExchange(alternateExchange); _exchangeRegistry.registerExchange(exchange); if(durable) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index ce91efacc3..9857e7da72 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; @@ -65,10 +66,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< final Map<String, Object> attributeMap) { String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME); - String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); - boolean autoDelete = lifeTimePolicy == null - || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT; try { _exchange = _exchangeRegistry.getExchange(id); @@ -78,7 +76,10 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< } if (_exchange == null) { - _exchange = _exchangeFactory.restoreExchange(id, exchangeName, exchangeType, autoDelete); + Map<String,Object> attributesWithId = new HashMap<String,Object>(attributeMap); + attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,id); + attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true); + _exchange = _exchangeFactory.restoreExchange(attributesWithId); _exchangeRegistry.registerExchange(_exchange); } } @@ -87,6 +88,11 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< throw new ServerScopedRuntimeException("Unknown exchange type found when attempting to restore " + "exchanges, check classpath", e); } + catch (UnknownExchangeException e) + { + throw new ServerScopedRuntimeException("Unknown alternate exchange type found when attempting to restore " + + "exchanges: ", e); + } } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index ab5a406cff..304001cdd0 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -61,13 +61,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable AMQQueue createQueue(Map<String, Object> arguments) throws QueueExistsException; - - Exchange createExchange(UUID id, - String exchange, - String type, - boolean durable, - boolean autoDelete, - String alternateExchange) + Exchange createExchange(Map<String,Object> attributes) throws ExchangeExistsException, ReservedExchangeNameException, UnknownExchangeException, AMQUnknownExchangeType; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java index c5ff2a8900..276e098efe 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.qpid.server.plugin.ExchangeType; @@ -177,8 +178,7 @@ public class DefaultExchangeFactoryTest extends QpidTestCase } @Override - public Exchange newInstance(UUID id, VirtualHost host, String name, boolean durable, - boolean autoDelete) + public Exchange newInstance(VirtualHost host, Map<String,Object> attributes) { return null; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index f42c22c753..014279ddf0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,9 +38,12 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -49,16 +53,18 @@ public class FanoutExchangeTest extends TestCase private FanoutExchange _exchange; private VirtualHost _virtualHost; - public void setUp() + public void setUp() throws UnknownExchangeException { CurrentActor.setDefault(mock(LogActor.class)); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Exchange.ID, UUID.randomUUID()); + attributes.put(Exchange.NAME, "test"); + attributes.put(Exchange.DURABLE, false); - _exchange = new FanoutExchange(); _virtualHost = mock(VirtualHost.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); - - _exchange.initialise(UUID.randomUUID(), _virtualHost, "test", false, false); + _exchange = new FanoutExchange(_virtualHost, attributes); } public void testIsBoundStringMapAMQQueueWhenQueueIsNull() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 11342ee0ae..58ba6c9140 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -35,6 +35,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; @@ -58,12 +60,15 @@ public class HeadersExchangeTest extends TestCase super.setUp(); CurrentActor.setDefault(mock(LogActor.class)); - _exchange = new HeadersExchange(); _virtualHost = mock(VirtualHost.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Exchange.ID, UUID.randomUUID()); + attributes.put(Exchange.NAME, "test"); + attributes.put(Exchange.DURABLE, false); - _exchange.initialise(UUID.randomUUID(), _virtualHost, "test", false, false); + _exchange = new HeadersExchange(_virtualHost, attributes); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index c900b72ae5..2faefa1525 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import junit.framework.Assert; @@ -30,8 +31,8 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.util.BrokerTestHelper; @@ -54,8 +55,13 @@ public class TopicExchangeTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _exchange = new TopicExchange(); _vhost = BrokerTestHelper.createVirtualHost(getName()); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(Exchange.ID, UUID.randomUUID()); + attributes.put(Exchange.NAME, "test"); + attributes.put(Exchange.DURABLE, false); + + _exchange = new TopicExchange(_vhost, attributes); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 57f7cf9729..4e517609bc 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.security.*; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.mockito.ArgumentCaptor; @@ -136,19 +137,18 @@ public class AMQQueueFactoryTest extends QpidTestCase private void mockExchangeCreation() throws Exception { - final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class); - final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class); + - when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(), - anyBoolean(), anyBoolean(), anyString())).then( + when(_virtualHost.createExchange(attributes.capture())).then( new Answer<Exchange>() { @Override public Exchange answer(InvocationOnMock invocation) throws Throwable { - final String name = exchangeNameCapture.getValue(); - final UUID id = idCapture.getValue(); + Map attributeValues = attributes.getValue(); + final String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributeValues); + final UUID id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributeValues); final Exchange exchange = mock(Exchange.class); ExchangeType exType = mock(ExchangeType.class); @@ -157,7 +157,7 @@ public class AMQQueueFactoryTest extends QpidTestCase when(exchange.getId()).thenReturn(id); when(exchange.getType()).thenReturn(exType); - final String typeName = type.getValue(); + final String typeName = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributeValues); when(exType.getType()).thenReturn(typeName); when(exchange.getTypeName()).thenReturn(typeName); @@ -166,7 +166,8 @@ public class AMQQueueFactoryTest extends QpidTestCase final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class); - when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() { + when(exchange.addBinding(anyString(), queue.capture(), anyMap())).then(new Answer<Boolean>() + { @Override public Boolean answer(InvocationOnMock invocation) throws Throwable @@ -179,7 +180,7 @@ public class AMQQueueFactoryTest extends QpidTestCase return exchange; } } - ); + ); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 8f57c52299..1a7dfe9ca0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.util; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.only; import static org.mockito.Mockito.when; import java.net.SocketAddress; @@ -177,7 +178,12 @@ public class BrokerTestHelper when(virtualHost.getName()).thenReturn(hostName); when(virtualHost.getSecurityManager()).thenReturn(securityManager); DefaultExchangeFactory factory = new DefaultExchangeFactory(virtualHost); - return factory.createExchange("amp.direct", "direct", false, false); + Map<String,Object> attributes = new HashMap<String, Object>(); + attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName())); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct"); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, "direct"); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, false); + return factory.createExchange(attributes); } public static AMQQueue createQueue(String queueName, VirtualHost virtualHost) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 1e25aac197..6801c50722 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -269,10 +269,27 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final Exchange customExchange = mock(Exchange.class); - when(_exchangeFactory.restoreExchange(eq(customExchangeId), - eq(CUSTOM_EXCHANGE_NAME), - eq(HeadersExchange.TYPE.getType()), - anyBoolean())).thenReturn(customExchange); + final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class); + when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<Exchange>() + { + @Override + public Exchange answer(final InvocationOnMock invocation) throws Throwable + { + Map arguments = attributesCaptor.getValue(); + if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME)) + && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE)) + && customExchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID))) + { + return customExchange; + } + else + { + return null; + } + } + }); + + final ConfiguredObjectRecord[] expected = { new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", @@ -385,10 +402,26 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(customExchange.getId()).thenReturn(exchangeId); when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME); - when(_exchangeFactory.restoreExchange(eq(exchangeId), - eq(CUSTOM_EXCHANGE_NAME), - eq(HeadersExchange.TYPE.getType()), - anyBoolean())).thenReturn(customExchange); + final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class); + + when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<Exchange>() + { + @Override + public Exchange answer(final InvocationOnMock invocation) throws Throwable + { + Map arguments = attributesCaptor.getValue(); + if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME)) + && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE)) + && exchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID))) + { + return customExchange; + } + else + { + return null; + } + } + }); _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index e68d5bb2ba..8c6725ea14 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -158,12 +158,7 @@ public class MockVirtualHost implements VirtualHost } @Override - public Exchange createExchange(UUID id, - String exchange, - String type, - boolean durable, - boolean autoDelete, - String alternateExchange) + public Exchange createExchange(Map<String,Object> attributes) { return null; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 5d36aa5321..c03daf20b3 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import java.security.AccessControlException; import java.util.EnumSet; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; @@ -61,13 +62,7 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; -import org.apache.qpid.server.virtualhost.UnknownExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.QueueExistsException; +import org.apache.qpid.server.virtualhost.*; import org.apache.qpid.transport.*; import java.nio.ByteBuffer; @@ -712,12 +707,16 @@ public class ServerSessionDelegate extends SessionDelegate try { - virtualHost.createExchange(null, - method.getExchange(), - method.getType(), - method.getDurable(), - method.getAutoDelete(), - method.getAlternateExchange()); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange()); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType()); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable()); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange()); + virtualHost.createExchange(attributes); } catch(ReservedExchangeNameException e) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index 87622b88e7..9446f53188 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -30,17 +30,21 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.AccessControlException; +import java.util.HashMap; +import java.util.Map; public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody> { @@ -95,12 +99,18 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { try { - exchange = virtualHost.createExchange(null, - exchangeName == null ? null : exchangeName.intern().toString(), - body.getType() == null ? null : body.getType().intern().toString(), - body.getDurable(), - body.getAutoDelete(), - null); + String name = exchangeName == null ? null : exchangeName.intern().toString(); + String type = body.getType() == null ? null : body.getType().intern().toString(); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + exchange = virtualHost.createExchange(attributes); } catch(ReservedExchangeNameException e) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index e19b15461a..c44216afac 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.queue.StandardQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -726,7 +727,15 @@ public class MessageStoreTest extends QpidTestCase { Exchange exchange = null; - exchange = getVirtualHost().createExchange(null, name, type.getType(), durable, false, null); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + exchange = getVirtualHost().createExchange(attributes); return exchange; } |
