summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java203
1 files changed, 162 insertions, 41 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 5d2a31b80d..8a8cbd23cf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,17 +20,11 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Collections;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.management.NotCompliantMBeanException;
-
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -47,10 +41,9 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.QueueBackingStore;
import org.apache.qpid.server.queue.FileQueueBackingStoreFactory;
import org.apache.qpid.server.queue.QueueBackingStoreFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.security.access.ACLManager;
@@ -59,11 +52,17 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import javax.management.NotCompliantMBeanException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
public class VirtualHost implements Accessable
{
private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
private final String _name;
private ConnectionRegistry _connectionRegistry;
@@ -87,7 +86,7 @@ public class VirtualHost implements Accessable
private ACLManager _accessManager;
private final Timer _houseKeepingTimer;
-
+
private VirtualHostConfiguration _configuration;
private QueueBackingStoreFactory _queueBackingStoreFactory;
@@ -114,7 +113,7 @@ public class VirtualHost implements Accessable
public VirtualHostConfiguration getConfiguration()
{
- return _configuration ;
+ return _configuration;
}
public QueueBackingStoreFactory getQueueBackingStoreFactory()
@@ -148,12 +147,13 @@ public class VirtualHost implements Accessable
return VirtualHost.this;
}
-
} // End of MBean class
/**
* Normal Constructor
+ *
* @param hostConfig
+ *
* @throws Exception
*/
public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception
@@ -165,7 +165,7 @@ public class VirtualHost implements Accessable
{
_configuration = hostConfig;
_name = hostConfig.getName();
-
+
if (_name == null || _name.length() == 0)
{
throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
@@ -175,12 +175,30 @@ public class VirtualHost implements Accessable
_connectionRegistry = new ConnectionRegistry(this);
- _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true);
+ _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
_queueRegistry = new DefaultQueueRegistry(this);
+
_exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeFactory.initialise(hostConfig);
+
_exchangeRegistry = new DefaultExchangeRegistry(this);
+ _queueBackingStoreFactory = new FileQueueBackingStoreFactory();
+ _queueBackingStoreFactory.configure(this, hostConfig);
+
+ //Create a temporary RT to store the durable entries from the config file
+ // so we can replay them in to the real _RT after it has been loaded.
+ /// This should be removed after the _RT has been fully split from the the TL
+
+ StartupRoutingTable configFileRT = new StartupRoutingTable();
+
+ _routingTable = configFileRT;
+
+ // This needs to be after the RT has been defined as it creates the default durable exchanges.
+ _exchangeRegistry.initialise();
+ initialiseModel(hostConfig);
+
if (transactionLog != null)
{
_transactionLog = transactionLog;
@@ -195,19 +213,28 @@ public class VirtualHost implements Accessable
initialiseRoutingTable(hostConfig);
}
- _queueBackingStoreFactory = new FileQueueBackingStoreFactory();
- _queueBackingStoreFactory.configure(this, hostConfig);
+ //Now that the RT has been initialised loop through the persistent queues/exchanges created from the config
+ // file and write them in to the new routing Table.
+ for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
+ {
+ _routingTable.createQueue(cqt.queue, cqt.arguments);
+ }
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry.initialise();
+ for (Exchange exchange : configFileRT.exchange)
+ {
+ _routingTable.createExchange(exchange);
+ }
+
+ for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings)
+ {
+ _routingTable.bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
+ }
- initialiseModel(hostConfig);
-
_authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
_accessManager = ApplicationRegistry.getInstance().getAccessManager();
_accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration());
-
+
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
@@ -216,13 +243,13 @@ public class VirtualHost implements Accessable
private void initialiseHouseKeeping(long period)
{
/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
- if(period != 0L)
+ if (period != 0L)
{
class RemoveExpiredMessagesTask extends TimerTask
{
public void run()
{
- for(AMQQueue q : _queueRegistry.getQueues())
+ for (AMQQueue q : _queueRegistry.getQueues())
{
try
@@ -231,7 +258,7 @@ public class VirtualHost implements Accessable
}
catch (AMQException e)
{
- _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
+ _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
throw new RuntimeException(e);
}
}
@@ -239,8 +266,8 @@ public class VirtualHost implements Accessable
}
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
- period/2,
- period);
+ period / 2,
+ period);
}
}
@@ -259,10 +286,10 @@ public class VirtualHost implements Accessable
}
_transactionLog = (TransactionLog) o;
- //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
+ //Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable.
if (_transactionLog instanceof RoutingTable)
{
- _routingTable = (RoutingTable)_transactionLog;
+ _routingTable = (RoutingTable) _transactionLog;
}
_transactionLog.configure(this, "store", config);
@@ -294,14 +321,14 @@ public class VirtualHost implements Accessable
}
}
}
-
+
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
{
- _logger.debug("Loading configuration for virtualhost: "+config.getName());
+ _logger.debug("Loading configuration for virtualhost: " + config.getName());
List exchangeNames = config.getExchanges();
- for(Object exchangeNameObj : exchangeNames)
+ for (Object exchangeNameObj : exchangeNames)
{
String exchangeName = String.valueOf(exchangeNameObj);
configureExchange(config.getExchangeConfiguration(exchangeName));
@@ -309,7 +336,7 @@ public class VirtualHost implements Accessable
String[] queueNames = config.getQueueNames();
- for(Object queueNameObj : queueNames)
+ for (Object queueNameObj : queueNames)
{
String queueName = String.valueOf(queueNameObj);
configureQueue(config.getQueueConfiguration(queueName));
@@ -322,14 +349,14 @@ public class VirtualHost implements Accessable
Exchange exchange;
exchange = _exchangeRegistry.getExchange(exchangeName);
- if(exchange == null)
+ if (exchange == null)
{
AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
boolean durable = exchangeConfiguration.getDurable();
boolean autodelete = exchangeConfiguration.getAutoDelete();
- Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
+ Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
_exchangeRegistry.registerExchange(newExchange);
}
}
@@ -347,7 +374,7 @@ public class VirtualHost implements Accessable
Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
- if(exchange == null)
+ if (exchange == null)
{
exchange = _exchangeRegistry.getDefaultExchange();
}
@@ -358,19 +385,22 @@ public class VirtualHost implements Accessable
}
List routingKeys = queueConfiguration.getRoutingKeys();
- if(routingKeys == null || routingKeys.isEmpty())
+ if (routingKeys == null || routingKeys.isEmpty())
{
routingKeys = Collections.singletonList(queue.getName());
}
- for(Object routingKeyNameObj : routingKeys)
+ for (Object routingKeyNameObj : routingKeys)
{
AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
+ }
queue.bind(exchange, routingKey, null);
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
}
- if(exchange != _exchangeRegistry.getDefaultExchange())
+ if (exchange != _exchangeRegistry.getDefaultExchange())
{
queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null);
}
@@ -414,7 +444,7 @@ public class VirtualHost implements Accessable
public ACLManager getAccessManager()
{
return _accessManager;
- }
+ }
public void close() throws Exception
{
@@ -453,4 +483,95 @@ public class VirtualHost implements Accessable
{
return _virtualHostMBean;
}
+
+ /**
+ * Temporary Startup RT class to record the creation of persistent queues / exchanges.
+ *
+ *
+ * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
+ * This should be removed after the _RT has been fully split from the the TL
+ */
+ private class StartupRoutingTable implements RoutingTable
+ {
+ public List<Exchange> exchange = new LinkedList<Exchange>();
+ public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
+ public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
+
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ if (exchange.isDurable())
+ {
+ this.exchange.add(exchange);
+ }
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ if (exchange.isDurable() && queue.isDurable())
+ {
+ bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args));
+ }
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ createQueue(queue, null);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
+ if (queue.isDurable())
+ {
+ this.queue.add(new CreateQueueTuple(queue, arguments));
+ }
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ private class CreateQueueTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+
+ public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
+ {
+ this.queue = queue;
+ this.arguments = arguments;
+ }
+ }
+
+ private class CreateBindingTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+ public Exchange exchange;
+ public AMQShortString routingKey;
+
+ public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ {
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.queue = queue;
+ arguments = args;
+ }
+ }
+ }
}