summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java89
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java275
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java59
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java46
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java13
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java34
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java13
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java276
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java152
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java123
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java13
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java19
37 files changed, 634 insertions, 716 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index f344d415f0..69e05ad989 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.binding;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
@@ -51,7 +50,7 @@ public class BindingImpl
{
private final String _bindingKey;
private final AMQQueue _queue;
- private final NonDefaultExchange _exchange;
+ private final ExchangeImpl _exchange;
private final Map<String, Object> _arguments;
private final UUID _id;
private final AtomicLong _matches = new AtomicLong();
@@ -65,7 +64,7 @@ public class BindingImpl
public BindingImpl(UUID id,
final String bindingKey,
final AMQQueue queue,
- final NonDefaultExchange exchange,
+ final ExchangeImpl exchange,
final Map<String, Object> arguments)
{
this(id, convertToAttributes(bindingKey, arguments), queue, exchange);
@@ -82,7 +81,7 @@ public class BindingImpl
return attributes;
}
- public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, NonDefaultExchange exchange)
+ public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
{
super(id,Collections.EMPTY_MAP,attributes,queue.getVirtualHost().getTaskExecutor());
_id = id;
@@ -120,7 +119,7 @@ public class BindingImpl
}
@Override
- public NonDefaultExchange getExchange()
+ public ExchangeImpl getExchange()
{
return _exchange;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 966ac50b85..2a688f497a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -24,7 +24,6 @@ import java.security.AccessControlException;
import java.util.ArrayList;
import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -33,7 +32,6 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Publisher;
@@ -68,13 +66,13 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class AbstractExchange<T extends AbstractExchange<T>>
extends AbstractConfiguredObject<T>
- implements NonDefaultExchange<T>
+ implements ExchangeImpl<T>
{
private static final Logger _logger = Logger.getLogger(AbstractExchange.class);
private final LifetimePolicy _lifetimePolicy;
private final AtomicBoolean _closed = new AtomicBoolean();
- private NonDefaultExchange _alternateExchange;
+ private ExchangeImpl _alternateExchange;
private boolean _durable;
@@ -329,12 +327,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return !_bindings.isEmpty();
}
- public NonDefaultExchange getAlternateExchange()
+ public ExchangeImpl getAlternateExchange()
{
return _alternateExchange;
}
- public void setAlternateExchange(NonDefaultExchange exchange)
+ public void setAlternateExchange(ExchangeImpl exchange)
{
if(_alternateExchange != null)
{
@@ -836,13 +834,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
- public void setAlternateExchange(final ExchangeImpl exchange)
- {
- // todo
- _alternateExchange = (NonDefaultExchange) exchange;
- }
-
- @Override
public org.apache.qpid.server.model.Binding createBinding(final String bindingKey,
final Queue queue,
final Map<String, Object> bindingArguments,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
new file mode 100644
index 0000000000..f59049d276
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class DefaultDestination implements MessageDestination
+{
+
+ private VirtualHost _virtualHost;
+ private static final Logger _logger = Logger.getLogger(DefaultDestination.class);
+
+ public DefaultDestination(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ @Override
+ public String getName()
+ {
+ return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+ }
+
+
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
+ {
+ final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+ if(q == null)
+ {
+ return 0;
+ }
+ else
+ {
+ txn.enqueue(q,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ q.enqueue(message, postEnqueueAction);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+ }
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
deleted file mode 100644
index 0d435f43bd..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.exchange;
-
-import java.security.AccessControlException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.consumer.Consumer;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class DefaultExchange implements ExchangeImpl<DirectExchange>
-{
-
- private final QueueRegistry _queueRegistry;
- private UUID _id;
- private VirtualHost _virtualHost;
- private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
-
- private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
-
- public DefaultExchange(VirtualHost virtualHost, QueueRegistry queueRegistry, UUID id)
- {
- _virtualHost = virtualHost;
- _queueRegistry = queueRegistry;
- _id = id;
- }
-
- @Override
- public String getName()
- {
- return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
- }
-
- @Override
- public ExchangeType<DirectExchange> getExchangeType()
- {
- return DirectExchange.TYPE;
- }
-
-
- @Override
- public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
- {
- throw new AccessControlException("Cannot add bindings to the default exchange");
- }
-
- @Override
- public boolean deleteBinding(final String bindingKey, final AMQQueue queue)
- {
- throw new AccessControlException("Cannot delete bindings from the default exchange");
- }
-
- @Override
- public boolean hasBinding(final String bindingKey, final AMQQueue queue)
- {
- return false;
- }
-
- @Override
- public boolean replaceBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
- {
- throw new AccessControlException("Cannot replace bindings on the default exchange");
- }
-
- @Override
- public void restoreBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> argumentMap)
- {
- _logger.warn("Bindings to the default exchange should not be stored in the configuration store");
- }
-
- @Override
- public String getTypeName()
- {
- return getExchangeType().getType();
- }
-
- @Override
- public boolean isDurable()
- {
- return false;
- }
-
- @Override
- public boolean isAutoDelete()
- {
- return false;
- }
-
- @Override
- public void close()
- {
- throw new AccessControlException("Cannot close the default exchange");
- }
-
- @Override
- public boolean isBound(AMQQueue queue)
- {
- return _virtualHost.getQueue(queue.getName()) == queue;
- }
-
- @Override
- public boolean hasBindings()
- {
- return !_virtualHost.getQueues().isEmpty();
- }
-
- @Override
- public boolean isBound(String bindingKey, AMQQueue queue)
- {
- return isBound(queue) && queue.getName().equals(bindingKey);
- }
-
- @Override
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- return isBound(bindingKey, queue) && (arguments == null || arguments.isEmpty());
- }
-
- @Override
- public boolean isBound(Map<String, Object> arguments, AMQQueue queue)
- {
- return (arguments == null || arguments.isEmpty()) && isBound(queue);
- }
-
- @Override
- public boolean isBound(String bindingKey, Map<String, Object> arguments)
- {
- return (arguments == null || arguments.isEmpty()) && isBound(bindingKey);
- }
-
- @Override
- public boolean isBound(Map<String, Object> arguments)
- {
- return (arguments == null || arguments.isEmpty()) && hasBindings();
- }
-
- @Override
- public boolean isBound(String bindingKey)
- {
- return _virtualHost.getQueue(bindingKey) != null;
- }
-
- @Override
- public ExchangeImpl getAlternateExchange()
- {
- return null;
- }
-
- @Override
- public void setAlternateExchange(ExchangeImpl exchange)
- {
- _logger.warn("Cannot set the alternate exchange for the default exchange");
- }
-
- @Override
- public void removeReference(ExchangeReferrer exchange)
- {
- _referrers.remove(exchange);
- }
-
- @Override
- public void addReference(ExchangeReferrer exchange)
- {
- _referrers.put(exchange, Boolean.TRUE);
- }
-
- @Override
- public boolean hasReferrers()
- {
- return !_referrers.isEmpty();
- }
-
- @Override
- public void addBindingListener(BindingListener listener)
- {
-
- }
-
- @Override
- public void removeBindingListener(BindingListener listener)
- {
- // TODO
- }
-
- @Override
- public UUID getId()
- {
- return _id;
- }
-
- public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
- {
- final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
- if(q == null)
- {
- return 0;
- }
- else
- {
- txn.enqueue(q,message, new ServerTransaction.Action()
- {
- MessageReference _reference = message.newReference();
-
- public void postCommit()
- {
- try
- {
- q.enqueue(message, postEnqueueAction);
- }
- finally
- {
- _reference.release();
- }
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- return 1;
- }
- }
-
- private static final StateChangeListener<BindingImpl, State> STATE_CHANGE_LISTENER =
- new StateChangeListener<BindingImpl, State>()
- {
- @Override
- public void stateChanged(final BindingImpl object, final State oldState, final State newState)
- {
- if(newState == State.DELETED)
- {
- throw new AccessControlException("Cannot remove bindings to the default exchange");
- }
- }
- };
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index e9d0740539..99d6487af8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -96,7 +96,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
}
@Override
- public NonDefaultExchange createExchange(final Map<String, Object> attributes)
+ public ExchangeImpl createExchange(final Map<String, Object> attributes)
throws AMQUnknownExchangeType, UnknownExchangeException
{
String type = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributes);
@@ -109,7 +109,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
}
@Override
- public NonDefaultExchange restoreExchange(Map<String,Object> attributes)
+ public ExchangeImpl restoreExchange(Map<String,Object> attributes)
throws AMQUnknownExchangeType, UnknownExchangeException
{
return createExchange(attributes);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index e48a640fcb..70eecbb164 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -46,9 +47,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
/**
* Maps from exchange name to exchange instance
*/
- private ConcurrentMap<String, ExchangeImpl> _exchangeMap = new ConcurrentHashMap<String, ExchangeImpl>();
+ private ConcurrentMap<String, ExchangeImpl<?>> _exchangeMap = new ConcurrentHashMap<String, ExchangeImpl<?>>();
- private ExchangeImpl _defaultExchange;
+ private MessageDestination _defaultExchange;
private final VirtualHost _host;
private final QueueRegistry _queueRegistry;
@@ -68,9 +69,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
initialiseExchanges(exchangeFactory, getDurableConfigurationStore());
_defaultExchange =
- new DefaultExchange(_host, _queueRegistry,
- UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME,
- _host.getName()));
+ new DefaultDestination(_host
+ );
}
@@ -96,7 +96,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type);
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
- ExchangeImpl exchange = f.createExchange(attributes);
+ ExchangeImpl<?> exchange = f.createExchange(attributes);
registerExchange(exchange);
if(exchange.isDurable())
{
@@ -135,7 +135,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
}
- public ExchangeImpl getDefaultExchange()
+ public MessageDestination getDefaultExchange()
{
return _defaultExchange;
}
@@ -170,17 +170,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
- public Collection<ExchangeImpl> getExchanges()
+ public Collection<ExchangeImpl<?>> getExchanges()
{
- return new ArrayList<ExchangeImpl>(_exchangeMap.values());
- }
-
- @Override
- public Collection<NonDefaultExchange> getExchangesExceptDefault()
- {
- Collection allExchanges = getExchanges();
- allExchanges.remove(_defaultExchange);
- return allExchanges;
+ return new ArrayList<ExchangeImpl<?>>(_exchangeMap.values());
}
public void addRegistryChangeListener(RegistryChangeListener listener)
@@ -188,22 +180,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_listeners.add(listener);
}
- public ExchangeImpl getExchange(String name)
+ public ExchangeImpl<?> getExchange(String name)
{
- if ((name == null) || name.length() == 0)
- {
- return getDefaultExchange();
- }
- else
- {
- return _exchangeMap.get(name);
- }
+ return _exchangeMap.get(name);
}
@Override
public void clearAndUnregisterMbeans()
{
- for (final ExchangeImpl exchange : getExchanges())
+ for (final ExchangeImpl<?> exchange : getExchanges())
{
//TODO: this is a bit of a hack, what if the listeners aren't aware
//that we are just unregistering the MBean because of HA, and aren't
@@ -220,24 +205,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
@Override
- public synchronized ExchangeImpl getExchange(UUID exchangeId)
+ public synchronized ExchangeImpl<?> getExchange(UUID exchangeId)
{
- if (exchangeId == null)
- {
- return getDefaultExchange();
- }
- else
+ Collection<ExchangeImpl<?>> exchanges = _exchangeMap.values();
+ for (ExchangeImpl<?> exchange : exchanges)
{
- Collection<ExchangeImpl> exchanges = _exchangeMap.values();
- for (ExchangeImpl exchange : exchanges)
+ if (exchange.getId().equals(exchangeId))
{
- if (exchange.getId().equals(exchangeId))
- {
- return exchange;
- }
+ return exchange;
}
- return null;
}
+ return null;
+
}
public boolean isReservedExchangeName(String name)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
index 0c7cece752..2731f665ac 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
@@ -32,8 +32,8 @@ public interface ExchangeFactory
Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes();
- NonDefaultExchange createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
+ ExchangeImpl createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
- NonDefaultExchange restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
+ ExchangeImpl restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
index f2f5fde603..35d28d3384 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
@@ -22,13 +22,14 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import java.util.Map;
import java.util.UUID;
-public interface ExchangeImpl<T extends NonDefaultExchange> extends ExchangeReferrer, MessageDestination
+public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, ExchangeReferrer, MessageDestination
{
UUID getId();
@@ -46,7 +47,7 @@ public interface ExchangeImpl<T extends NonDefaultExchange> extends ExchangeRefe
*/
boolean isAutoDelete();
- <X extends NonDefaultExchange<X>> ExchangeImpl<X> getAlternateExchange();
+ ExchangeImpl getAlternateExchange();
void setAlternateExchange(ExchangeImpl exchange);
@@ -110,6 +111,7 @@ public interface ExchangeImpl<T extends NonDefaultExchange> extends ExchangeRefe
boolean hasReferrers();
+ BindingImpl getBinding(String bindingName, AMQQueue queue);
public interface BindingListener
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index b4f14b00c8..7fa7a64a62 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -20,19 +20,21 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.qpid.server.message.MessageDestination;
+
import java.util.Collection;
import java.util.UUID;
public interface ExchangeRegistry
{
- void registerExchange(ExchangeImpl exchange);
+ void registerExchange(ExchangeImpl<?> exchange);
- ExchangeImpl getDefaultExchange();
+ MessageDestination getDefaultExchange();
void initialise(ExchangeFactory exchangeFactory);
- ExchangeImpl getExchange(String exchangeName);
+ ExchangeImpl<?> getExchange(String exchangeName);
/**
* Unregister an exchange
@@ -43,11 +45,9 @@ public interface ExchangeRegistry
void clearAndUnregisterMbeans();
- ExchangeImpl getExchange(UUID exchangeId);
-
- Collection<ExchangeImpl> getExchanges();
+ ExchangeImpl<?> getExchange(UUID exchangeId);
- Collection<NonDefaultExchange> getExchangesExceptDefault();
+ Collection<ExchangeImpl<?>> getExchanges();
void addRegistryChangeListener(RegistryChangeListener listener);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java
index 6e5eb98078..419dbdb9b4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java
@@ -26,7 +26,4 @@ import org.apache.qpid.server.queue.AMQQueue;
public interface NonDefaultExchange<T extends NonDefaultExchange<T>> extends Exchange<T>, ExchangeImpl<T>
{
- NonDefaultExchange getAlternateExchange();
-
- BindingImpl getBinding(String bindingName, AMQQueue queue);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 0c14fb38c7..b1646e87c8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -36,7 +36,7 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>
// Attributes
@ManagedAttribute
- <T extends Exchange<T>> Exchange<T> getAlternateExchange();
+ Exchange<?> getAlternateExchange();
//children
Collection<? extends Binding> getBindings();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 97b6744f1b..72b316c784 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -41,7 +41,6 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
@@ -49,7 +48,6 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.queue.ConflationQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
@@ -192,7 +190,7 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
public Collection<Exchange> getExchanges()
{
- return _virtualHost == null ? Collections.<Exchange>emptyList() : new ArrayList<Exchange>(_virtualHost.getExchangesExceptDefault());
+ return _virtualHost == null ? Collections.<Exchange>emptyList() : new ArrayList<Exchange>(_virtualHost.getExchanges());
}
@@ -290,7 +288,7 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
lifetime != null && lifetime != LifetimePolicy.PERMANENT
? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
attributes1.put(Exchange.ALTERNATE_EXCHANGE, alternateExchange);
- NonDefaultExchange exchange = _virtualHost.createExchange(attributes1);
+ ExchangeImpl exchange = _virtualHost.createExchange(attributes1);
return exchange;
}
@@ -503,13 +501,13 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
public void exchangeRegistered(ExchangeImpl exchange)
{
- childAdded((NonDefaultExchange)exchange);
+ childAdded(exchange);
}
public void exchangeUnregistered(ExchangeImpl exchange)
{
- childRemoved((NonDefaultExchange)exchange);
+ childRemoved(exchange);
}
public void queueRegistered(AMQQueue queue)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
index 3017aded98..87d10f745b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
@@ -23,11 +23,10 @@ package org.apache.qpid.server.plugin;
import java.util.Map;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public interface ExchangeType<T extends NonDefaultExchange> extends Pluggable
+public interface ExchangeType<T extends ExchangeImpl<T>> extends Pluggable
{
public String getType();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 9216061169..0db7d78576 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.ExchangeReferrer;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
@@ -32,11 +31,9 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.protocol.CapacityChecker;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.NotificationListener;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -174,9 +171,9 @@ public interface AMQQueue<X extends AMQQueue<X>>
void stop();
- NonDefaultExchange getAlternateExchange();
+ ExchangeImpl getAlternateExchange();
- void setAlternateExchange(NonDefaultExchange exchange);
+ void setAlternateExchange(ExchangeImpl exchange);
Collection<String> getAvailableAttributes();
Object getAttribute(String attrName);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 34895b61e8..dd82dfd681 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -154,14 +153,14 @@ public class AMQQueueFactory implements QueueFactory
{
final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE);
- NonDefaultExchange altExchange;
+ ExchangeImpl altExchange;
try
{
- altExchange = (NonDefaultExchange) _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
+ altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
}
catch(IllegalArgumentException e)
{
- altExchange = (NonDefaultExchange) _virtualHost.getExchange(altExchangeAttr);
+ altExchange = _virtualHost.getExchange(altExchangeAttr);
}
queue.setAlternateExchange(altExchange);
}
@@ -183,7 +182,7 @@ public class AMQQueueFactory implements QueueFactory
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
- NonDefaultExchange dlExchange = null;
+ ExchangeImpl dlExchange = null;
final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
try
@@ -202,7 +201,7 @@ public class AMQQueueFactory implements QueueFactory
catch(ExchangeExistsException e)
{
// We're ok if the exchange already exists
- dlExchange = (NonDefaultExchange) e.getExistingExchange();
+ dlExchange = e.getExistingExchange();
}
catch (ReservedExchangeNameException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index f7362e18c8..62634970a6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.Queue;
@@ -106,7 +106,7 @@ public abstract class AbstractQueue
private final boolean _durable;
- private NonDefaultExchange _alternateExchange;
+ private ExchangeImpl _alternateExchange;
private final QueueEntryList _entries;
@@ -516,12 +516,12 @@ public abstract class AbstractQueue
return _exclusivityPolicy != ExclusivityPolicy.NONE;
}
- public NonDefaultExchange getAlternateExchange()
+ public ExchangeImpl getAlternateExchange()
{
return _alternateExchange;
}
- public void setAlternateExchange(NonDefaultExchange exchange)
+ public void setAlternateExchange(ExchangeImpl exchange)
{
if(_alternateExchange != null)
{
@@ -2853,7 +2853,7 @@ public abstract class AbstractQueue
if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange)
{
final String bindingKey = (String) attributes.get("name");
- ((NonDefaultExchange)otherParents[0]).addBinding(bindingKey, this, attributes);
+ ((ExchangeImpl)otherParents[0]).addBinding(bindingKey, this, attributes);
for(Binding binding : _bindings)
{
if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey))
@@ -2899,7 +2899,7 @@ public abstract class AbstractQueue
else if(ALTERNATE_EXCHANGE.equals(name))
{
// In future we may want to accept a UUID as an alternative way to identifying the exchange
- NonDefaultExchange alternateExchange = (NonDefaultExchange) desired;
+ ExchangeImpl alternateExchange = (ExchangeImpl) desired;
setAlternateExchange(alternateExchange);
return true;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 1f52638279..739dd1cc5b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -37,7 +37,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
@@ -55,7 +54,6 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
@@ -380,44 +378,44 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
}
- //get the exchange name (returns default exchange name if none was specified)
+ //get the exchange name (returns empty String if none was specified)
String exchangeName = queueConfiguration.getExchange();
- ExchangeImpl exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
+
+ if("".equals(exchangeName))
{
- throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
+ //get routing keys in configuration (returns empty list if none are defined)
+ List<?> routingKeys = queueConfiguration.getRoutingKeys();
+ if(!(routingKeys.isEmpty() || (routingKeys.size()==1 && routingKeys.contains(queueName))))
+ {
+ throw new ConfigurationException("Attempt to bind queue '" + queueName + "' with binding key(s) " +
+ routingKeys + " without specifying an exchange");
+ }
}
-
- ExchangeImpl defaultExchange = _exchangeRegistry.getDefaultExchange();
-
- //get routing keys in configuration (returns empty list if none are defined)
- List<?> routingKeys = queueConfiguration.getRoutingKeys();
-
- for (Object routingKeyNameObj : routingKeys)
+ else
{
- String routingKey = String.valueOf(routingKeyNameObj);
-
- if (exchange.equals(defaultExchange))
+ ExchangeImpl exchange = _exchangeRegistry.getExchange(exchangeName);
+ if (exchange == null)
{
- if(!queueName.equals(routingKey))
- {
- throw new ConfigurationException("Illegal attempt to bind queue '" + queueName +
- "' to the default exchange with a key other than the queue name: " + routingKey);
- }
+ throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
}
- else
+
+ //get routing keys in configuration (returns empty list if none are defined)
+ List<?> routingKeys = queueConfiguration.getRoutingKeys();
+
+ for (Object routingKeyNameObj : routingKeys)
{
+ String routingKey = String.valueOf(routingKeyNameObj);
+
configureBinding(queue, exchange, routingKey, (Map) queueConfiguration.getBindingArguments(routingKey));
}
- }
- if (!exchange.equals(defaultExchange) && !routingKeys.contains(queueName))
- {
- //bind the queue to the named exchange using its name
- configureBinding(queue, exchange, queueName, null);
+ if (!routingKeys.contains(queueName))
+ {
+ //bind the queue to the named exchange using its name
+ configureBinding(queue, exchange, queueName, null);
+ }
}
-
}
private void configureBinding(AMQQueue queue, ExchangeImpl exchange, String routingKey, Map<String,Object> arguments)
@@ -605,31 +603,25 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
@Override
- public ExchangeImpl getDefaultExchange()
+ public MessageDestination getDefaultDestination()
{
return _exchangeRegistry.getDefaultExchange();
}
@Override
- public Collection<ExchangeImpl> getExchanges()
+ public Collection<ExchangeImpl<?>> getExchanges()
{
return Collections.unmodifiableCollection(_exchangeRegistry.getExchanges());
}
@Override
- public Collection<NonDefaultExchange> getExchangesExceptDefault()
- {
- return Collections.unmodifiableCollection(_exchangeRegistry.getExchangesExceptDefault());
- }
-
- @Override
public Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes()
{
return _exchangeFactory.getRegisteredTypes();
}
@Override
- public NonDefaultExchange createExchange(Map<String,Object> attributes)
+ public ExchangeImpl createExchange(Map<String,Object> attributes)
throws ExchangeExistsException, ReservedExchangeNameException,
UnknownExchangeException, AMQUnknownExchangeType
{
@@ -658,7 +650,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
UUIDGenerator.generateExchangeUUID(name, getName()));
}
- NonDefaultExchange exchange = _exchangeFactory.createExchange(attributes);
+ ExchangeImpl exchange = _exchangeFactory.createExchange(attributes);
_exchangeRegistry.registerExchange(exchange);
if(durable)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
index a682079de1..e6577e04e4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -28,7 +28,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
@@ -114,7 +113,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
_exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap);
}
- return ((NonDefaultExchange)_exchange).getBinding(_bindingName, _queue);
+ return (_exchange).getBinding(_bindingName, _queue);
}
private class QueueDependency implements UnresolvedDependency<AMQQueue>
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
index bc65ece87e..2743b0ef59 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -59,7 +59,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
private class UnresolvedExchange implements UnresolvedObject<ExchangeImpl>
{
- private ExchangeImpl _exchange;
+ private ExchangeImpl<?> _exchange;
public UnresolvedExchange(final UUID id,
final Map<String, Object> attributeMap)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index fd30119d7b..63b76f36a6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -31,7 +31,6 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -62,7 +61,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
AMQQueue createQueue(Map<String, Object> arguments) throws QueueExistsException;
- NonDefaultExchange createExchange(Map<String,Object> attributes)
+ ExchangeImpl createExchange(Map<String,Object> attributes)
throws ExchangeExistsException, ReservedExchangeNameException,
UnknownExchangeException, AMQUnknownExchangeType;
@@ -75,9 +74,9 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
ExchangeImpl getExchange(UUID id);
- ExchangeImpl getDefaultExchange();
+ MessageDestination getDefaultDestination();
- Collection<ExchangeImpl> getExchanges();
+ Collection<ExchangeImpl<?>> getExchanges();
Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes();
@@ -137,7 +136,5 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
TaskExecutor getTaskExecutor();
- Collection<NonDefaultExchange> getExchangesExceptDefault();
-
org.apache.qpid.server.model.VirtualHost getModel();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
index 5ec9a7762d..b5fe5e0072 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
@@ -168,26 +168,7 @@ public class DefaultExchangeFactoryTest extends QpidTestCase
public void testCreateDefaultExchangeFactoryWithCustomExchangeType()
{
- ExchangeType<?> customExchangeType = new ExchangeType<NonDefaultExchange>()
- {
- @Override
- public String getType()
- {
- return "my-custom-exchange";
- }
-
- @Override
- public NonDefaultExchange newInstance(VirtualHost host, Map<String,Object> attributes)
- {
- return null;
- }
-
- @Override
- public String getDefaultExchangeName()
- {
- return null;
- }
- };
+ ExchangeType<?> customExchangeType = new CustomExchangeType();
_stubbedExchangeTypes.add(customExchangeType);
_stubbedExchangeTypes.add(_directExchangeType);
@@ -206,6 +187,31 @@ public class DefaultExchangeFactoryTest extends QpidTestCase
assertTrue("Custom exchange type is not found", registeredTypes.contains(customExchangeType));
}
+ public static abstract class CustomExchange implements ExchangeImpl<CustomExchange>
+ {
+ }
+
+ private static class CustomExchangeType implements ExchangeType<CustomExchange>
+ {
+ @Override
+ public String getType()
+ {
+ return "my-custom-exchange";
+ }
+
+ @Override
+ public CustomExchange newInstance(VirtualHost host, Map<String,Object> attributes)
+ {
+ return null;
+ }
+
+ @Override
+ public String getDefaultExchangeName()
+ {
+ return null;
+ }
+ }
+
private final class TestExchangeFactory extends DefaultExchangeFactory
{
private TestExchangeFactory()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 453a1b8e7d..8c9132d166 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -139,7 +139,7 @@ public class HeadersBindingTest extends TestCase
private MockHeader matchHeaders = new MockHeader();
private int _count = 0;
private AMQQueue _queue;
- private NonDefaultExchange _exchange;
+ private ExchangeImpl _exchange;
protected void setUp()
{
@@ -149,7 +149,7 @@ public class HeadersBindingTest extends TestCase
when(_queue.getVirtualHost()).thenReturn(vhost);
when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
CurrentActor.set(mock(LogActor.class));
- _exchange = mock(NonDefaultExchange.class);
+ _exchange = mock(ExchangeImpl.class);
when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index d32f8bcf49..64bcf8f730 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -150,7 +149,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
final String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributeValues);
final UUID id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributeValues);
- final NonDefaultExchange exchange = mock(NonDefaultExchange.class);
+ final ExchangeImpl exchange = mock(ExchangeImpl.class);
ExchangeType exType = mock(ExchangeType.class);
when(exchange.getName()).thenReturn(name);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 56b931f37e..613dd07741 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -40,7 +40,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -81,7 +80,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
- private NonDefaultExchange _exchange = mock(NonDefaultExchange.class);
+ private ExchangeImpl _exchange = mock(ExchangeImpl.class);
private static final String ROUTING_KEY = "routingKey";
private static final String QUEUE_NAME = "queueName";
private Map<String,Object> _bindingArgs;
@@ -258,7 +257,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
{
- NonDefaultExchange alternateExchange = createTestAlternateExchange();
+ ExchangeImpl alternateExchange = createTestAlternateExchange();
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
@@ -274,10 +273,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
- private NonDefaultExchange createTestAlternateExchange()
+ private ExchangeImpl createTestAlternateExchange()
{
UUID exchUuid = UUID.randomUUID();
- NonDefaultExchange alternateExchange = mock(NonDefaultExchange.class);
+ ExchangeImpl alternateExchange = mock(ExchangeImpl.class);
when(alternateExchange.getId()).thenReturn(exchUuid);
return alternateExchange;
}
@@ -318,7 +317,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
- NonDefaultExchange alternateExchange = createTestAlternateExchange();
+ ExchangeImpl alternateExchange = createTestAlternateExchange();
queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
@@ -362,7 +361,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private AMQQueue createTestQueue(String queueName,
String queueOwner,
boolean exclusive,
- NonDefaultExchange alternateExchange,
+ ExchangeImpl alternateExchange,
final Map<String, Object> arguments) throws StoreException
{
AMQQueue<?> queue = mock(AMQQueue.class);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 90ba03a789..1eaccc4e5f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -28,7 +28,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.DirectExchange;
@@ -70,8 +70,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private static final String CUSTOM_EXCHANGE_NAME = "customExchange";
private DurableConfigurationRecoverer _durableConfigurationRecoverer;
- private NonDefaultExchange _directExchange;
- private NonDefaultExchange _topicExchange;
+ private ExchangeImpl _directExchange;
+ private ExchangeImpl _topicExchange;
private VirtualHost _vhost;
private DurableConfigurationStore _store;
private ExchangeFactory _exchangeFactory;
@@ -84,11 +84,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
super.setUp();
- _directExchange = mock(NonDefaultExchange.class);
+ _directExchange = mock(ExchangeImpl.class);
when(_directExchange.getExchangeType()).thenReturn(DirectExchange.TYPE);
- _topicExchange = mock(NonDefaultExchange.class);
+ _topicExchange = mock(ExchangeImpl.class);
when(_topicExchange.getExchangeType()).thenReturn(TopicExchange.TYPE);
AMQQueue queue = mock(AMQQueue.class);
@@ -101,14 +101,14 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue);
- final ArgumentCaptor<NonDefaultExchange> registeredExchange = ArgumentCaptor.forClass(NonDefaultExchange.class);
+ final ArgumentCaptor<ExchangeImpl> registeredExchange = ArgumentCaptor.forClass(ExchangeImpl.class);
doAnswer(new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- NonDefaultExchange exchange = registeredExchange.getValue();
+ ExchangeImpl exchange = registeredExchange.getValue();
when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange);
when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange);
return null;
@@ -139,14 +139,14 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_vhost.getQueue(eq(queueName))).thenReturn(queue);
when(_vhost.getQueue(eq(queueId))).thenReturn(queue);
- final ArgumentCaptor<NonDefaultExchange> altExchangeArg = ArgumentCaptor.forClass(NonDefaultExchange.class);
+ final ArgumentCaptor<ExchangeImpl> altExchangeArg = ArgumentCaptor.forClass(ExchangeImpl.class);
doAnswer(
new Answer()
{
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
- final NonDefaultExchange value = altExchangeArg.getValue();
+ final ExchangeImpl value = altExchangeArg.getValue();
when(queue.getAlternateExchange()).thenReturn(value);
return null;
}
@@ -157,8 +157,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
if (args.containsKey(Queue.ALTERNATE_EXCHANGE))
{
final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
- final NonDefaultExchange exchange =
- (NonDefaultExchange) _exchangeRegistry.getExchange(exchangeId);
+ final ExchangeImpl exchange =
+ (ExchangeImpl) _exchangeRegistry.getExchange(exchangeId);
queue.setAlternateExchange(exchange);
}
return queue;
@@ -267,13 +267,13 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
"org.apache.qpid.server.model.Exchange",
createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
- final NonDefaultExchange customExchange = mock(NonDefaultExchange.class);
+ final ExchangeImpl customExchange = mock(ExchangeImpl.class);
final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);
- when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<NonDefaultExchange>()
+ when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>()
{
@Override
- public NonDefaultExchange answer(final InvocationOnMock invocation) throws Throwable
+ public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable
{
Map arguments = attributesCaptor.getValue();
if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME))
@@ -397,17 +397,17 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final UUID queueId = new UUID(1, 0);
final UUID exchangeId = new UUID(2, 0);
- final NonDefaultExchange customExchange = mock(NonDefaultExchange.class);
+ final ExchangeImpl customExchange = mock(ExchangeImpl.class);
when(customExchange.getId()).thenReturn(exchangeId);
when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME);
final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);
- when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<NonDefaultExchange>()
+ when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>()
{
@Override
- public NonDefaultExchange answer(final InvocationOnMock invocation) throws Throwable
+ public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable
{
Map arguments = attributesCaptor.getValue();
if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME))
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index baa62b8ea2..90829ca271 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -27,7 +27,6 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -160,7 +159,7 @@ public class MockVirtualHost implements VirtualHost
}
@Override
- public NonDefaultExchange createExchange(Map<String,Object> attributes)
+ public ExchangeImpl createExchange(Map<String,Object> attributes)
{
return null;
}
@@ -189,13 +188,13 @@ public class MockVirtualHost implements VirtualHost
}
@Override
- public ExchangeImpl getDefaultExchange()
+ public ExchangeImpl getDefaultDestination()
{
return null;
}
@Override
- public Collection<ExchangeImpl> getExchanges()
+ public Collection<ExchangeImpl<?>> getExchanges()
{
return null;
}
@@ -360,12 +359,6 @@ public class MockVirtualHost implements VirtualHost
}
@Override
- public Collection<NonDefaultExchange> getExchangesExceptDefault()
- {
- return null;
- }
-
- @Override
public org.apache.qpid.server.model.VirtualHost getModel()
{
return null;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index 61569cc3e7..f081268337 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -117,7 +117,7 @@ public class StandardVirtualHostTest extends QpidTestCase
{
Throwable cause = e.getCause();
assertNotNull(cause);
- assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, cause.getMessage());
+ assertEquals("Attempt to bind queue '" + queueName + "' with binding key(s) [" + customBinding + "] without specifying an exchange", cause.getMessage());
}
}
@@ -253,9 +253,6 @@ public class StandardVirtualHostTest extends QpidTestCase
AMQQueue queue = vhost.getQueue(queueName);
assertNotNull("queue should exist", queue);
- ExchangeImpl defaultExch = vhost.getDefaultExchange();
- assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue));
-
ExchangeImpl exch = vhost.getExchange(exchangeName);
assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue));
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 1fb82efd2d..9d7764414f 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -683,83 +684,101 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
}
-
- if(method.getPassive())
+ if(method.getExchange() == null || method.getExchange().equals(""))
{
- ExchangeImpl exchange = getExchange(session, exchangeName);
-
- if(exchange == null)
+ if(!DirectExchange.TYPE.getType().equals(method.getType()))
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare default exchange "
+ + " of type " + DirectExchange.TYPE.getType()
+ + " to " + method.getType() +".");
}
- else
+ if(method.hasAlternateExchange() && !"".equals(method.getAlternateExchange()))
{
- if (!exchange.getTypeName().equals(method.getType())
- && (method.getType() != null && method.getType().length() > 0))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + ".");
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to set alternate exchange of the default exchange "
+ + " to " + method.getAlternateExchange() +".");
}
}
else
{
-
- try
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
- virtualHost.createExchange(attributes);
- }
- catch(ReservedExchangeNameException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
- + exchangeName + " which begins with reserved name or prefix.");
- }
- catch(UnknownExchangeException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND,
- "Unknown alternate exchange " + e.getExchangeName());
- }
- catch(AMQUnknownExchangeType e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
- }
- catch(ExchangeExistsException e)
+ if(method.getPassive())
{
- ExchangeImpl exchange = e.getExistingExchange();
- if(!exchange.getTypeName().equals(method.getType()))
+
+ ExchangeImpl exchange = getExchange(session, exchangeName);
+
+ if(exchange == null)
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to redeclare exchange: " + exchangeName
- + " of type " + exchange.getTypeName()
- + " to " + method.getType() +".");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
}
- else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+ else
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
+ if (!exchange.getTypeName().equals(method.getType())
+ && (method.getType() != null && method.getType().length() > 0))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + ".");
+ }
}
}
- catch (AccessControlException e)
+ else
{
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
- }
+
+ try
+ {
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
+ virtualHost.createExchange(attributes);
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved name or prefix.");
+ }
+ catch(UnknownExchangeException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND,
+ "Unknown alternate exchange " + e.getExchangeName());
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+ }
+ catch(ExchangeExistsException e)
+ {
+ ExchangeImpl exchange = e.getExistingExchange();
+ if(!exchange.getTypeName().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare exchange: " + exchangeName
+ + " of type " + exchange.getTypeName()
+ + " to " + method.getType() +".");
+ }
+ else if(method.hasAlternateExchange()
+ && (exchange.getAlternateExchange() == null ||
+ !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateExchange()
+ + " to " + method.getAlternateExchange() +".");
+ }
+ }
+ catch (AccessControlException e)
+ {
+ exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+ }
+ }
}
-
}
private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
@@ -789,12 +808,12 @@ public class ServerSessionDelegate extends SessionDelegate
destination = virtualHost.getMessageDestination(xfr.getDestination());
if(destination == null)
{
- destination = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultDestination();
}
}
else
{
- destination = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultDestination();
}
return destination;
}
@@ -878,19 +897,30 @@ public class ServerSessionDelegate extends SessionDelegate
ExchangeQueryResult result = new ExchangeQueryResult();
- ExchangeImpl exchange = getExchange(session, method.getName());
- if(exchange != null)
+ final String exchangeName = method.getName();
+
+ if(exchangeName == null || exchangeName.equals(""))
{
- result.setDurable(exchange.isDurable());
- result.setType(exchange.getTypeName());
+ result.setDurable(true);
+ result.setType(DirectExchange.TYPE.getType());
result.setNotFound(false);
}
else
{
- result.setNotFound(true);
- }
+ ExchangeImpl exchange = getExchange(session, exchangeName);
+ if(exchange != null)
+ {
+ result.setDurable(exchange.isDurable());
+ result.setType(exchange.getTypeName());
+ result.setNotFound(false);
+ }
+ else
+ {
+ result.setNotFound(true);
+ }
+ }
session.executionResult((int) method.getId(), result);
}
@@ -904,52 +934,56 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
}
- else if (nameNullOrEmpty(method.getExchange()))
- {
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
- }
else
{
- //TODO - here because of non-compliant python tests
- // should raise exception ILLEGAL_ARGUMENT "binding-key not set"
- if (!method.hasBindingKey())
- {
- method.setBindingKey(method.getQueue());
- }
- AMQQueue queue = virtualHost.getQueue(method.getQueue());
- ExchangeImpl exchange = virtualHost.getExchange(method.getExchange());
- if(queue == null)
+ final String exchangeName = method.getExchange();
+ if (nameNullOrEmpty(exchangeName))
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
- }
- else if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
- }
- else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
- {
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header");
+ exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
}
else
{
- if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
+ //TODO - here because of non-compliant python tests
+ // should raise exception ILLEGAL_ARGUMENT "binding-key not set"
+ if (!method.hasBindingKey())
{
- try
+ method.setBindingKey(method.getQueue());
+ }
+ AMQQueue queue = virtualHost.getQueue(method.getQueue());
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + exchangeName + "' not found");
+ }
+ else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header");
+ }
+ else
+ {
+ if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
{
- exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
+ try
+ {
+ exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
+ }
+ catch (AccessControlException e)
+ {
+ exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+ }
}
- catch (AccessControlException e)
+ else
{
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+ // todo
}
}
- else
- {
- // todo
- }
- }
+ }
}
@@ -1010,8 +1044,10 @@ public class ServerSessionDelegate extends SessionDelegate
VirtualHost virtualHost = getVirtualHost(session);
ExchangeImpl exchange;
AMQQueue queue;
- if(method.hasExchange())
+ boolean isDefaultExchange;
+ if(method.hasExchange() && !method.getExchange().equals(""))
{
+ isDefaultExchange = false;
exchange = virtualHost.getExchange(method.getExchange());
if(exchange == null)
@@ -1021,11 +1057,47 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- exchange = virtualHost.getDefaultExchange();
+ isDefaultExchange = true;
+ exchange = null;
}
+ if(isDefaultExchange)
+ {
+ if(method.hasQueue())
+ {
+ queue = getQueue(session, method.getQueue());
- if(method.hasQueue())
+ if(queue == null)
+ {
+ result.setQueueNotFound(true);
+ }
+ else
+ {
+ if(method.hasBindingKey())
+ {
+ if(!method.getBindingKey().equals(method.getQueue()))
+ {
+ result.setKeyNotMatched(true);
+ }
+ }
+ }
+ }
+ else if(method.hasBindingKey())
+ {
+ if(getQueue(session, method.getBindingKey()) == null)
+ {
+ result.setKeyNotMatched(true);
+ }
+ }
+
+ if(method.hasArguments() && !method.getArguments().isEmpty())
+ {
+ result.setArgsNotMatched(true);
+ }
+
+
+ }
+ else if(method.hasQueue())
{
queue = getQueue(session, method.getQueue());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
index 101a92242f..fc085e8ab1 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -62,16 +61,23 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
}
AMQShortString exchangeName = body.getExchange();
+ VirtualHost vHost = session.getVirtualHost();
+
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
- if (exchangeName == null)
+
+ MessageDestination destination;
+
+ if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
{
- exchangeName = AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
}
- VirtualHost vHost = session.getVirtualHost();
- MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
- if (exch == null)
+ if (destination == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
}
@@ -91,7 +97,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
info.setExchange(exchangeName);
try
{
- channel.setPublishFrame(info, exch);
+ channel.setPublishFrame(info, destination);
}
catch (AccessControlException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
index 27837844ff..fce1260155 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
@@ -79,107 +79,155 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
channel.sync();
- AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange();
+ AMQShortString exchangeName = body.getExchange();
AMQShortString queueName = body.getQueue();
AMQShortString routingKey = body.getRoutingKey();
- ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
ExchangeBoundOkBody response;
- if (exchange == null)
+ if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING))
{
-
-
- response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
- new AMQShortString("Exchange '" + exchangeName + "' not found"));
- }
- else if (routingKey == null)
- {
- if (queueName == null)
+ if(routingKey == null)
{
- if (exchange.hasBindings())
+ if(queueName == null)
{
- response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
}
else
{
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
- response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
}
}
else
{
-
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
+ if(queueName == null)
{
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
}
else
{
- if (exchange.isBound(queue))
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
{
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
}
else
{
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
- new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
}
}
}
}
- else if (queueName != null)
+ else
{
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
{
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ new AMQShortString("Exchange '" + exchangeName + "' not found"));
}
- else
+ else if (routingKey == null)
{
- String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
- if (exchange.isBound(bindingKey, queue))
+ if (queueName == null)
{
+ if (exchange.hasBindings())
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ else
+ {
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
+ null); // replyText
+ }
}
else
{
- String message = "Queue '" + queueName + "' not bound with routing key '" +
- body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
- if(message.length()>255)
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
{
- message = message.substring(0,254);
+ if (exchange.isBound(queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
+ }
}
- response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- new AMQShortString(message)); // replyText
}
}
- }
- else
- {
- if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+ else if (queueName != null)
{
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
+ if (exchange.isBound(bindingKey, queue))
+ {
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ String message = "Queue '" + queueName + "' not bound with routing key '" +
+ body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+
+ if(message.length()>255)
+ {
+ message = message.substring(0,254);
+ }
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ new AMQShortString(message)); // replyText
+ }
+ }
}
else
{
+ if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+ {
- response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
- new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() +
- "' to exchange '" + exchangeName + "'")); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() +
+ "' to exchange '" + exchangeName + "'")); // replyText
+ }
}
}
session.writeFrame(response.generateFrame(channelId));
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
index 3b630c684c..78d47aaa52 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
@@ -78,76 +79,90 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
ExchangeImpl exchange;
- if (body.getPassive())
+ if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING))
{
- exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString());
- if(exchange == null)
+ if(!new AMQShortString(DirectExchange.TYPE.getType()).equals(body.getType()))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + DirectExchange.TYPE.getType()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
}
- else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString()))
- {
-
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
- exchangeName + " of type " + exchange.getTypeName()
- + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
- }
-
}
else
{
- try
+ if (body.getPassive())
{
- String name = exchangeName == null ? null : exchangeName.intern().toString();
- String type = body.getType() == null ? null : body.getType().intern().toString();
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
- exchange = virtualHost.createExchange(attributes);
+ exchange = virtualHost.getExchange(exchangeName.toString());
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ }
+ else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString()))
+ {
- }
- catch(ReservedExchangeNameException e)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix.");
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+ exchangeName + " of type " + exchange.getTypeName()
+ + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ }
}
- catch(ExchangeExistsException e)
+ else
{
- exchange = e.getExistingExchange();
- if(!new AMQShortString(exchange.getTypeName()).equals(body.getType()))
+ try
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type "
- + exchange.getTypeName()
- + " to " + body.getType() +".",
- body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String type = body.getType() == null ? null : body.getType().intern().toString();
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ exchange = virtualHost.createExchange(attributes);
+
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.");
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if(!new AMQShortString(exchange.getTypeName()).equals(body.getType()))
+ {
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getTypeName()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
+ }
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
+ catch (UnknownExchangeException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
}
- }
- catch(AMQUnknownExchangeType e)
- {
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
- catch (UnknownExchangeException e)
- {
- // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
}
}
-
if(!body.getNowait())
{
MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
index 720677064b..bc723fc3dd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
@@ -62,6 +62,11 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
{
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+ if(exchangeName == null || "".equals(exchangeName))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted");
+ }
+
final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
if(exchange == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index 1e0382f456..7dc76d13d0 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -102,6 +102,12 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
}
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+
+ if(exchangeName == null || "".equals(exchangeName))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange");
+ }
+
final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
if (exch == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
index a828ca323d..abc9c8541c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
@@ -93,6 +93,12 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
+
+ if(body.getExchange() == null || body.getExchange().equals(AMQShortString.EMPTY_STRING))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange");
+ }
+
final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
if (exch == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
index 86adc585c3..580b912552 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -73,10 +74,18 @@ public class BrokerTestHelper_0_8 extends BrokerTestHelper
when(info.getExchange()).thenReturn(exchangeNameAsShortString);
when(info.getRoutingKey()).thenReturn(routingKey);
- ExchangeImpl exchange = channel.getVirtualHost().getExchange(exchangeName);
+ MessageDestination destination;
+ if(exchangeName == null || "".equals(exchangeName))
+ {
+ destination = channel.getVirtualHost().getDefaultDestination();
+ }
+ else
+ {
+ destination = channel.getVirtualHost().getExchange(exchangeName);
+ }
for (int count = 0; count < numberOfMessages; count++)
{
- channel.setPublishFrame(info, exchange);
+ channel.setPublishFrame(info, destination);
// Set the body size
ContentHeaderBody _headerBody = new ContentHeaderBody();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1e5c8caa18..78dcab9d75 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -130,11 +130,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
MessageSource queue = getVirtualHost().getMessageSource(addr);
if(queue != null)
{
-
destination = new MessageSourceDestination(queue);
-
-
-
}
else
{
@@ -145,7 +141,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
else
{
-
endpoint.setSource(null);
destination = null;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index e6f9c46523..bb57750426 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -34,7 +34,6 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
-import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
@@ -366,7 +365,7 @@ public class MessageStoreTest extends QpidTestCase
{
int origExchangeCount = getVirtualHost().getExchanges().size();
- Map<String, ExchangeImpl> oldExchanges = createExchanges();
+ Map<String, ExchangeImpl<?>> oldExchanges = createExchanges();
assertEquals("Incorrect number of exchanges registered before recovery",
origExchangeCount + 3, getVirtualHost().getExchanges().size());
@@ -421,7 +420,7 @@ public class MessageStoreTest extends QpidTestCase
createAllQueues();
createAllTopicQueues();
- Map<String, ExchangeImpl> exchanges = createExchanges();
+ Map<String, ExchangeImpl<?>> exchanges = createExchanges();
ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName);
ExchangeImpl directExchange = exchanges.get(directExchangeName);
@@ -479,11 +478,11 @@ public class MessageStoreTest extends QpidTestCase
* and that the new exchanges are not the same objects as the provided list (i.e. that the
* reload actually generated new exchange objects)
*/
- private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl> oldExchanges)
+ private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl<?>> oldExchanges)
{
- Collection<ExchangeImpl> exchanges = getVirtualHost().getExchanges();
+ Collection<ExchangeImpl<?>> exchanges = getVirtualHost().getExchanges();
Collection<String> exchangeNames = new ArrayList(exchanges.size());
- for(ExchangeImpl exchange : exchanges)
+ for(ExchangeImpl<?> exchange : exchanges)
{
exchangeNames.add(exchange.getName());
}
@@ -709,9 +708,9 @@ public class MessageStoreTest extends QpidTestCase
}
- private Map<String, ExchangeImpl> createExchanges() throws Exception
+ private Map<String, ExchangeImpl<?>> createExchanges() throws Exception
{
- Map<String, ExchangeImpl> exchanges = new HashMap<String, ExchangeImpl>();
+ Map<String, ExchangeImpl<?>> exchanges = new HashMap<String, ExchangeImpl<?>>();
//Register non-durable DirectExchange
exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false));
@@ -723,9 +722,9 @@ public class MessageStoreTest extends QpidTestCase
return exchanges;
}
- private ExchangeImpl createExchange(ExchangeType<?> type, String name, boolean durable) throws Exception
+ private ExchangeImpl<?> createExchange(ExchangeType<?> type, String name, boolean durable) throws Exception
{
- ExchangeImpl exchange = null;
+ ExchangeImpl<?> exchange = null;
Map<String,Object> attributes = new HashMap<String, Object>();