diff options
Diffstat (limited to 'qpid/java')
37 files changed, 634 insertions, 716 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index f344d415f0..69e05ad989 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.binding; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; @@ -51,7 +50,7 @@ public class BindingImpl { private final String _bindingKey; private final AMQQueue _queue; - private final NonDefaultExchange _exchange; + private final ExchangeImpl _exchange; private final Map<String, Object> _arguments; private final UUID _id; private final AtomicLong _matches = new AtomicLong(); @@ -65,7 +64,7 @@ public class BindingImpl public BindingImpl(UUID id, final String bindingKey, final AMQQueue queue, - final NonDefaultExchange exchange, + final ExchangeImpl exchange, final Map<String, Object> arguments) { this(id, convertToAttributes(bindingKey, arguments), queue, exchange); @@ -82,7 +81,7 @@ public class BindingImpl return attributes; } - public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, NonDefaultExchange exchange) + public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange) { super(id,Collections.EMPTY_MAP,attributes,queue.getVirtualHost().getTaskExecutor()); _id = id; @@ -120,7 +119,7 @@ public class BindingImpl } @Override - public NonDefaultExchange getExchange() + public ExchangeImpl getExchange() { return _exchange; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 966ac50b85..2a688f497a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -24,7 +24,6 @@ import java.security.AccessControlException; import java.util.ArrayList; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; @@ -33,7 +32,6 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Publisher; @@ -68,13 +66,13 @@ import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractExchange<T extends AbstractExchange<T>> extends AbstractConfiguredObject<T> - implements NonDefaultExchange<T> + implements ExchangeImpl<T> { private static final Logger _logger = Logger.getLogger(AbstractExchange.class); private final LifetimePolicy _lifetimePolicy; private final AtomicBoolean _closed = new AtomicBoolean(); - private NonDefaultExchange _alternateExchange; + private ExchangeImpl _alternateExchange; private boolean _durable; @@ -329,12 +327,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return !_bindings.isEmpty(); } - public NonDefaultExchange getAlternateExchange() + public ExchangeImpl getAlternateExchange() { return _alternateExchange; } - public void setAlternateExchange(NonDefaultExchange exchange) + public void setAlternateExchange(ExchangeImpl exchange) { if(_alternateExchange != null) { @@ -836,13 +834,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } @Override - public void setAlternateExchange(final ExchangeImpl exchange) - { - // todo - _alternateExchange = (NonDefaultExchange) exchange; - } - - @Override public org.apache.qpid.server.model.Binding createBinding(final String bindingKey, final Queue queue, final Map<String, Object> bindingArguments, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java new file mode 100644 index 0000000000..f59049d276 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class DefaultDestination implements MessageDestination +{ + + private VirtualHost _virtualHost; + private static final Logger _logger = Logger.getLogger(DefaultDestination.class); + + public DefaultDestination(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + @Override + public String getName() + { + return ExchangeDefaults.DEFAULT_EXCHANGE_NAME; + } + + + public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) + { + final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + if(q == null) + { + return 0; + } + else + { + txn.enqueue(q,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + q.enqueue(message, postEnqueueAction); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + } + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java deleted file mode 100644 index 0d435f43bd..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.exchange; - -import java.security.AccessControlException; -import java.util.Collections; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.StateChangeListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class DefaultExchange implements ExchangeImpl<DirectExchange> -{ - - private final QueueRegistry _queueRegistry; - private UUID _id; - private VirtualHost _virtualHost; - private static final Logger _logger = Logger.getLogger(DefaultExchange.class); - - private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); - - public DefaultExchange(VirtualHost virtualHost, QueueRegistry queueRegistry, UUID id) - { - _virtualHost = virtualHost; - _queueRegistry = queueRegistry; - _id = id; - } - - @Override - public String getName() - { - return ExchangeDefaults.DEFAULT_EXCHANGE_NAME; - } - - @Override - public ExchangeType<DirectExchange> getExchangeType() - { - return DirectExchange.TYPE; - } - - - @Override - public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) - { - throw new AccessControlException("Cannot add bindings to the default exchange"); - } - - @Override - public boolean deleteBinding(final String bindingKey, final AMQQueue queue) - { - throw new AccessControlException("Cannot delete bindings from the default exchange"); - } - - @Override - public boolean hasBinding(final String bindingKey, final AMQQueue queue) - { - return false; - } - - @Override - public boolean replaceBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) - { - throw new AccessControlException("Cannot replace bindings on the default exchange"); - } - - @Override - public void restoreBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> argumentMap) - { - _logger.warn("Bindings to the default exchange should not be stored in the configuration store"); - } - - @Override - public String getTypeName() - { - return getExchangeType().getType(); - } - - @Override - public boolean isDurable() - { - return false; - } - - @Override - public boolean isAutoDelete() - { - return false; - } - - @Override - public void close() - { - throw new AccessControlException("Cannot close the default exchange"); - } - - @Override - public boolean isBound(AMQQueue queue) - { - return _virtualHost.getQueue(queue.getName()) == queue; - } - - @Override - public boolean hasBindings() - { - return !_virtualHost.getQueues().isEmpty(); - } - - @Override - public boolean isBound(String bindingKey, AMQQueue queue) - { - return isBound(queue) && queue.getName().equals(bindingKey); - } - - @Override - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - return isBound(bindingKey, queue) && (arguments == null || arguments.isEmpty()); - } - - @Override - public boolean isBound(Map<String, Object> arguments, AMQQueue queue) - { - return (arguments == null || arguments.isEmpty()) && isBound(queue); - } - - @Override - public boolean isBound(String bindingKey, Map<String, Object> arguments) - { - return (arguments == null || arguments.isEmpty()) && isBound(bindingKey); - } - - @Override - public boolean isBound(Map<String, Object> arguments) - { - return (arguments == null || arguments.isEmpty()) && hasBindings(); - } - - @Override - public boolean isBound(String bindingKey) - { - return _virtualHost.getQueue(bindingKey) != null; - } - - @Override - public ExchangeImpl getAlternateExchange() - { - return null; - } - - @Override - public void setAlternateExchange(ExchangeImpl exchange) - { - _logger.warn("Cannot set the alternate exchange for the default exchange"); - } - - @Override - public void removeReference(ExchangeReferrer exchange) - { - _referrers.remove(exchange); - } - - @Override - public void addReference(ExchangeReferrer exchange) - { - _referrers.put(exchange, Boolean.TRUE); - } - - @Override - public boolean hasReferrers() - { - return !_referrers.isEmpty(); - } - - @Override - public void addBindingListener(BindingListener listener) - { - - } - - @Override - public void removeBindingListener(BindingListener listener) - { - // TODO - } - - @Override - public UUID getId() - { - return _id; - } - - public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) - { - final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); - if(q == null) - { - return 0; - } - else - { - txn.enqueue(q,message, new ServerTransaction.Action() - { - MessageReference _reference = message.newReference(); - - public void postCommit() - { - try - { - q.enqueue(message, postEnqueueAction); - } - finally - { - _reference.release(); - } - } - - public void onRollback() - { - _reference.release(); - } - }); - return 1; - } - } - - private static final StateChangeListener<BindingImpl, State> STATE_CHANGE_LISTENER = - new StateChangeListener<BindingImpl, State>() - { - @Override - public void stateChanged(final BindingImpl object, final State oldState, final State newState) - { - if(newState == State.DELETED) - { - throw new AccessControlException("Cannot remove bindings to the default exchange"); - } - } - }; -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index e9d0740539..99d6487af8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -96,7 +96,7 @@ public class DefaultExchangeFactory implements ExchangeFactory } @Override - public NonDefaultExchange createExchange(final Map<String, Object> attributes) + public ExchangeImpl createExchange(final Map<String, Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException { String type = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributes); @@ -109,7 +109,7 @@ public class DefaultExchangeFactory implements ExchangeFactory } @Override - public NonDefaultExchange restoreExchange(Map<String,Object> attributes) + public ExchangeImpl restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException { return createExchange(attributes); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index e48a640fcb..70eecbb164 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.QueueRegistry; @@ -46,9 +47,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry /** * Maps from exchange name to exchange instance */ - private ConcurrentMap<String, ExchangeImpl> _exchangeMap = new ConcurrentHashMap<String, ExchangeImpl>(); + private ConcurrentMap<String, ExchangeImpl<?>> _exchangeMap = new ConcurrentHashMap<String, ExchangeImpl<?>>(); - private ExchangeImpl _defaultExchange; + private MessageDestination _defaultExchange; private final VirtualHost _host; private final QueueRegistry _queueRegistry; @@ -68,9 +69,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry initialiseExchanges(exchangeFactory, getDurableConfigurationStore()); _defaultExchange = - new DefaultExchange(_host, _queueRegistry, - UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME, - _host.getName())); + new DefaultDestination(_host + ); } @@ -96,7 +96,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); - ExchangeImpl exchange = f.createExchange(attributes); + ExchangeImpl<?> exchange = f.createExchange(attributes); registerExchange(exchange); if(exchange.isDurable()) { @@ -135,7 +135,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } } - public ExchangeImpl getDefaultExchange() + public MessageDestination getDefaultExchange() { return _defaultExchange; } @@ -170,17 +170,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } - public Collection<ExchangeImpl> getExchanges() + public Collection<ExchangeImpl<?>> getExchanges() { - return new ArrayList<ExchangeImpl>(_exchangeMap.values()); - } - - @Override - public Collection<NonDefaultExchange> getExchangesExceptDefault() - { - Collection allExchanges = getExchanges(); - allExchanges.remove(_defaultExchange); - return allExchanges; + return new ArrayList<ExchangeImpl<?>>(_exchangeMap.values()); } public void addRegistryChangeListener(RegistryChangeListener listener) @@ -188,22 +180,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _listeners.add(listener); } - public ExchangeImpl getExchange(String name) + public ExchangeImpl<?> getExchange(String name) { - if ((name == null) || name.length() == 0) - { - return getDefaultExchange(); - } - else - { - return _exchangeMap.get(name); - } + return _exchangeMap.get(name); } @Override public void clearAndUnregisterMbeans() { - for (final ExchangeImpl exchange : getExchanges()) + for (final ExchangeImpl<?> exchange : getExchanges()) { //TODO: this is a bit of a hack, what if the listeners aren't aware //that we are just unregistering the MBean because of HA, and aren't @@ -220,24 +205,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } @Override - public synchronized ExchangeImpl getExchange(UUID exchangeId) + public synchronized ExchangeImpl<?> getExchange(UUID exchangeId) { - if (exchangeId == null) - { - return getDefaultExchange(); - } - else + Collection<ExchangeImpl<?>> exchanges = _exchangeMap.values(); + for (ExchangeImpl<?> exchange : exchanges) { - Collection<ExchangeImpl> exchanges = _exchangeMap.values(); - for (ExchangeImpl exchange : exchanges) + if (exchange.getId().equals(exchangeId)) { - if (exchange.getId().equals(exchangeId)) - { - return exchange; - } + return exchange; } - return null; } + return null; + } public boolean isReservedExchangeName(String name) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index 0c7cece752..2731f665ac 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -32,8 +32,8 @@ public interface ExchangeFactory Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes(); - NonDefaultExchange createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; + ExchangeImpl createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; - NonDefaultExchange restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; + ExchangeImpl restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java index f2f5fde603..35d28d3384 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java @@ -22,13 +22,14 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import java.util.Map; import java.util.UUID; -public interface ExchangeImpl<T extends NonDefaultExchange> extends ExchangeReferrer, MessageDestination +public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, ExchangeReferrer, MessageDestination { UUID getId(); @@ -46,7 +47,7 @@ public interface ExchangeImpl<T extends NonDefaultExchange> extends ExchangeRefe */ boolean isAutoDelete(); - <X extends NonDefaultExchange<X>> ExchangeImpl<X> getAlternateExchange(); + ExchangeImpl getAlternateExchange(); void setAlternateExchange(ExchangeImpl exchange); @@ -110,6 +111,7 @@ public interface ExchangeImpl<T extends NonDefaultExchange> extends ExchangeRefe boolean hasReferrers(); + BindingImpl getBinding(String bindingName, AMQQueue queue); public interface BindingListener diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index b4f14b00c8..7fa7a64a62 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -20,19 +20,21 @@ */ package org.apache.qpid.server.exchange; +import org.apache.qpid.server.message.MessageDestination; + import java.util.Collection; import java.util.UUID; public interface ExchangeRegistry { - void registerExchange(ExchangeImpl exchange); + void registerExchange(ExchangeImpl<?> exchange); - ExchangeImpl getDefaultExchange(); + MessageDestination getDefaultExchange(); void initialise(ExchangeFactory exchangeFactory); - ExchangeImpl getExchange(String exchangeName); + ExchangeImpl<?> getExchange(String exchangeName); /** * Unregister an exchange @@ -43,11 +45,9 @@ public interface ExchangeRegistry void clearAndUnregisterMbeans(); - ExchangeImpl getExchange(UUID exchangeId); - - Collection<ExchangeImpl> getExchanges(); + ExchangeImpl<?> getExchange(UUID exchangeId); - Collection<NonDefaultExchange> getExchangesExceptDefault(); + Collection<ExchangeImpl<?>> getExchanges(); void addRegistryChangeListener(RegistryChangeListener listener); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java index 6e5eb98078..419dbdb9b4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java @@ -26,7 +26,4 @@ import org.apache.qpid.server.queue.AMQQueue; public interface NonDefaultExchange<T extends NonDefaultExchange<T>> extends Exchange<T>, ExchangeImpl<T> { - NonDefaultExchange getAlternateExchange(); - - BindingImpl getBinding(String bindingName, AMQQueue queue); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java index 0c14fb38c7..b1646e87c8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java @@ -36,7 +36,7 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X> // Attributes @ManagedAttribute - <T extends Exchange<T>> Exchange<T> getAlternateExchange(); + Exchange<?> getAlternateExchange(); //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 97b6744f1b..72b316c784 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -41,7 +41,6 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; @@ -49,7 +48,6 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AbstractQueue; import org.apache.qpid.server.queue.ConflationQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; @@ -192,7 +190,7 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo public Collection<Exchange> getExchanges() { - return _virtualHost == null ? Collections.<Exchange>emptyList() : new ArrayList<Exchange>(_virtualHost.getExchangesExceptDefault()); + return _virtualHost == null ? Collections.<Exchange>emptyList() : new ArrayList<Exchange>(_virtualHost.getExchanges()); } @@ -290,7 +288,7 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo lifetime != null && lifetime != LifetimePolicy.PERMANENT ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); attributes1.put(Exchange.ALTERNATE_EXCHANGE, alternateExchange); - NonDefaultExchange exchange = _virtualHost.createExchange(attributes1); + ExchangeImpl exchange = _virtualHost.createExchange(attributes1); return exchange; } @@ -503,13 +501,13 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo public void exchangeRegistered(ExchangeImpl exchange) { - childAdded((NonDefaultExchange)exchange); + childAdded(exchange); } public void exchangeUnregistered(ExchangeImpl exchange) { - childRemoved((NonDefaultExchange)exchange); + childRemoved(exchange); } public void queueRegistered(AMQQueue queue) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java index 3017aded98..87d10f745b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java @@ -23,11 +23,10 @@ package org.apache.qpid.server.plugin; import java.util.Map; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; -public interface ExchangeType<T extends NonDefaultExchange> extends Pluggable +public interface ExchangeType<T extends ExchangeImpl<T>> extends Pluggable { public String getType(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 9216061169..0db7d78576 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.ExchangeReferrer; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; @@ -32,11 +31,9 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.protocol.CapacityChecker; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.NotificationListener; import java.util.Collection; import java.util.List; import java.util.Set; @@ -174,9 +171,9 @@ public interface AMQQueue<X extends AMQQueue<X>> void stop(); - NonDefaultExchange getAlternateExchange(); + ExchangeImpl getAlternateExchange(); - void setAlternateExchange(NonDefaultExchange exchange); + void setAlternateExchange(ExchangeImpl exchange); Collection<String> getAvailableAttributes(); Object getAttribute(String attrName); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 34895b61e8..dd82dfd681 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -26,7 +26,6 @@ import java.util.UUID; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.exchange.ExchangeDefaults; @@ -154,14 +153,14 @@ public class AMQQueueFactory implements QueueFactory { final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); - NonDefaultExchange altExchange; + ExchangeImpl altExchange; try { - altExchange = (NonDefaultExchange) _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); + altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); } catch(IllegalArgumentException e) { - altExchange = (NonDefaultExchange) _virtualHost.getExchange(altExchangeAttr); + altExchange = _virtualHost.getExchange(altExchangeAttr); } queue.setAlternateExchange(altExchange); } @@ -183,7 +182,7 @@ public class AMQQueueFactory implements QueueFactory final String dlExchangeName = getDeadLetterExchangeName(queueName); final String dlQueueName = getDeadLetterQueueName(queueName); - NonDefaultExchange dlExchange = null; + ExchangeImpl dlExchange = null; final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); try @@ -202,7 +201,7 @@ public class AMQQueueFactory implements QueueFactory catch(ExchangeExistsException e) { // We're ok if the exchange already exists - dlExchange = (NonDefaultExchange) e.getExistingExchange(); + dlExchange = e.getExistingExchange(); } catch (ReservedExchangeNameException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index f7362e18c8..62634970a6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -35,7 +35,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.SessionPrincipal; -import org.apache.qpid.server.exchange.NonDefaultExchange; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.Queue; @@ -106,7 +106,7 @@ public abstract class AbstractQueue private final boolean _durable; - private NonDefaultExchange _alternateExchange; + private ExchangeImpl _alternateExchange; private final QueueEntryList _entries; @@ -516,12 +516,12 @@ public abstract class AbstractQueue return _exclusivityPolicy != ExclusivityPolicy.NONE; } - public NonDefaultExchange getAlternateExchange() + public ExchangeImpl getAlternateExchange() { return _alternateExchange; } - public void setAlternateExchange(NonDefaultExchange exchange) + public void setAlternateExchange(ExchangeImpl exchange) { if(_alternateExchange != null) { @@ -2853,7 +2853,7 @@ public abstract class AbstractQueue if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange) { final String bindingKey = (String) attributes.get("name"); - ((NonDefaultExchange)otherParents[0]).addBinding(bindingKey, this, attributes); + ((ExchangeImpl)otherParents[0]).addBinding(bindingKey, this, attributes); for(Binding binding : _bindings) { if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey)) @@ -2899,7 +2899,7 @@ public abstract class AbstractQueue else if(ALTERNATE_EXCHANGE.equals(name)) { // In future we may want to accept a UUID as an alternative way to identifying the exchange - NonDefaultExchange alternateExchange = (NonDefaultExchange) desired; + ExchangeImpl alternateExchange = (ExchangeImpl) desired; setAlternateExchange(alternateExchange); return true; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 1f52638279..739dd1cc5b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -37,7 +37,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.configuration.ExchangeConfiguration; @@ -55,7 +54,6 @@ import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageNode; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.adapter.VirtualHostAdapter; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; @@ -380,44 +378,44 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue); } - //get the exchange name (returns default exchange name if none was specified) + //get the exchange name (returns empty String if none was specified) String exchangeName = queueConfiguration.getExchange(); - ExchangeImpl exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) + + if("".equals(exchangeName)) { - throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); + //get routing keys in configuration (returns empty list if none are defined) + List<?> routingKeys = queueConfiguration.getRoutingKeys(); + if(!(routingKeys.isEmpty() || (routingKeys.size()==1 && routingKeys.contains(queueName)))) + { + throw new ConfigurationException("Attempt to bind queue '" + queueName + "' with binding key(s) " + + routingKeys + " without specifying an exchange"); + } } - - ExchangeImpl defaultExchange = _exchangeRegistry.getDefaultExchange(); - - //get routing keys in configuration (returns empty list if none are defined) - List<?> routingKeys = queueConfiguration.getRoutingKeys(); - - for (Object routingKeyNameObj : routingKeys) + else { - String routingKey = String.valueOf(routingKeyNameObj); - - if (exchange.equals(defaultExchange)) + ExchangeImpl exchange = _exchangeRegistry.getExchange(exchangeName); + if (exchange == null) { - if(!queueName.equals(routingKey)) - { - throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + - "' to the default exchange with a key other than the queue name: " + routingKey); - } + throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); } - else + + //get routing keys in configuration (returns empty list if none are defined) + List<?> routingKeys = queueConfiguration.getRoutingKeys(); + + for (Object routingKeyNameObj : routingKeys) { + String routingKey = String.valueOf(routingKeyNameObj); + configureBinding(queue, exchange, routingKey, (Map) queueConfiguration.getBindingArguments(routingKey)); } - } - if (!exchange.equals(defaultExchange) && !routingKeys.contains(queueName)) - { - //bind the queue to the named exchange using its name - configureBinding(queue, exchange, queueName, null); + if (!routingKeys.contains(queueName)) + { + //bind the queue to the named exchange using its name + configureBinding(queue, exchange, queueName, null); + } } - } private void configureBinding(AMQQueue queue, ExchangeImpl exchange, String routingKey, Map<String,Object> arguments) @@ -605,31 +603,25 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } @Override - public ExchangeImpl getDefaultExchange() + public MessageDestination getDefaultDestination() { return _exchangeRegistry.getDefaultExchange(); } @Override - public Collection<ExchangeImpl> getExchanges() + public Collection<ExchangeImpl<?>> getExchanges() { return Collections.unmodifiableCollection(_exchangeRegistry.getExchanges()); } @Override - public Collection<NonDefaultExchange> getExchangesExceptDefault() - { - return Collections.unmodifiableCollection(_exchangeRegistry.getExchangesExceptDefault()); - } - - @Override public Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes() { return _exchangeFactory.getRegisteredTypes(); } @Override - public NonDefaultExchange createExchange(Map<String,Object> attributes) + public ExchangeImpl createExchange(Map<String,Object> attributes) throws ExchangeExistsException, ReservedExchangeNameException, UnknownExchangeException, AMQUnknownExchangeType { @@ -658,7 +650,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg UUIDGenerator.generateExchangeUUID(name, getName())); } - NonDefaultExchange exchange = _exchangeFactory.createExchange(attributes); + ExchangeImpl exchange = _exchangeFactory.createExchange(attributes); _exchangeRegistry.registerExchange(exchange); if(durable) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index a682079de1..e6577e04e4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -28,7 +28,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; @@ -114,7 +113,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B _exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap); } - return ((NonDefaultExchange)_exchange).getBinding(_bindingName, _queue); + return (_exchange).getBinding(_bindingName, _queue); } private class QueueDependency implements UnresolvedDependency<AMQQueue> diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index bc65ece87e..2743b0ef59 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -59,7 +59,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< private class UnresolvedExchange implements UnresolvedObject<ExchangeImpl> { - private ExchangeImpl _exchange; + private ExchangeImpl<?> _exchange; public UnresolvedExchange(final UUID id, final Map<String, Object> attributeMap) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index fd30119d7b..63b76f36a6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -31,7 +31,6 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; @@ -62,7 +61,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable AMQQueue createQueue(Map<String, Object> arguments) throws QueueExistsException; - NonDefaultExchange createExchange(Map<String,Object> attributes) + ExchangeImpl createExchange(Map<String,Object> attributes) throws ExchangeExistsException, ReservedExchangeNameException, UnknownExchangeException, AMQUnknownExchangeType; @@ -75,9 +74,9 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable ExchangeImpl getExchange(UUID id); - ExchangeImpl getDefaultExchange(); + MessageDestination getDefaultDestination(); - Collection<ExchangeImpl> getExchanges(); + Collection<ExchangeImpl<?>> getExchanges(); Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes(); @@ -137,7 +136,5 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable TaskExecutor getTaskExecutor(); - Collection<NonDefaultExchange> getExchangesExceptDefault(); - org.apache.qpid.server.model.VirtualHost getModel(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java index 5ec9a7762d..b5fe5e0072 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java @@ -168,26 +168,7 @@ public class DefaultExchangeFactoryTest extends QpidTestCase public void testCreateDefaultExchangeFactoryWithCustomExchangeType() { - ExchangeType<?> customExchangeType = new ExchangeType<NonDefaultExchange>() - { - @Override - public String getType() - { - return "my-custom-exchange"; - } - - @Override - public NonDefaultExchange newInstance(VirtualHost host, Map<String,Object> attributes) - { - return null; - } - - @Override - public String getDefaultExchangeName() - { - return null; - } - }; + ExchangeType<?> customExchangeType = new CustomExchangeType(); _stubbedExchangeTypes.add(customExchangeType); _stubbedExchangeTypes.add(_directExchangeType); @@ -206,6 +187,31 @@ public class DefaultExchangeFactoryTest extends QpidTestCase assertTrue("Custom exchange type is not found", registeredTypes.contains(customExchangeType)); } + public static abstract class CustomExchange implements ExchangeImpl<CustomExchange> + { + } + + private static class CustomExchangeType implements ExchangeType<CustomExchange> + { + @Override + public String getType() + { + return "my-custom-exchange"; + } + + @Override + public CustomExchange newInstance(VirtualHost host, Map<String,Object> attributes) + { + return null; + } + + @Override + public String getDefaultExchangeName() + { + return null; + } + } + private final class TestExchangeFactory extends DefaultExchangeFactory { private TestExchangeFactory() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 453a1b8e7d..8c9132d166 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -139,7 +139,7 @@ public class HeadersBindingTest extends TestCase private MockHeader matchHeaders = new MockHeader(); private int _count = 0; private AMQQueue _queue; - private NonDefaultExchange _exchange; + private ExchangeImpl _exchange; protected void setUp() { @@ -149,7 +149,7 @@ public class HeadersBindingTest extends TestCase when(_queue.getVirtualHost()).thenReturn(vhost); when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); CurrentActor.set(mock(LogActor.class)); - _exchange = mock(NonDefaultExchange.class); + _exchange = mock(ExchangeImpl.class); when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index d32f8bcf49..64bcf8f730 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -150,7 +149,7 @@ public class AMQQueueFactoryTest extends QpidTestCase final String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributeValues); final UUID id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributeValues); - final NonDefaultExchange exchange = mock(NonDefaultExchange.class); + final ExchangeImpl exchange = mock(ExchangeImpl.class); ExchangeType exType = mock(ExchangeType.class); when(exchange.getName()).thenReturn(name); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 56b931f37e..613dd07741 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -40,7 +40,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.EnqueueableMessage; @@ -81,7 +80,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; - private NonDefaultExchange _exchange = mock(NonDefaultExchange.class); + private ExchangeImpl _exchange = mock(ExchangeImpl.class); private static final String ROUTING_KEY = "routingKey"; private static final String QUEUE_NAME = "queueName"; private Map<String,Object> _bindingArgs; @@ -258,7 +257,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception { - NonDefaultExchange alternateExchange = createTestAlternateExchange(); + ExchangeImpl alternateExchange = createTestAlternateExchange(); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null); DurableConfigurationStoreHelper.createQueue(_configStore, queue); @@ -274,10 +273,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } - private NonDefaultExchange createTestAlternateExchange() + private ExchangeImpl createTestAlternateExchange() { UUID exchUuid = UUID.randomUUID(); - NonDefaultExchange alternateExchange = mock(NonDefaultExchange.class); + ExchangeImpl alternateExchange = mock(ExchangeImpl.class); when(alternateExchange.getId()).thenReturn(exchUuid); return alternateExchange; } @@ -318,7 +317,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.createQueue(_configStore, queue); // update the queue to have exclusive=false - NonDefaultExchange alternateExchange = createTestAlternateExchange(); + ExchangeImpl alternateExchange = createTestAlternateExchange(); queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes); DurableConfigurationStoreHelper.updateQueue(_configStore, queue); @@ -362,7 +361,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, - NonDefaultExchange alternateExchange, + ExchangeImpl alternateExchange, final Map<String, Object> arguments) throws StoreException { AMQQueue<?> queue = mock(AMQQueue.class); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 90ba03a789..1eaccc4e5f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -28,7 +28,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.exchange.NonDefaultExchange; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.DirectExchange; @@ -70,8 +70,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private static final String CUSTOM_EXCHANGE_NAME = "customExchange"; private DurableConfigurationRecoverer _durableConfigurationRecoverer; - private NonDefaultExchange _directExchange; - private NonDefaultExchange _topicExchange; + private ExchangeImpl _directExchange; + private ExchangeImpl _topicExchange; private VirtualHost _vhost; private DurableConfigurationStore _store; private ExchangeFactory _exchangeFactory; @@ -84,11 +84,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase super.setUp(); - _directExchange = mock(NonDefaultExchange.class); + _directExchange = mock(ExchangeImpl.class); when(_directExchange.getExchangeType()).thenReturn(DirectExchange.TYPE); - _topicExchange = mock(NonDefaultExchange.class); + _topicExchange = mock(ExchangeImpl.class); when(_topicExchange.getExchangeType()).thenReturn(TopicExchange.TYPE); AMQQueue queue = mock(AMQQueue.class); @@ -101,14 +101,14 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue); - final ArgumentCaptor<NonDefaultExchange> registeredExchange = ArgumentCaptor.forClass(NonDefaultExchange.class); + final ArgumentCaptor<ExchangeImpl> registeredExchange = ArgumentCaptor.forClass(ExchangeImpl.class); doAnswer(new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - NonDefaultExchange exchange = registeredExchange.getValue(); + ExchangeImpl exchange = registeredExchange.getValue(); when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange); when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange); return null; @@ -139,14 +139,14 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_vhost.getQueue(eq(queueName))).thenReturn(queue); when(_vhost.getQueue(eq(queueId))).thenReturn(queue); - final ArgumentCaptor<NonDefaultExchange> altExchangeArg = ArgumentCaptor.forClass(NonDefaultExchange.class); + final ArgumentCaptor<ExchangeImpl> altExchangeArg = ArgumentCaptor.forClass(ExchangeImpl.class); doAnswer( new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { - final NonDefaultExchange value = altExchangeArg.getValue(); + final ExchangeImpl value = altExchangeArg.getValue(); when(queue.getAlternateExchange()).thenReturn(value); return null; } @@ -157,8 +157,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase if (args.containsKey(Queue.ALTERNATE_EXCHANGE)) { final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); - final NonDefaultExchange exchange = - (NonDefaultExchange) _exchangeRegistry.getExchange(exchangeId); + final ExchangeImpl exchange = + (ExchangeImpl) _exchangeRegistry.getExchange(exchangeId); queue.setAlternateExchange(exchange); } return queue; @@ -267,13 +267,13 @@ public class DurableConfigurationRecovererTest extends QpidTestCase "org.apache.qpid.server.model.Exchange", createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); - final NonDefaultExchange customExchange = mock(NonDefaultExchange.class); + final ExchangeImpl customExchange = mock(ExchangeImpl.class); final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class); - when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<NonDefaultExchange>() + when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>() { @Override - public NonDefaultExchange answer(final InvocationOnMock invocation) throws Throwable + public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable { Map arguments = attributesCaptor.getValue(); if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME)) @@ -397,17 +397,17 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final UUID queueId = new UUID(1, 0); final UUID exchangeId = new UUID(2, 0); - final NonDefaultExchange customExchange = mock(NonDefaultExchange.class); + final ExchangeImpl customExchange = mock(ExchangeImpl.class); when(customExchange.getId()).thenReturn(exchangeId); when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME); final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class); - when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<NonDefaultExchange>() + when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>() { @Override - public NonDefaultExchange answer(final InvocationOnMock invocation) throws Throwable + public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable { Map arguments = attributesCaptor.getValue(); if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME)) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index baa62b8ea2..90829ca271 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -27,7 +27,6 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.NonDefaultExchange; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.plugin.ExchangeType; @@ -160,7 +159,7 @@ public class MockVirtualHost implements VirtualHost } @Override - public NonDefaultExchange createExchange(Map<String,Object> attributes) + public ExchangeImpl createExchange(Map<String,Object> attributes) { return null; } @@ -189,13 +188,13 @@ public class MockVirtualHost implements VirtualHost } @Override - public ExchangeImpl getDefaultExchange() + public ExchangeImpl getDefaultDestination() { return null; } @Override - public Collection<ExchangeImpl> getExchanges() + public Collection<ExchangeImpl<?>> getExchanges() { return null; } @@ -360,12 +359,6 @@ public class MockVirtualHost implements VirtualHost } @Override - public Collection<NonDefaultExchange> getExchangesExceptDefault() - { - return null; - } - - @Override public org.apache.qpid.server.model.VirtualHost getModel() { return null; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index 61569cc3e7..f081268337 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -117,7 +117,7 @@ public class StandardVirtualHostTest extends QpidTestCase { Throwable cause = e.getCause(); assertNotNull(cause); - assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, cause.getMessage()); + assertEquals("Attempt to bind queue '" + queueName + "' with binding key(s) [" + customBinding + "] without specifying an exchange", cause.getMessage()); } } @@ -253,9 +253,6 @@ public class StandardVirtualHostTest extends QpidTestCase AMQQueue queue = vhost.getQueue(queueName); assertNotNull("queue should exist", queue); - ExchangeImpl defaultExch = vhost.getDefaultExchange(); - assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue)); - ExchangeImpl exch = vhost.getExchange(exchangeName); assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue)); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 1fb82efd2d..9d7764414f 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; @@ -683,83 +684,101 @@ public class ServerSessionDelegate extends SessionDelegate return; } } - - if(method.getPassive()) + if(method.getExchange() == null || method.getExchange().equals("")) { - ExchangeImpl exchange = getExchange(session, exchangeName); - - if(exchange == null) + if(!DirectExchange.TYPE.getType().equals(method.getType())) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare default exchange " + + " of type " + DirectExchange.TYPE.getType() + + " to " + method.getType() +"."); } - else + if(method.hasAlternateExchange() && !"".equals(method.getAlternateExchange())) { - if (!exchange.getTypeName().equals(method.getType()) - && (method.getType() != null && method.getType().length() > 0)) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + "."); - } + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to set alternate exchange of the default exchange " + + " to " + method.getAlternateExchange() +"."); } } else { - - try - { - Map<String,Object> attributes = new HashMap<String, Object>(); - - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange()); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType()); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange()); - virtualHost.createExchange(attributes); - } - catch(ReservedExchangeNameException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " - + exchangeName + " which begins with reserved name or prefix."); - } - catch(UnknownExchangeException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, - "Unknown alternate exchange " + e.getExchangeName()); - } - catch(AMQUnknownExchangeType e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); - } - catch(ExchangeExistsException e) + if(method.getPassive()) { - ExchangeImpl exchange = e.getExistingExchange(); - if(!exchange.getTypeName().equals(method.getType())) + + ExchangeImpl exchange = getExchange(session, exchangeName); + + if(exchange == null) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to redeclare exchange: " + exchangeName - + " of type " + exchange.getTypeName() - + " to " + method.getType() +"."); + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); } - else if(method.hasAlternateExchange() - && (exchange.getAlternateExchange() == null || - !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) + else { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to change alternate exchange of: " + exchangeName - + " from " + exchange.getAlternateExchange() - + " to " + method.getAlternateExchange() +"."); + if (!exchange.getTypeName().equals(method.getType()) + && (method.getType() != null && method.getType().length() > 0)) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + "."); + } } } - catch (AccessControlException e) + else { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); - } + + try + { + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange()); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType()); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable()); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange()); + virtualHost.createExchange(attributes); + } + catch(ReservedExchangeNameException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " + + exchangeName + " which begins with reserved name or prefix."); + } + catch(UnknownExchangeException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, + "Unknown alternate exchange " + e.getExchangeName()); + } + catch(AMQUnknownExchangeType e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); + } + catch(ExchangeExistsException e) + { + ExchangeImpl exchange = e.getExistingExchange(); + if(!exchange.getTypeName().equals(method.getType())) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare exchange: " + exchangeName + + " of type " + exchange.getTypeName() + + " to " + method.getType() +"."); + } + else if(method.hasAlternateExchange() + && (exchange.getAlternateExchange() == null || + !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to change alternate exchange of: " + exchangeName + + " from " + exchange.getAlternateExchange() + + " to " + method.getAlternateExchange() +"."); + } + } + catch (AccessControlException e) + { + exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); + } + } } - } private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description) @@ -789,12 +808,12 @@ public class ServerSessionDelegate extends SessionDelegate destination = virtualHost.getMessageDestination(xfr.getDestination()); if(destination == null) { - destination = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultDestination(); } } else { - destination = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultDestination(); } return destination; } @@ -878,19 +897,30 @@ public class ServerSessionDelegate extends SessionDelegate ExchangeQueryResult result = new ExchangeQueryResult(); - ExchangeImpl exchange = getExchange(session, method.getName()); - if(exchange != null) + final String exchangeName = method.getName(); + + if(exchangeName == null || exchangeName.equals("")) { - result.setDurable(exchange.isDurable()); - result.setType(exchange.getTypeName()); + result.setDurable(true); + result.setType(DirectExchange.TYPE.getType()); result.setNotFound(false); } else { - result.setNotFound(true); - } + ExchangeImpl exchange = getExchange(session, exchangeName); + if(exchange != null) + { + result.setDurable(exchange.isDurable()); + result.setType(exchange.getTypeName()); + result.setNotFound(false); + } + else + { + result.setNotFound(true); + } + } session.executionResult((int) method.getId(), result); } @@ -904,52 +934,56 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); } - else if (nameNullOrEmpty(method.getExchange())) - { - exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange"); - } else { - //TODO - here because of non-compliant python tests - // should raise exception ILLEGAL_ARGUMENT "binding-key not set" - if (!method.hasBindingKey()) - { - method.setBindingKey(method.getQueue()); - } - AMQQueue queue = virtualHost.getQueue(method.getQueue()); - ExchangeImpl exchange = virtualHost.getExchange(method.getExchange()); - if(queue == null) + final String exchangeName = method.getExchange(); + if (nameNullOrEmpty(exchangeName)) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); - } - else if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); - } - else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) - { - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header"); + exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange"); } else { - if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)) + //TODO - here because of non-compliant python tests + // should raise exception ILLEGAL_ARGUMENT "binding-key not set" + if (!method.hasBindingKey()) { - try + method.setBindingKey(method.getQueue()); + } + AMQQueue queue = virtualHost.getQueue(method.getQueue()); + ExchangeImpl exchange = virtualHost.getExchange(exchangeName); + if(queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); + } + else if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + exchangeName + "' not found"); + } + else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) + { + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header"); + } + else + { + if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)) { - exchange.addBinding(method.getBindingKey(), queue, method.getArguments()); + try + { + exchange.addBinding(method.getBindingKey(), queue, method.getArguments()); + } + catch (AccessControlException e) + { + exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); + } } - catch (AccessControlException e) + else { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); + // todo } } - else - { - // todo - } - } + } } @@ -1010,8 +1044,10 @@ public class ServerSessionDelegate extends SessionDelegate VirtualHost virtualHost = getVirtualHost(session); ExchangeImpl exchange; AMQQueue queue; - if(method.hasExchange()) + boolean isDefaultExchange; + if(method.hasExchange() && !method.getExchange().equals("")) { + isDefaultExchange = false; exchange = virtualHost.getExchange(method.getExchange()); if(exchange == null) @@ -1021,11 +1057,47 @@ public class ServerSessionDelegate extends SessionDelegate } else { - exchange = virtualHost.getDefaultExchange(); + isDefaultExchange = true; + exchange = null; } + if(isDefaultExchange) + { + if(method.hasQueue()) + { + queue = getQueue(session, method.getQueue()); - if(method.hasQueue()) + if(queue == null) + { + result.setQueueNotFound(true); + } + else + { + if(method.hasBindingKey()) + { + if(!method.getBindingKey().equals(method.getQueue())) + { + result.setKeyNotMatched(true); + } + } + } + } + else if(method.hasBindingKey()) + { + if(getQueue(session, method.getBindingKey()) == null) + { + result.setKeyNotMatched(true); + } + } + + if(method.hasArguments() && !method.getArguments().isEmpty()) + { + result.setArgsNotMatched(true); + } + + + } + else if(method.hasQueue()) { queue = getQueue(session, method.getQueue()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java index 101a92242f..fc085e8ab1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -62,16 +61,23 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic } AMQShortString exchangeName = body.getExchange(); + VirtualHost vHost = session.getVirtualHost(); + // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - if (exchangeName == null) + + MessageDestination destination; + + if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName)) { - exchangeName = AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME); + destination = vHost.getDefaultDestination(); + } + else + { + destination = vHost.getMessageDestination(exchangeName.toString()); } - VirtualHost vHost = session.getVirtualHost(); - MessageDestination exch = vHost.getMessageDestination(exchangeName.toString()); // if the exchange does not exist we raise a channel exception - if (exch == null) + if (destination == null) { throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name"); } @@ -91,7 +97,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic info.setExchange(exchangeName); try { - channel.setPublishFrame(info, exch); + channel.setPublishFrame(info, destination); } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index 27837844ff..fce1260155 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -79,107 +79,155 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo channel.sync(); - AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange(); + AMQShortString exchangeName = body.getExchange(); AMQShortString queueName = body.getQueue(); AMQShortString routingKey = body.getRoutingKey(); - ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); ExchangeBoundOkBody response; - if (exchange == null) + if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING)) { - - - response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, - new AMQShortString("Exchange '" + exchangeName + "' not found")); - } - else if (routingKey == null) - { - if (queueName == null) + if(routingKey == null) { - if (exchange.hasBindings()) + if(queueName == null) { - response = methodRegistry.createExchangeBoundOkBody(OK, null); + response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null); } else { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { - response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText + } + else + { + response = methodRegistry.createExchangeBoundOkBody(OK, null); + } } } else { - - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) + if(queueName == null) { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - new AMQShortString("Queue '" + queueName + "' not found")); // replyText + response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null); } else { - if (exchange.isBound(queue)) + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) { - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText } else { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode - new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText + response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null); } } } } - else if (queueName != null) + else { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) + ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); + if (exchange == null) { - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - new AMQShortString("Queue '" + queueName + "' not found")); // replyText + + response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, + new AMQShortString("Exchange '" + exchangeName + "' not found")); } - else + else if (routingKey == null) { - String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); - if (exchange.isBound(bindingKey, queue)) + if (queueName == null) { + if (exchange.hasBindings()) + { + response = methodRegistry.createExchangeBoundOkBody(OK, null); + } + else + { - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode + null); // replyText + } } else { - String message = "Queue '" + queueName + "' not bound with routing key '" + - body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { - if(message.length()>255) + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText + } + else { - message = message.substring(0,254); + if (exchange.isBound(queue)) + { + + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText + } } - response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - new AMQShortString(message)); // replyText } } - } - else - { - if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) + else if (queueName != null) { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText + } + else + { + String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); + if (exchange.isBound(bindingKey, queue)) + { - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + String message = "Queue '" + queueName + "' not bound with routing key '" + + body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; + + if(message.length()>255) + { + message = message.substring(0,254); + } + response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode + new AMQShortString(message)); // replyText + } + } } else { + if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) + { - response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode - new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() + - "' to exchange '" + exchangeName + "'")); // replyText + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode + new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() + + "' to exchange '" + exchangeName + "'")); // replyText + } } } session.writeFrame(response.generateFrame(channelId)); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index 3b630c684c..78d47aaa52 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.v0_8.AMQChannel; @@ -78,76 +79,90 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange ExchangeImpl exchange; - if (body.getPassive()) + if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING)) { - exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString()); - if(exchange == null) + if(!new AMQShortString(DirectExchange.TYPE.getType()).equals(body.getType())) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " + + " of type " + + DirectExchange.TYPE.getType() + + " to " + body.getType() +".", + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); } - else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString())) - { - - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + - exchangeName + " of type " + exchange.getTypeName() - + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); - } - } else { - try + if (body.getPassive()) { - String name = exchangeName == null ? null : exchangeName.intern().toString(); - String type = body.getType() == null ? null : body.getType().intern().toString(); - Map<String,Object> attributes = new HashMap<String, Object>(); - - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - exchange = virtualHost.createExchange(attributes); + exchange = virtualHost.getExchange(exchangeName.toString()); + if(exchange == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + } + else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString())) + { - } - catch(ReservedExchangeNameException e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix."); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeName() + + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); + } } - catch(ExchangeExistsException e) + else { - exchange = e.getExistingExchange(); - if(!new AMQShortString(exchange.getTypeName()).equals(body.getType())) + try { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getTypeName() - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + String name = exchangeName == null ? null : exchangeName.intern().toString(); + String type = body.getType() == null ? null : body.getType().intern().toString(); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + exchange = virtualHost.createExchange(attributes); + + } + catch(ReservedExchangeNameException e) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Attempt to declare exchange: " + exchangeName + + " which begins with reserved prefix."); + + } + catch(ExchangeExistsException e) + { + exchange = e.getExistingExchange(); + if(!new AMQShortString(exchange.getTypeName()).equals(body.getType())) + { + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getTypeName() + + " to " + body.getType() +".", + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + } + catch(AMQUnknownExchangeType e) + { + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + catch (UnknownExchangeException e) + { + // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); } - } - catch(AMQUnknownExchangeType e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); - } - catch (UnknownExchangeException e) - { - // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); } } - if(!body.getNowait()) { MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java index 720677064b..bc723fc3dd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java @@ -62,6 +62,11 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD { final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); + if(exchangeName == null || "".equals(exchangeName)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted"); + } + final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); if(exchange == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 1e0382f456..7dc76d13d0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -102,6 +102,12 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); } final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); + + if(exchangeName == null || "".equals(exchangeName)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange"); + } + final ExchangeImpl exch = virtualHost.getExchange(exchangeName); if (exch == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index a828ca323d..abc9c8541c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -93,6 +93,12 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); } + + if(body.getExchange() == null || body.getExchange().equals(AMQShortString.EMPTY_STRING)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange"); + } + final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); if (exch == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java index 86adc585c3..580b912552 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -73,10 +74,18 @@ public class BrokerTestHelper_0_8 extends BrokerTestHelper when(info.getExchange()).thenReturn(exchangeNameAsShortString); when(info.getRoutingKey()).thenReturn(routingKey); - ExchangeImpl exchange = channel.getVirtualHost().getExchange(exchangeName); + MessageDestination destination; + if(exchangeName == null || "".equals(exchangeName)) + { + destination = channel.getVirtualHost().getDefaultDestination(); + } + else + { + destination = channel.getVirtualHost().getExchange(exchangeName); + } for (int count = 0; count < numberOfMessages; count++) { - channel.setPublishFrame(info, exchange); + channel.setPublishFrame(info, destination); // Set the body size ContentHeaderBody _headerBody = new ContentHeaderBody(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 1e5c8caa18..78dcab9d75 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -130,11 +130,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio MessageSource queue = getVirtualHost().getMessageSource(addr); if(queue != null) { - destination = new MessageSourceDestination(queue); - - - } else { @@ -145,7 +141,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } else { - endpoint.setSource(null); destination = null; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index e6f9c46523..bb57750426 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -34,7 +34,6 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; -import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; @@ -366,7 +365,7 @@ public class MessageStoreTest extends QpidTestCase { int origExchangeCount = getVirtualHost().getExchanges().size(); - Map<String, ExchangeImpl> oldExchanges = createExchanges(); + Map<String, ExchangeImpl<?>> oldExchanges = createExchanges(); assertEquals("Incorrect number of exchanges registered before recovery", origExchangeCount + 3, getVirtualHost().getExchanges().size()); @@ -421,7 +420,7 @@ public class MessageStoreTest extends QpidTestCase createAllQueues(); createAllTopicQueues(); - Map<String, ExchangeImpl> exchanges = createExchanges(); + Map<String, ExchangeImpl<?>> exchanges = createExchanges(); ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName); ExchangeImpl directExchange = exchanges.get(directExchangeName); @@ -479,11 +478,11 @@ public class MessageStoreTest extends QpidTestCase * and that the new exchanges are not the same objects as the provided list (i.e. that the * reload actually generated new exchange objects) */ - private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl> oldExchanges) + private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl<?>> oldExchanges) { - Collection<ExchangeImpl> exchanges = getVirtualHost().getExchanges(); + Collection<ExchangeImpl<?>> exchanges = getVirtualHost().getExchanges(); Collection<String> exchangeNames = new ArrayList(exchanges.size()); - for(ExchangeImpl exchange : exchanges) + for(ExchangeImpl<?> exchange : exchanges) { exchangeNames.add(exchange.getName()); } @@ -709,9 +708,9 @@ public class MessageStoreTest extends QpidTestCase } - private Map<String, ExchangeImpl> createExchanges() throws Exception + private Map<String, ExchangeImpl<?>> createExchanges() throws Exception { - Map<String, ExchangeImpl> exchanges = new HashMap<String, ExchangeImpl>(); + Map<String, ExchangeImpl<?>> exchanges = new HashMap<String, ExchangeImpl<?>>(); //Register non-durable DirectExchange exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false)); @@ -723,9 +722,9 @@ public class MessageStoreTest extends QpidTestCase return exchanges; } - private ExchangeImpl createExchange(ExchangeType<?> type, String name, boolean durable) throws Exception + private ExchangeImpl<?> createExchange(ExchangeType<?> type, String name, boolean durable) throws Exception { - ExchangeImpl exchange = null; + ExchangeImpl<?> exchange = null; Map<String,Object> attributes = new HashMap<String, Object>(); |
