summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-04 22:52:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-04 22:52:05 +0000
commit0096aa9a3c5969db8a3d517ba5cbfb6979d1eb24 (patch)
tree3ac3c144cf42e9cfe7e22c1cd879c65a89d42319 /qpid/java/broker-plugins
parent291bbc2729009bac61602fcb268af619df764bbb (diff)
downloadqpid-python-0096aa9a3c5969db8a3d517ba5cbfb6979d1eb24.tar.gz
QPID-5601 : [Java Broker] The 0-x "default exchange" should not actually be modelled as an Exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574235 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java276
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java152
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java123
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java13
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
9 files changed, 384 insertions, 222 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 1fb82efd2d..9d7764414f 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -683,83 +684,101 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
}
-
- if(method.getPassive())
+ if(method.getExchange() == null || method.getExchange().equals(""))
{
- ExchangeImpl exchange = getExchange(session, exchangeName);
-
- if(exchange == null)
+ if(!DirectExchange.TYPE.getType().equals(method.getType()))
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare default exchange "
+ + " of type " + DirectExchange.TYPE.getType()
+ + " to " + method.getType() +".");
}
- else
+ if(method.hasAlternateExchange() && !"".equals(method.getAlternateExchange()))
{
- if (!exchange.getTypeName().equals(method.getType())
- && (method.getType() != null && method.getType().length() > 0))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + ".");
- }
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to set alternate exchange of the default exchange "
+ + " to " + method.getAlternateExchange() +".");
}
}
else
{
-
- try
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
- virtualHost.createExchange(attributes);
- }
- catch(ReservedExchangeNameException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
- + exchangeName + " which begins with reserved name or prefix.");
- }
- catch(UnknownExchangeException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND,
- "Unknown alternate exchange " + e.getExchangeName());
- }
- catch(AMQUnknownExchangeType e)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
- }
- catch(ExchangeExistsException e)
+ if(method.getPassive())
{
- ExchangeImpl exchange = e.getExistingExchange();
- if(!exchange.getTypeName().equals(method.getType()))
+
+ ExchangeImpl exchange = getExchange(session, exchangeName);
+
+ if(exchange == null)
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to redeclare exchange: " + exchangeName
- + " of type " + exchange.getTypeName()
- + " to " + method.getType() +".");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
}
- else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+ else
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
+ if (!exchange.getTypeName().equals(method.getType())
+ && (method.getType() != null && method.getType().length() > 0))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + ".");
+ }
}
}
- catch (AccessControlException e)
+ else
{
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
- }
+
+ try
+ {
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
+ virtualHost.createExchange(attributes);
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved name or prefix.");
+ }
+ catch(UnknownExchangeException e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND,
+ "Unknown alternate exchange " + e.getExchangeName());
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+ }
+ catch(ExchangeExistsException e)
+ {
+ ExchangeImpl exchange = e.getExistingExchange();
+ if(!exchange.getTypeName().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare exchange: " + exchangeName
+ + " of type " + exchange.getTypeName()
+ + " to " + method.getType() +".");
+ }
+ else if(method.hasAlternateExchange()
+ && (exchange.getAlternateExchange() == null ||
+ !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateExchange()
+ + " to " + method.getAlternateExchange() +".");
+ }
+ }
+ catch (AccessControlException e)
+ {
+ exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+ }
+ }
}
-
}
private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
@@ -789,12 +808,12 @@ public class ServerSessionDelegate extends SessionDelegate
destination = virtualHost.getMessageDestination(xfr.getDestination());
if(destination == null)
{
- destination = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultDestination();
}
}
else
{
- destination = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultDestination();
}
return destination;
}
@@ -878,19 +897,30 @@ public class ServerSessionDelegate extends SessionDelegate
ExchangeQueryResult result = new ExchangeQueryResult();
- ExchangeImpl exchange = getExchange(session, method.getName());
- if(exchange != null)
+ final String exchangeName = method.getName();
+
+ if(exchangeName == null || exchangeName.equals(""))
{
- result.setDurable(exchange.isDurable());
- result.setType(exchange.getTypeName());
+ result.setDurable(true);
+ result.setType(DirectExchange.TYPE.getType());
result.setNotFound(false);
}
else
{
- result.setNotFound(true);
- }
+ ExchangeImpl exchange = getExchange(session, exchangeName);
+ if(exchange != null)
+ {
+ result.setDurable(exchange.isDurable());
+ result.setType(exchange.getTypeName());
+ result.setNotFound(false);
+ }
+ else
+ {
+ result.setNotFound(true);
+ }
+ }
session.executionResult((int) method.getId(), result);
}
@@ -904,52 +934,56 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
}
- else if (nameNullOrEmpty(method.getExchange()))
- {
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
- }
else
{
- //TODO - here because of non-compliant python tests
- // should raise exception ILLEGAL_ARGUMENT "binding-key not set"
- if (!method.hasBindingKey())
- {
- method.setBindingKey(method.getQueue());
- }
- AMQQueue queue = virtualHost.getQueue(method.getQueue());
- ExchangeImpl exchange = virtualHost.getExchange(method.getExchange());
- if(queue == null)
+ final String exchangeName = method.getExchange();
+ if (nameNullOrEmpty(exchangeName))
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
- }
- else if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
- }
- else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
- {
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header");
+ exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
}
else
{
- if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
+ //TODO - here because of non-compliant python tests
+ // should raise exception ILLEGAL_ARGUMENT "binding-key not set"
+ if (!method.hasBindingKey())
{
- try
+ method.setBindingKey(method.getQueue());
+ }
+ AMQQueue queue = virtualHost.getQueue(method.getQueue());
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + exchangeName + "' not found");
+ }
+ else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header");
+ }
+ else
+ {
+ if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
{
- exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
+ try
+ {
+ exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
+ }
+ catch (AccessControlException e)
+ {
+ exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+ }
}
- catch (AccessControlException e)
+ else
{
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
+ // todo
}
}
- else
- {
- // todo
- }
- }
+ }
}
@@ -1010,8 +1044,10 @@ public class ServerSessionDelegate extends SessionDelegate
VirtualHost virtualHost = getVirtualHost(session);
ExchangeImpl exchange;
AMQQueue queue;
- if(method.hasExchange())
+ boolean isDefaultExchange;
+ if(method.hasExchange() && !method.getExchange().equals(""))
{
+ isDefaultExchange = false;
exchange = virtualHost.getExchange(method.getExchange());
if(exchange == null)
@@ -1021,11 +1057,47 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- exchange = virtualHost.getDefaultExchange();
+ isDefaultExchange = true;
+ exchange = null;
}
+ if(isDefaultExchange)
+ {
+ if(method.hasQueue())
+ {
+ queue = getQueue(session, method.getQueue());
- if(method.hasQueue())
+ if(queue == null)
+ {
+ result.setQueueNotFound(true);
+ }
+ else
+ {
+ if(method.hasBindingKey())
+ {
+ if(!method.getBindingKey().equals(method.getQueue()))
+ {
+ result.setKeyNotMatched(true);
+ }
+ }
+ }
+ }
+ else if(method.hasBindingKey())
+ {
+ if(getQueue(session, method.getBindingKey()) == null)
+ {
+ result.setKeyNotMatched(true);
+ }
+ }
+
+ if(method.hasArguments() && !method.getArguments().isEmpty())
+ {
+ result.setArgsNotMatched(true);
+ }
+
+
+ }
+ else if(method.hasQueue())
{
queue = getQueue(session, method.getQueue());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
index 101a92242f..fc085e8ab1 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -62,16 +61,23 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
}
AMQShortString exchangeName = body.getExchange();
+ VirtualHost vHost = session.getVirtualHost();
+
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
- if (exchangeName == null)
+
+ MessageDestination destination;
+
+ if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
{
- exchangeName = AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME);
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
}
- VirtualHost vHost = session.getVirtualHost();
- MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
- if (exch == null)
+ if (destination == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
}
@@ -91,7 +97,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
info.setExchange(exchangeName);
try
{
- channel.setPublishFrame(info, exch);
+ channel.setPublishFrame(info, destination);
}
catch (AccessControlException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
index 27837844ff..fce1260155 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
@@ -79,107 +79,155 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
channel.sync();
- AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange();
+ AMQShortString exchangeName = body.getExchange();
AMQShortString queueName = body.getQueue();
AMQShortString routingKey = body.getRoutingKey();
- ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
ExchangeBoundOkBody response;
- if (exchange == null)
+ if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING))
{
-
-
- response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
- new AMQShortString("Exchange '" + exchangeName + "' not found"));
- }
- else if (routingKey == null)
- {
- if (queueName == null)
+ if(routingKey == null)
{
- if (exchange.hasBindings())
+ if(queueName == null)
{
- response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
}
else
{
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
- response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
}
}
else
{
-
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
+ if(queueName == null)
{
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
}
else
{
- if (exchange.isBound(queue))
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
{
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
}
else
{
-
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
- new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
}
}
}
}
- else if (queueName != null)
+ else
{
- AMQQueue queue = virtualHost.getQueue(queueName.toString());
- if (queue == null)
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
{
- response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
- new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ new AMQShortString("Exchange '" + exchangeName + "' not found"));
}
- else
+ else if (routingKey == null)
{
- String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
- if (exchange.isBound(bindingKey, queue))
+ if (queueName == null)
{
+ if (exchange.hasBindings())
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ else
+ {
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
+ null); // replyText
+ }
}
else
{
- String message = "Queue '" + queueName + "' not bound with routing key '" +
- body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
- if(message.length()>255)
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
{
- message = message.substring(0,254);
+ if (exchange.isBound(queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
+ }
}
- response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- new AMQShortString(message)); // replyText
}
}
- }
- else
- {
- if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+ else if (queueName != null)
{
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ new AMQShortString("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
+ if (exchange.isBound(bindingKey, queue))
+ {
- response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ String message = "Queue '" + queueName + "' not bound with routing key '" +
+ body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+
+ if(message.length()>255)
+ {
+ message = message.substring(0,254);
+ }
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ new AMQShortString(message)); // replyText
+ }
+ }
}
else
{
+ if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+ {
- response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
- new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() +
- "' to exchange '" + exchangeName + "'")); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() +
+ "' to exchange '" + exchangeName + "'")); // replyText
+ }
}
}
session.writeFrame(response.generateFrame(channelId));
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
index 3b630c684c..78d47aaa52 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
@@ -78,76 +79,90 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
ExchangeImpl exchange;
- if (body.getPassive())
+ if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING))
{
- exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString());
- if(exchange == null)
+ if(!new AMQShortString(DirectExchange.TYPE.getType()).equals(body.getType()))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + DirectExchange.TYPE.getType()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
}
- else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString()))
- {
-
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
- exchangeName + " of type " + exchange.getTypeName()
- + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
- }
-
}
else
{
- try
+ if (body.getPassive())
{
- String name = exchangeName == null ? null : exchangeName.intern().toString();
- String type = body.getType() == null ? null : body.getType().intern().toString();
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
- exchange = virtualHost.createExchange(attributes);
+ exchange = virtualHost.getExchange(exchangeName.toString());
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ }
+ else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString()))
+ {
- }
- catch(ReservedExchangeNameException e)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix.");
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+ exchangeName + " of type " + exchange.getTypeName()
+ + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ }
}
- catch(ExchangeExistsException e)
+ else
{
- exchange = e.getExistingExchange();
- if(!new AMQShortString(exchange.getTypeName()).equals(body.getType()))
+ try
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type "
- + exchange.getTypeName()
- + " to " + body.getType() +".",
- body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String type = body.getType() == null ? null : body.getType().intern().toString();
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ exchange = virtualHost.createExchange(attributes);
+
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.");
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if(!new AMQShortString(exchange.getTypeName()).equals(body.getType()))
+ {
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getTypeName()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
+ }
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
+ catch (UnknownExchangeException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
}
- }
- catch(AMQUnknownExchangeType e)
- {
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
- }
- catch (AccessControlException e)
- {
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
- catch (UnknownExchangeException e)
- {
- // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
}
}
-
if(!body.getNowait())
{
MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
index 720677064b..bc723fc3dd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
@@ -62,6 +62,11 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
{
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+ if(exchangeName == null || "".equals(exchangeName))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted");
+ }
+
final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
if(exchange == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index 1e0382f456..7dc76d13d0 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -102,6 +102,12 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
}
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
+
+ if(exchangeName == null || "".equals(exchangeName))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange");
+ }
+
final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
if (exch == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
index a828ca323d..abc9c8541c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
@@ -93,6 +93,12 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
+
+ if(body.getExchange() == null || body.getExchange().equals(AMQShortString.EMPTY_STRING))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange");
+ }
+
final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
if (exch == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
index 86adc585c3..580b912552 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -73,10 +74,18 @@ public class BrokerTestHelper_0_8 extends BrokerTestHelper
when(info.getExchange()).thenReturn(exchangeNameAsShortString);
when(info.getRoutingKey()).thenReturn(routingKey);
- ExchangeImpl exchange = channel.getVirtualHost().getExchange(exchangeName);
+ MessageDestination destination;
+ if(exchangeName == null || "".equals(exchangeName))
+ {
+ destination = channel.getVirtualHost().getDefaultDestination();
+ }
+ else
+ {
+ destination = channel.getVirtualHost().getExchange(exchangeName);
+ }
for (int count = 0; count < numberOfMessages; count++)
{
- channel.setPublishFrame(info, exchange);
+ channel.setPublishFrame(info, destination);
// Set the body size
ContentHeaderBody _headerBody = new ContentHeaderBody();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1e5c8caa18..78dcab9d75 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -130,11 +130,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
MessageSource queue = getVirtualHost().getMessageSource(addr);
if(queue != null)
{
-
destination = new MessageSourceDestination(queue);
-
-
-
}
else
{
@@ -145,7 +141,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
else
{
-
endpoint.setSource(null);
destination = null;
}