diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2007-02-16 23:11:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2007-02-16 23:11:41 +0000 |
| commit | dd8df96fcca8f5f9dcbe91ba012cff400a38daa7 (patch) | |
| tree | ee84d98ec82abd31dd486f98fea1cb6bdb526db5 /java/broker | |
| parent | 6213309b7c179fdddfeca0273d5c1f6592adedd7 (diff) | |
| download | qpid-python-dd8df96fcca8f5f9dcbe91ba012cff400a38daa7.tar.gz | |
QPID-375 : remove assumptions on standard exchanges (amq.direct, amq.topic, etc), allow other exchanges to be created through virtualhosts.xml
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508649 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
6 files changed, 122 insertions, 54 deletions
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml index 3601daacc7..52ff23e090 100644 --- a/java/broker/etc/virtualhosts.xml +++ b/java/broker/etc/virtualhosts.xml @@ -24,58 +24,54 @@ <virtualhost> <name>localhost</name> <localhost> - <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> - <maximumMessageCount>5000</maximumMessageCount> - <queue> - <name>queue</name> + <exchanges> + <exchange> + <type>direct</type> + <name>test.direct</name> + <durable>true</durable> + </exchange> + <exchange> + <type>topic</type> + <name>test.topic</name> + </exchange> + </exchanges> + <queues> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + + <queue> + <name>queue</name> + </queue> + <queue> + <name>ping</name> + </queue> <queue> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + <name>test-queue</name> + <test-queue> + <exchange>test.direct</exchange> + <durable>true</durable> + </test-queue> </queue> - </queue> - <queue> - <name>ping</name> - <ping> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> - </ping> - </queue> + <queue> + <name>test-ping</name> + <test-ping> + <exchange>test.direct</exchange> + </test-ping> + </queue> + + </queues> </localhost> </virtualhost> + + <virtualhost> <name>development</name> <development> <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> <maximumMessageCount>5000</maximumMessageCount> - <queue> - <name>queue</name> - <queue> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> - </queue> - </queue> - <queue> - <name>ping</name> - <ping> - <exchange>amq.direct</exchange> - <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> - <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> - <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> - </ping> - </queue> - </development> - </virtualhost> - <virtualhost> - <name>test</name> - <test> - <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> - <maximumMessageCount>5000</maximumMessageCount> + <queues> <queue> <name>queue</name> <queue> @@ -94,6 +90,34 @@ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> </ping> </queue> + </queues> + </development> + </virtualhost> + <virtualhost> + <name>test</name> + <test> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <queues> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </queues> </test> </virtualhost> </virtualhosts> diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index bd8f0c9670..af38a9abe5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -32,6 +32,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -71,7 +72,16 @@ public class VirtualHostConfiguration throw new ConfigurationException("Unknown virtual host: " + virtualHostName); } - List queueNames = configuration.getList("queue.name"); + List exchangeNames = configuration.getList("exchanges.exchange.name"); + + for(Object exchangeNameObj : exchangeNames) + { + String exchangeName = String.valueOf(exchangeNameObj); + configureExchange(virtualHost, exchangeName, configuration); + } + + + List queueNames = configuration.getList("queues.queue.name"); for(Object queueNameObj : queueNames) { @@ -81,12 +91,49 @@ public class VirtualHostConfiguration } + private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException + { + + CompositeConfiguration exchangeConfiguration = new CompositeConfiguration(); + + exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString)); + exchangeConfiguration.addConfiguration(configuration.subset("exchanges")); + + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + MessageStore messageStore = virtualHost.getMessageStore(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + + AMQShortString exchangeName = new AMQShortString(exchangeNameString); + + + Exchange exchange; + + + + synchronized (exchangeRegistry) + { + exchange = exchangeRegistry.getExchange(exchangeName); + if(exchange == null) + { + + AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct")); + boolean durable = exchangeConfiguration.getBoolean("durable",false); + boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false); + + Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); + exchangeRegistry.registerExchange(newExchange); + } + + } + } + private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException { CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - queueConfiguration.addConfiguration(configuration.subset("queue."+ queueNameString)); - queueConfiguration.addConfiguration(configuration); + queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString)); + queueConfiguration.addConfiguration(configuration.subset("queues")); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MessageStore messageStore = virtualHost.getMessageStore(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 7e3f9857f9..c7803133b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -55,10 +55,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) { - if(_defaultExchange == null) - { - setDefaultExchange(exchange); - } _exchangeMap.put(exchange.getName(), exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 7e378dfd01..3798918428 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -63,7 +63,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? if (body.exchange == null) { - body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME; } VirtualHost vHost = session.getVirtualHost(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 2218ff604f..a35cb9f7d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -107,7 +107,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queueRegistry.registerQueue(queue); if (autoRegister) { - Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); + Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); defaultExchange.registerQueue(body.queue, queue, null); queue.bind(body.queue, defaultExchange); _log.info("Queue " + body.queue + " bound to default exchange"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index 6b6163724c..fa8f13127a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -28,13 +28,14 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; public class ExchangeInitialiser { - public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{ + public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{ + define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME)); + registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME)); } private void define(ExchangeRegistry r, ExchangeFactory f, |
