From 9cc4cdaa54ec97aefb3ca2da9b437101b8127c2b Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 26 Jan 2007 18:45:39 +0000 Subject: Updated broker for issues highlighted by python tests. (e.g. fanout exchange, default exchange rules etc) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@500303 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/exchange/DefaultExchangeFactory.java | 11 +- .../server/exchange/DefaultExchangeRegistry.java | 24 ++- .../qpid/server/exchange/DestNameExchange.java | 6 + .../qpid/server/exchange/DestWildExchange.java | 7 + .../org/apache/qpid/server/exchange/Exchange.java | 1 + .../qpid/server/exchange/ExchangeRegistry.java | 2 + .../qpid/server/exchange/FanoutExchange.java | 205 +++++++++++++++++++++ .../qpid/server/exchange/HeadersExchange.java | 6 + .../server/handler/ExchangeDeclareHandler.java | 28 ++- .../qpid/server/protocol/ExchangeInitialiser.java | 3 + .../apache/qpid/server/queue/ExchangeBindings.java | 2 +- 11 files changed, 285 insertions(+), 10 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 0c73e0f9f0..8603bc34aa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.exchange.ExchangeDefaults; public class DefaultExchangeFactory implements ExchangeFactory { @@ -34,9 +36,10 @@ public class DefaultExchangeFactory implements ExchangeFactory public DefaultExchangeFactory() { - _exchangeClassMap.put("direct", org.apache.qpid.server.exchange.DestNameExchange.class); - _exchangeClassMap.put("topic", org.apache.qpid.server.exchange.DestWildExchange.class); - _exchangeClassMap.put("headers", org.apache.qpid.server.exchange.HeadersExchange.class); + _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); + _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); + _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); + _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class); } public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete, @@ -46,7 +49,7 @@ public class DefaultExchangeFactory implements ExchangeFactory Class exchClass = _exchangeClassMap.get(type); if (exchClass == null) { - throw new AMQException(_logger, "Unknown exchange type: " + type); + throw new AMQUnknownExchangeType("Unknown exchange type: " + type); } try { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index ef94918e59..02a8dbda6e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -37,6 +37,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ private ConcurrentMap _exchangeMap = new ConcurrentHashMap(); + private Exchange _defaultExchange; + public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) { //create 'standard' exchanges: @@ -52,9 +54,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) { + if(_defaultExchange == null) + { + setDefaultExchange(exchange); + } _exchangeMap.put(exchange.getName(), exchange); } + public void setDefaultExchange(Exchange exchange) + { + _defaultExchange = exchange; + } + public void unregisterExchange(String name, boolean inUse) throws AMQException { // TODO: check inUse argument @@ -71,7 +82,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public Exchange getExchange(String name) { - return _exchangeMap.get(name); + + if(name == null || name.length() == 0) + { + return _defaultExchange; + } + else + { + return _exchangeMap.get(name); + } + } /** @@ -82,7 +102,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void routeContent(AMQMessage payload) throws AMQException { final String exchange = payload.getTransferBody().destination; - final Exchange exch = _exchangeMap.get(exchange); + final Exchange exch = getExchange(exchange); // there is a small window of opportunity for the exchange to be deleted in between // the JmsPublish being received (where the exchange is validated) and the final // content body being received (which triggers this method) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index b777ae7d82..872cde56e2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.exchange.ExchangeDefaults; import javax.management.JMException; import javax.management.MBeanException; @@ -228,4 +229,9 @@ public class DestNameExchange extends AbstractExchange { return !_index.getBindingsMap().isEmpty(); } + + public String getType() + { + return ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 932632cde3..3064a7eabc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.exchange.ExchangeDefaults; import javax.management.JMException; import javax.management.MBeanException; @@ -241,4 +242,10 @@ public class DestWildExchange extends AbstractExchange throw new AMQException("Exception occured in creating the topic exchenge mbean", ex); } } + + + public String getType() + { + return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 824e85dc5c..9fcbe2a871 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.queue.AMQMessage; public interface Exchange { String getName(); + String getType(); void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 4a0a6a0ee1..2fc64135a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -37,4 +37,6 @@ public interface ExchangeRegistry extends MessageRouter void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException; Exchange getExchange(String name); + + void setDefaultExchange(Exchange exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java new file mode 100644 index 0000000000..11b8061105 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -0,0 +1,205 @@ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; + +import javax.management.openmbean.*; +import javax.management.JMException; +import javax.management.MBeanException; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class FanoutExchange extends AbstractExchange +{ + private static final Logger _logger = Logger.getLogger(FanoutExchange.class); + + /** + * Maps from queue name to queue instances + */ + private final CopyOnWriteArraySet _queues = new CopyOnWriteArraySet(); + + /** + * MBean class implementing the management interfaces. + */ + @MBeanDescription("Management Bean for Fanout Exchange") + private final class FanoutExchangeMBean extends ExchangeMBean + { + // open mbean data types for representing exchange bindings + private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; + private OpenType[] _bindingItemTypes = new OpenType[2]; + private CompositeType _bindingDataType = null; + private TabularType _bindinglistDataType = null; + private TabularDataSupport _bindingList = null; + + @MBeanConstructor("Creates an MBean for AMQ fanout exchange") + public FanoutExchangeMBean() throws JMException + { + super(); + _exchangeType = "fanout"; + init(); + } + + /** + * initialises the OpenType objects. + */ + private void init() throws OpenDataException + { + _bindingItemTypes[0] = SimpleType.STRING; + _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); + _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names", + _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), + _bindingDataType, _bindingItemIndexNames); + } + + public TabularData bindings() throws OpenDataException + { + + _bindingList = new TabularDataSupport(_bindinglistDataType); + + for (AMQQueue queue : _queues) + { + String queueName = queue.getName().toString(); + + + + Object[] bindingItemValues = {queueName, new String[] {queueName}}; + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + _bindingList.put(bindingData); + } + + return _bindingList; + } + + public void createNewBinding(String queueName, String binding) throws JMException + { + AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); + if (queue == null) + { + throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } + + try + { + registerQueue(binding, queue, null); + queue.bind(binding, FanoutExchange.this); + } + catch (AMQException ex) + { + throw new MBeanException(ex); + } + } + + }// End of MBean class + + + protected ExchangeMBean createMBean() throws AMQException + { + try + { + return new FanoutExchange.FanoutExchangeMBean(); + } + catch (JMException ex) + { + _logger.error("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException("Exception occured in creating the direct exchange mbean", ex); + } + } + + public String getType() + { + return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + } + + public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + + if (_queues.contains(queue)) + { + _logger.debug("Queue " + queue + " is already registered"); + } + else + { + _queues.add(queue); + _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); + } + } + + public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + { + assert queue != null; + assert routingKey != null; + + if (!_queues.remove(queue)) + { + throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + + ". "); + } + } + + public void route(AMQMessage payload) throws AMQException + { + MessageTransferBody transferBody = payload.getTransferBody(); + if (_queues == null || _queues.isEmpty()) + { + String msg = "No queues bound to " + this; + // XXX + /*if (publishBody.mandatory) + { + throw new NoRouteException(msg, payload); + } + else*/ + { + _logger.warn(msg); + } + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Publishing message to queue " + _queues); + } + + for (AMQQueue q : _queues) + { + q.deliver(payload); + } + } + } + + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + return _queues.contains(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + + return _queues != null && !_queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + + + return _queues.contains(queue); + } + + public boolean hasBindings() throws AMQException + { + return !_queues.isEmpty(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 5534f9d227..60ec059336 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.exchange.ExchangeDefaults; import javax.management.JMException; import javax.management.openmbean.*; @@ -285,4 +286,9 @@ public class HeadersExchange extends AbstractExchange return o instanceof Registration && ((Registration) o).queue.equals(queue); } } + + public String getType() + { + return ExchangeDefaults.HEADERS_EXCHANGE_CLASS; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 273c3881ae..99d0df82cd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -22,9 +22,12 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -68,9 +71,28 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener