summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/etc/virtualhosts.xml110
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java5
6 files changed, 122 insertions, 54 deletions
diff --git a/java/broker/etc/virtualhosts.xml b/java/broker/etc/virtualhosts.xml
index 3601daacc7..52ff23e090 100644
--- a/java/broker/etc/virtualhosts.xml
+++ b/java/broker/etc/virtualhosts.xml
@@ -24,58 +24,54 @@
<virtualhost>
<name>localhost</name>
<localhost>
- <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
- <maximumMessageCount>5000</maximumMessageCount>
- <queue>
- <name>queue</name>
+ <exchanges>
+ <exchange>
+ <type>direct</type>
+ <name>test.direct</name>
+ <durable>true</durable>
+ </exchange>
+ <exchange>
+ <type>topic</type>
+ <name>test.topic</name>
+ </exchange>
+ </exchanges>
+ <queues>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+
+ <queue>
+ <name>queue</name>
+ </queue>
+ <queue>
+ <name>ping</name>
+ </queue>
<queue>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ <name>test-queue</name>
+ <test-queue>
+ <exchange>test.direct</exchange>
+ <durable>true</durable>
+ </test-queue>
</queue>
- </queue>
- <queue>
- <name>ping</name>
- <ping>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </ping>
- </queue>
+ <queue>
+ <name>test-ping</name>
+ <test-ping>
+ <exchange>test.direct</exchange>
+ </test-ping>
+ </queue>
+
+ </queues>
</localhost>
</virtualhost>
+
+
<virtualhost>
<name>development</name>
<development>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>5000</maximumMessageCount>
- <queue>
- <name>queue</name>
- <queue>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </queue>
- </queue>
- <queue>
- <name>ping</name>
- <ping>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </ping>
- </queue>
- </development>
- </virtualhost>
- <virtualhost>
- <name>test</name>
- <test>
- <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
- <maximumMessageCount>5000</maximumMessageCount>
+ <queues>
<queue>
<name>queue</name>
<queue>
@@ -94,6 +90,34 @@
<maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
</ping>
</queue>
+ </queues>
+ </development>
+ </virtualhost>
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queues>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </queues>
</test>
</virtualhost>
</virtualhosts>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index bd8f0c9670..af38a9abe5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -32,6 +32,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -71,7 +72,16 @@ public class VirtualHostConfiguration
throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
- List queueNames = configuration.getList("queue.name");
+ List exchangeNames = configuration.getList("exchanges.exchange.name");
+
+ for(Object exchangeNameObj : exchangeNames)
+ {
+ String exchangeName = String.valueOf(exchangeNameObj);
+ configureExchange(virtualHost, exchangeName, configuration);
+ }
+
+
+ List queueNames = configuration.getList("queues.queue.name");
for(Object queueNameObj : queueNames)
{
@@ -81,12 +91,49 @@ public class VirtualHostConfiguration
}
+ private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException
+ {
+
+ CompositeConfiguration exchangeConfiguration = new CompositeConfiguration();
+
+ exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
+ exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
+
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore messageStore = virtualHost.getMessageStore();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
+ AMQShortString exchangeName = new AMQShortString(exchangeNameString);
+
+
+ Exchange exchange;
+
+
+
+ synchronized (exchangeRegistry)
+ {
+ exchange = exchangeRegistry.getExchange(exchangeName);
+ if(exchange == null)
+ {
+
+ AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct"));
+ boolean durable = exchangeConfiguration.getBoolean("durable",false);
+ boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false);
+
+ Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
+ exchangeRegistry.registerExchange(newExchange);
+ }
+
+ }
+ }
+
private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
{
CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- queueConfiguration.addConfiguration(configuration.subset("queue."+ queueNameString));
- queueConfiguration.addConfiguration(configuration);
+ queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString));
+ queueConfiguration.addConfiguration(configuration.subset("queues"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MessageStore messageStore = virtualHost.getMessageStore();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 7e3f9857f9..c7803133b3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -55,10 +55,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange)
{
- if(_defaultExchange == null)
- {
- setDefaultExchange(exchange);
- }
_exchangeMap.put(exchange.getName(), exchange);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 7e378dfd01..3798918428 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -63,7 +63,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
if (body.exchange == null)
{
- body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
VirtualHost vHost = session.getVirtualHost();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 2218ff604f..a35cb9f7d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -107,7 +107,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queueRegistry.registerQueue(queue);
if (autoRegister)
{
- Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
defaultExchange.registerQueue(body.queue, queue, null);
queue.bind(body.queue, defaultExchange);
_log.info("Queue " + body.queue + " bound to default exchange");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
index 6b6163724c..fa8f13127a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
@@ -28,13 +28,14 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
public class ExchangeInitialiser
{
- public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{
+ public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{
+ define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
- registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
+ registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME));
}
private void define(ExchangeRegistry r, ExchangeFactory f,