diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-04 22:52:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-04 22:52:05 +0000 |
| commit | 0096aa9a3c5969db8a3d517ba5cbfb6979d1eb24 (patch) | |
| tree | 3ac3c144cf42e9cfe7e22c1cd879c65a89d42319 /qpid/java/broker-plugins | |
| parent | 291bbc2729009bac61602fcb268af619df764bbb (diff) | |
| download | qpid-python-0096aa9a3c5969db8a3d517ba5cbfb6979d1eb24.tar.gz | |
QPID-5601 : [Java Broker] The 0-x "default exchange" should not actually be modelled as an Exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574235 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
9 files changed, 384 insertions, 222 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 1fb82efd2d..9d7764414f 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; @@ -683,83 +684,101 @@ public class ServerSessionDelegate extends SessionDelegate return; } } - - if(method.getPassive()) + if(method.getExchange() == null || method.getExchange().equals("")) { - ExchangeImpl exchange = getExchange(session, exchangeName); - - if(exchange == null) + if(!DirectExchange.TYPE.getType().equals(method.getType())) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare default exchange " + + " of type " + DirectExchange.TYPE.getType() + + " to " + method.getType() +"."); } - else + if(method.hasAlternateExchange() && !"".equals(method.getAlternateExchange())) { - if (!exchange.getTypeName().equals(method.getType()) - && (method.getType() != null && method.getType().length() > 0)) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + "."); - } + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to set alternate exchange of the default exchange " + + " to " + method.getAlternateExchange() +"."); } } else { - - try - { - Map<String,Object> attributes = new HashMap<String, Object>(); - - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange()); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType()); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange()); - virtualHost.createExchange(attributes); - } - catch(ReservedExchangeNameException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " - + exchangeName + " which begins with reserved name or prefix."); - } - catch(UnknownExchangeException e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, - "Unknown alternate exchange " + e.getExchangeName()); - } - catch(AMQUnknownExchangeType e) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); - } - catch(ExchangeExistsException e) + if(method.getPassive()) { - ExchangeImpl exchange = e.getExistingExchange(); - if(!exchange.getTypeName().equals(method.getType())) + + ExchangeImpl exchange = getExchange(session, exchangeName); + + if(exchange == null) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to redeclare exchange: " + exchangeName - + " of type " + exchange.getTypeName() - + " to " + method.getType() +"."); + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); } - else if(method.hasAlternateExchange() - && (exchange.getAlternateExchange() == null || - !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) + else { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to change alternate exchange of: " + exchangeName - + " from " + exchange.getAlternateExchange() - + " to " + method.getAlternateExchange() +"."); + if (!exchange.getTypeName().equals(method.getType()) + && (method.getType() != null && method.getType().length() > 0)) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeName() + " to " + method.getType() + "."); + } } } - catch (AccessControlException e) + else { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); - } + + try + { + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange()); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType()); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable()); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange()); + virtualHost.createExchange(attributes); + } + catch(ReservedExchangeNameException e) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " + + exchangeName + " which begins with reserved name or prefix."); + } + catch(UnknownExchangeException e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, + "Unknown alternate exchange " + e.getExchangeName()); + } + catch(AMQUnknownExchangeType e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); + } + catch(ExchangeExistsException e) + { + ExchangeImpl exchange = e.getExistingExchange(); + if(!exchange.getTypeName().equals(method.getType())) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare exchange: " + exchangeName + + " of type " + exchange.getTypeName() + + " to " + method.getType() +"."); + } + else if(method.hasAlternateExchange() + && (exchange.getAlternateExchange() == null || + !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to change alternate exchange of: " + exchangeName + + " from " + exchange.getAlternateExchange() + + " to " + method.getAlternateExchange() +"."); + } + } + catch (AccessControlException e) + { + exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); + } + } } - } private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description) @@ -789,12 +808,12 @@ public class ServerSessionDelegate extends SessionDelegate destination = virtualHost.getMessageDestination(xfr.getDestination()); if(destination == null) { - destination = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultDestination(); } } else { - destination = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultDestination(); } return destination; } @@ -878,19 +897,30 @@ public class ServerSessionDelegate extends SessionDelegate ExchangeQueryResult result = new ExchangeQueryResult(); - ExchangeImpl exchange = getExchange(session, method.getName()); - if(exchange != null) + final String exchangeName = method.getName(); + + if(exchangeName == null || exchangeName.equals("")) { - result.setDurable(exchange.isDurable()); - result.setType(exchange.getTypeName()); + result.setDurable(true); + result.setType(DirectExchange.TYPE.getType()); result.setNotFound(false); } else { - result.setNotFound(true); - } + ExchangeImpl exchange = getExchange(session, exchangeName); + if(exchange != null) + { + result.setDurable(exchange.isDurable()); + result.setType(exchange.getTypeName()); + result.setNotFound(false); + } + else + { + result.setNotFound(true); + } + } session.executionResult((int) method.getId(), result); } @@ -904,52 +934,56 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); } - else if (nameNullOrEmpty(method.getExchange())) - { - exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange"); - } else { - //TODO - here because of non-compliant python tests - // should raise exception ILLEGAL_ARGUMENT "binding-key not set" - if (!method.hasBindingKey()) - { - method.setBindingKey(method.getQueue()); - } - AMQQueue queue = virtualHost.getQueue(method.getQueue()); - ExchangeImpl exchange = virtualHost.getExchange(method.getExchange()); - if(queue == null) + final String exchangeName = method.getExchange(); + if (nameNullOrEmpty(exchangeName)) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); - } - else if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); - } - else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) - { - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header"); + exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange"); } else { - if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)) + //TODO - here because of non-compliant python tests + // should raise exception ILLEGAL_ARGUMENT "binding-key not set" + if (!method.hasBindingKey()) { - try + method.setBindingKey(method.getQueue()); + } + AMQQueue queue = virtualHost.getQueue(method.getQueue()); + ExchangeImpl exchange = virtualHost.getExchange(exchangeName); + if(queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); + } + else if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + exchangeName + "' not found"); + } + else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) + { + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header"); + } + else + { + if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)) { - exchange.addBinding(method.getBindingKey(), queue, method.getArguments()); + try + { + exchange.addBinding(method.getBindingKey(), queue, method.getArguments()); + } + catch (AccessControlException e) + { + exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); + } } - catch (AccessControlException e) + else { - exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); + // todo } } - else - { - // todo - } - } + } } @@ -1010,8 +1044,10 @@ public class ServerSessionDelegate extends SessionDelegate VirtualHost virtualHost = getVirtualHost(session); ExchangeImpl exchange; AMQQueue queue; - if(method.hasExchange()) + boolean isDefaultExchange; + if(method.hasExchange() && !method.getExchange().equals("")) { + isDefaultExchange = false; exchange = virtualHost.getExchange(method.getExchange()); if(exchange == null) @@ -1021,11 +1057,47 @@ public class ServerSessionDelegate extends SessionDelegate } else { - exchange = virtualHost.getDefaultExchange(); + isDefaultExchange = true; + exchange = null; } + if(isDefaultExchange) + { + if(method.hasQueue()) + { + queue = getQueue(session, method.getQueue()); - if(method.hasQueue()) + if(queue == null) + { + result.setQueueNotFound(true); + } + else + { + if(method.hasBindingKey()) + { + if(!method.getBindingKey().equals(method.getQueue())) + { + result.setKeyNotMatched(true); + } + } + } + } + else if(method.hasBindingKey()) + { + if(getQueue(session, method.getBindingKey()) == null) + { + result.setKeyNotMatched(true); + } + } + + if(method.hasArguments() && !method.getArguments().isEmpty()) + { + result.setArgsNotMatched(true); + } + + + } + else if(method.hasQueue()) { queue = getQueue(session, method.getQueue()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java index 101a92242f..fc085e8ab1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -62,16 +61,23 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic } AMQShortString exchangeName = body.getExchange(); + VirtualHost vHost = session.getVirtualHost(); + // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - if (exchangeName == null) + + MessageDestination destination; + + if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName)) { - exchangeName = AMQShortString.valueOf(ExchangeDefaults.DEFAULT_EXCHANGE_NAME); + destination = vHost.getDefaultDestination(); + } + else + { + destination = vHost.getMessageDestination(exchangeName.toString()); } - VirtualHost vHost = session.getVirtualHost(); - MessageDestination exch = vHost.getMessageDestination(exchangeName.toString()); // if the exchange does not exist we raise a channel exception - if (exch == null) + if (destination == null) { throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name"); } @@ -91,7 +97,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic info.setExchange(exchangeName); try { - channel.setPublishFrame(info, exch); + channel.setPublishFrame(info, destination); } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index 27837844ff..fce1260155 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -79,107 +79,155 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo channel.sync(); - AMQShortString exchangeName = body.getExchange() == null ? AMQShortString.EMPTY_STRING : body.getExchange(); + AMQShortString exchangeName = body.getExchange(); AMQShortString queueName = body.getQueue(); AMQShortString routingKey = body.getRoutingKey(); - ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); ExchangeBoundOkBody response; - if (exchange == null) + if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING)) { - - - response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, - new AMQShortString("Exchange '" + exchangeName + "' not found")); - } - else if (routingKey == null) - { - if (queueName == null) + if(routingKey == null) { - if (exchange.hasBindings()) + if(queueName == null) { - response = methodRegistry.createExchangeBoundOkBody(OK, null); + response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null); } else { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { - response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText + } + else + { + response = methodRegistry.createExchangeBoundOkBody(OK, null); + } } } else { - - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) + if(queueName == null) { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - new AMQShortString("Queue '" + queueName + "' not found")); // replyText + response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null); } else { - if (exchange.isBound(queue)) + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) { - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText } else { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode - new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText + response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null); } } } } - else if (queueName != null) + else { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) + ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); + if (exchange == null) { - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - new AMQShortString("Queue '" + queueName + "' not found")); // replyText + + response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, + new AMQShortString("Exchange '" + exchangeName + "' not found")); } - else + else if (routingKey == null) { - String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); - if (exchange.isBound(bindingKey, queue)) + if (queueName == null) { + if (exchange.hasBindings()) + { + response = methodRegistry.createExchangeBoundOkBody(OK, null); + } + else + { - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode + null); // replyText + } } else { - String message = "Queue '" + queueName + "' not bound with routing key '" + - body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { - if(message.length()>255) + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText + } + else { - message = message.substring(0,254); + if (exchange.isBound(queue)) + { + + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText + } } - response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - new AMQShortString(message)); // replyText } } - } - else - { - if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) + else if (queueName != null) { + AMQQueue queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) + { + + response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode + new AMQShortString("Queue '" + queueName + "' not found")); // replyText + } + else + { + String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); + if (exchange.isBound(bindingKey, queue)) + { - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + String message = "Queue '" + queueName + "' not bound with routing key '" + + body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; + + if(message.length()>255) + { + message = message.substring(0,254); + } + response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode + new AMQShortString(message)); // replyText + } + } } else { + if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) + { - response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode - new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() + - "' to exchange '" + exchangeName + "'")); // replyText + response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode + null); // replyText + } + else + { + + response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode + new AMQShortString("No queue bound with routing key '" + body.getRoutingKey() + + "' to exchange '" + exchangeName + "'")); // replyText + } } } session.writeFrame(response.generateFrame(channelId)); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index 3b630c684c..78d47aaa52 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.protocol.v0_8.AMQChannel; @@ -78,76 +79,90 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange ExchangeImpl exchange; - if (body.getPassive()) + if(exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING)) { - exchange = virtualHost.getExchange(exchangeName == null ? null : exchangeName.toString()); - if(exchange == null) + if(!new AMQShortString(DirectExchange.TYPE.getType()).equals(body.getType())) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " + + " of type " + + DirectExchange.TYPE.getType() + + " to " + body.getType() +".", + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); } - else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString())) - { - - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + - exchangeName + " of type " + exchange.getTypeName() - + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); - } - } else { - try + if (body.getPassive()) { - String name = exchangeName == null ? null : exchangeName.intern().toString(); - String type = body.getType() == null ? null : body.getType().intern().toString(); - Map<String,Object> attributes = new HashMap<String, Object>(); - - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - exchange = virtualHost.createExchange(attributes); + exchange = virtualHost.getExchange(exchangeName.toString()); + if(exchange == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + } + else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getTypeName().equals(body.getType().asString())) + { - } - catch(ReservedExchangeNameException e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix."); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeName() + + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); + } } - catch(ExchangeExistsException e) + else { - exchange = e.getExistingExchange(); - if(!new AMQShortString(exchange.getTypeName()).equals(body.getType())) + try { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getTypeName() - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + String name = exchangeName == null ? null : exchangeName.intern().toString(); + String type = body.getType() == null ? null : body.getType().intern().toString(); + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, null); + attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + exchange = virtualHost.createExchange(attributes); + + } + catch(ReservedExchangeNameException e) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Attempt to declare exchange: " + exchangeName + + " which begins with reserved prefix."); + + } + catch(ExchangeExistsException e) + { + exchange = e.getExistingExchange(); + if(!new AMQShortString(exchange.getTypeName()).equals(body.getType())) + { + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getTypeName() + + " to " + body.getType() +".", + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + } + catch(AMQUnknownExchangeType e) + { + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); + } + catch (AccessControlException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + } + catch (UnknownExchangeException e) + { + // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); } - } - catch(AMQUnknownExchangeType e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); - } - catch (UnknownExchangeException e) - { - // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); } } - if(!body.getNowait()) { MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java index 720677064b..bc723fc3dd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java @@ -62,6 +62,11 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD { final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); + if(exchangeName == null || "".equals(exchangeName)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted"); + } + final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); if(exchange == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 1e0382f456..7dc76d13d0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -102,6 +102,12 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); } final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); + + if(exchangeName == null || "".equals(exchangeName)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange"); + } + final ExchangeImpl exch = virtualHost.getExchange(exchangeName); if (exch == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index a828ca323d..abc9c8541c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -93,6 +93,12 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); } + + if(body.getExchange() == null || body.getExchange().equals(AMQShortString.EMPTY_STRING)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange"); + } + final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); if (exch == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java index 86adc585c3..580b912552 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -73,10 +74,18 @@ public class BrokerTestHelper_0_8 extends BrokerTestHelper when(info.getExchange()).thenReturn(exchangeNameAsShortString); when(info.getRoutingKey()).thenReturn(routingKey); - ExchangeImpl exchange = channel.getVirtualHost().getExchange(exchangeName); + MessageDestination destination; + if(exchangeName == null || "".equals(exchangeName)) + { + destination = channel.getVirtualHost().getDefaultDestination(); + } + else + { + destination = channel.getVirtualHost().getExchange(exchangeName); + } for (int count = 0; count < numberOfMessages; count++) { - channel.setPublishFrame(info, exchange); + channel.setPublishFrame(info, destination); // Set the body size ContentHeaderBody _headerBody = new ContentHeaderBody(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 1e5c8caa18..78dcab9d75 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -130,11 +130,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio MessageSource queue = getVirtualHost().getMessageSource(addr); if(queue != null) { - destination = new MessageSourceDestination(queue); - - - } else { @@ -145,7 +141,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } else { - endpoint.setSource(null); destination = null; } |
