From d21feec9fb0b72ccf363051094c48d771c4ba6f4 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Wed, 10 Jan 2007 00:11:27 +0000 Subject: QPID-271 : (Patch supplied by Rob Godfrey) Implement fanout exchange git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@494658 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/exchange/DefaultExchangeFactory.java | 8 +- .../qpid/server/exchange/FanoutExchange.java | 206 +++++++++++++++++++++ .../qpid/server/protocol/ExchangeInitialiser.java | 1 + .../org/apache/qpid/exchange/ExchangeDefaults.java | 4 + 4 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 222cd2aef2..77f9819048 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -39,6 +42,8 @@ public class DefaultExchangeFactory implements ExchangeFactory _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); + _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class); + } public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, @@ -48,7 +53,8 @@ public class DefaultExchangeFactory implements ExchangeFactory Class exchClass = _exchangeClassMap.get(type); if (exchClass == null) { - throw new AMQException(_logger, "Unknown exchange type: " + type); + + throw new AMQUnknownExchangeType("Unknown exchange type: " + type); } try { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java new file mode 100644 index 0000000000..2e7457e4a6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -0,0 +1,206 @@ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; + +import javax.management.openmbean.*; +import javax.management.JMException; +import javax.management.MBeanException; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class FanoutExchange extends AbstractExchange +{ + private static final Logger _logger = Logger.getLogger(FanoutExchange.class); + + /** + * Maps from queue name to queue instances + */ + private final CopyOnWriteArraySet _queues = new CopyOnWriteArraySet(); + + /** + * MBean class implementing the management interfaces. + */ + @MBeanDescription("Management Bean for Fanout Exchange") + private final class FanoutExchangeMBean extends ExchangeMBean + { + // open mbean data types for representing exchange bindings + private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; + private OpenType[] _bindingItemTypes = new OpenType[2]; + private CompositeType _bindingDataType = null; + private TabularType _bindinglistDataType = null; + private TabularDataSupport _bindingList = null; + + @MBeanConstructor("Creates an MBean for AMQ direct exchange") + public FanoutExchangeMBean() throws JMException + { + super(); + _exchangeType = "fanout"; + init(); + } + + /** + * initialises the OpenType objects. + */ + private void init() throws OpenDataException + { + _bindingItemTypes[0] = SimpleType.STRING; + _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); + _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names", + _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), + _bindingDataType, _bindingItemIndexNames); + } + + public TabularData bindings() throws OpenDataException + { + + _bindingList = new TabularDataSupport(_bindinglistDataType); + + for (AMQQueue queue : _queues) + { + String queueName = queue.getName().toString(); + + + + Object[] bindingItemValues = {queueName, new String[] {queueName}}; + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + _bindingList.put(bindingData); + } + + return _bindingList; + } + + public void createNewBinding(String queueName, String binding) throws JMException + { + AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); + if (queue == null) + { + throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } + + try + { + registerQueue(new AMQShortString(binding), queue, null); + queue.bind(new AMQShortString(binding), FanoutExchange.this); + } + catch (AMQException ex) + { + throw new MBeanException(ex); + } + } + + }// End of MBean class + + + protected ExchangeMBean createMBean() throws AMQException + { + try + { + return new FanoutExchange.FanoutExchangeMBean(); + } + catch (JMException ex) + { + _logger.error("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException("Exception occured in creating the direct exchange mbean", ex); + } + } + + public AMQShortString getType() + { + return ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + } + + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + + if (_queues.contains(queue)) + { + _logger.debug("Queue " + queue + " is already registered"); + } + else + { + _queues.add(queue); + _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); + } + } + + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException + { + assert queue != null; + assert routingKey != null; + + if (!_queues.remove(queue)) + { + throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + + ". "); + } + } + + public void route(AMQMessage payload) throws AMQException + { + final BasicPublishBody publishBody = payload.getPublishBody(); + final AMQShortString routingKey = publishBody.routingKey; + if (_queues == null || _queues.isEmpty()) + { + String msg = "No queues bound to " + this; + if (publishBody.mandatory) + { + throw new NoRouteException(msg, payload); + } + else + { + _logger.warn(msg); + } + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Publishing message to queue " + _queues); + } + + for (AMQQueue q : _queues) + { + payload.enqueue(q); + } + } + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException + { + return _queues.contains(queue); + } + + public boolean isBound(AMQShortString routingKey) throws AMQException + { + + return _queues != null && !_queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + + + return _queues.contains(queue); + } + + public boolean hasBindings() throws AMQException + { + return !_queues.isEmpty(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index d4881aefaf..e1fac55d3b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -32,6 +32,7 @@ public class ExchangeInitialiser define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); } private void define(ExchangeRegistry r, ExchangeFactory f, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 729cdb871e..57159f3802 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -35,4 +35,8 @@ public class ExchangeDefaults public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match"); public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers"); + + public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout"); + + public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); } -- cgit v1.2.1