diff options
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java | 203 |
1 files changed, 162 insertions, 41 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 5d2a31b80d..8a8cbd23cf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,17 +20,11 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.Collections; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - -import javax.management.NotCompliantMBeanException; - import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; @@ -47,10 +41,9 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.QueueBackingStore; import org.apache.qpid.server.queue.FileQueueBackingStoreFactory; import org.apache.qpid.server.queue.QueueBackingStoreFactory; +import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.security.access.ACLManager; @@ -59,11 +52,17 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.transactionlog.TransactionLog; +import javax.management.NotCompliantMBeanException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + public class VirtualHost implements Accessable { private static final Logger _logger = Logger.getLogger(VirtualHost.class); - private final String _name; private ConnectionRegistry _connectionRegistry; @@ -87,7 +86,7 @@ public class VirtualHost implements Accessable private ACLManager _accessManager; private final Timer _houseKeepingTimer; - + private VirtualHostConfiguration _configuration; private QueueBackingStoreFactory _queueBackingStoreFactory; @@ -114,7 +113,7 @@ public class VirtualHost implements Accessable public VirtualHostConfiguration getConfiguration() { - return _configuration ; + return _configuration; } public QueueBackingStoreFactory getQueueBackingStoreFactory() @@ -148,12 +147,13 @@ public class VirtualHost implements Accessable return VirtualHost.this; } - } // End of MBean class /** * Normal Constructor + * * @param hostConfig + * * @throws Exception */ public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception @@ -165,7 +165,7 @@ public class VirtualHost implements Accessable { _configuration = hostConfig; _name = hostConfig.getName(); - + if (_name == null || _name.length() == 0) { throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); @@ -175,12 +175,30 @@ public class VirtualHost implements Accessable _connectionRegistry = new ConnectionRegistry(this); - _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true); + _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true); _queueRegistry = new DefaultQueueRegistry(this); + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeFactory.initialise(hostConfig); + _exchangeRegistry = new DefaultExchangeRegistry(this); + _queueBackingStoreFactory = new FileQueueBackingStoreFactory(); + _queueBackingStoreFactory.configure(this, hostConfig); + + //Create a temporary RT to store the durable entries from the config file + // so we can replay them in to the real _RT after it has been loaded. + /// This should be removed after the _RT has been fully split from the the TL + + StartupRoutingTable configFileRT = new StartupRoutingTable(); + + _routingTable = configFileRT; + + // This needs to be after the RT has been defined as it creates the default durable exchanges. + _exchangeRegistry.initialise(); + initialiseModel(hostConfig); + if (transactionLog != null) { _transactionLog = transactionLog; @@ -195,19 +213,28 @@ public class VirtualHost implements Accessable initialiseRoutingTable(hostConfig); } - _queueBackingStoreFactory = new FileQueueBackingStoreFactory(); - _queueBackingStoreFactory.configure(this, hostConfig); + //Now that the RT has been initialised loop through the persistent queues/exchanges created from the config + // file and write them in to the new routing Table. + for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue) + { + _routingTable.createQueue(cqt.queue, cqt.arguments); + } - _exchangeFactory.initialise(hostConfig); - _exchangeRegistry.initialise(); + for (Exchange exchange : configFileRT.exchange) + { + _routingTable.createExchange(exchange); + } + + for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings) + { + _routingTable.bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments); + } - initialiseModel(hostConfig); - _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig); _accessManager = ApplicationRegistry.getInstance().getAccessManager(); _accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration()); - + _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); @@ -216,13 +243,13 @@ public class VirtualHost implements Accessable private void initialiseHouseKeeping(long period) { /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ - if(period != 0L) + if (period != 0L) { class RemoveExpiredMessagesTask extends TimerTask { public void run() { - for(AMQQueue q : _queueRegistry.getQueues()) + for (AMQQueue q : _queueRegistry.getQueues()) { try @@ -231,7 +258,7 @@ public class VirtualHost implements Accessable } catch (AMQException e) { - _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e); + _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e); throw new RuntimeException(e); } } @@ -239,8 +266,8 @@ public class VirtualHost implements Accessable } _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), - period/2, - period); + period / 2, + period); } } @@ -259,10 +286,10 @@ public class VirtualHost implements Accessable } _transactionLog = (TransactionLog) o; - //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable. + //Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable. if (_transactionLog instanceof RoutingTable) { - _routingTable = (RoutingTable)_transactionLog; + _routingTable = (RoutingTable) _transactionLog; } _transactionLog.configure(this, "store", config); @@ -294,14 +321,14 @@ public class VirtualHost implements Accessable } } } - + private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException { - _logger.debug("Loading configuration for virtualhost: "+config.getName()); + _logger.debug("Loading configuration for virtualhost: " + config.getName()); List exchangeNames = config.getExchanges(); - for(Object exchangeNameObj : exchangeNames) + for (Object exchangeNameObj : exchangeNames) { String exchangeName = String.valueOf(exchangeNameObj); configureExchange(config.getExchangeConfiguration(exchangeName)); @@ -309,7 +336,7 @@ public class VirtualHost implements Accessable String[] queueNames = config.getQueueNames(); - for(Object queueNameObj : queueNames) + for (Object queueNameObj : queueNames) { String queueName = String.valueOf(queueNameObj); configureQueue(config.getQueueConfiguration(queueName)); @@ -322,14 +349,14 @@ public class VirtualHost implements Accessable Exchange exchange; exchange = _exchangeRegistry.getExchange(exchangeName); - if(exchange == null) + if (exchange == null) { AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); boolean durable = exchangeConfiguration.getDurable(); boolean autodelete = exchangeConfiguration.getAutoDelete(); - Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); + Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0); _exchangeRegistry.registerExchange(newExchange); } } @@ -347,7 +374,7 @@ public class VirtualHost implements Accessable Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - if(exchange == null) + if (exchange == null) { exchange = _exchangeRegistry.getDefaultExchange(); } @@ -358,19 +385,22 @@ public class VirtualHost implements Accessable } List routingKeys = queueConfiguration.getRoutingKeys(); - if(routingKeys == null || routingKeys.isEmpty()) + if (routingKeys == null || routingKeys.isEmpty()) { routingKeys = Collections.singletonList(queue.getName()); } - for(Object routingKeyNameObj : routingKeys) + for (Object routingKeyNameObj : routingKeys) { AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); + if (_logger.isInfoEnabled()) + { + _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this); + } queue.bind(exchange, routingKey, null); - _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); } - if(exchange != _exchangeRegistry.getDefaultExchange()) + if (exchange != _exchangeRegistry.getDefaultExchange()) { queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null); } @@ -414,7 +444,7 @@ public class VirtualHost implements Accessable public ACLManager getAccessManager() { return _accessManager; - } + } public void close() throws Exception { @@ -453,4 +483,95 @@ public class VirtualHost implements Accessable { return _virtualHostMBean; } + + /** + * Temporary Startup RT class to record the creation of persistent queues / exchanges. + * + * + * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded. + * This should be removed after the _RT has been fully split from the the TL + */ + private class StartupRoutingTable implements RoutingTable + { + public List<Exchange> exchange = new LinkedList<Exchange>(); + public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); + public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); + + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + } + + public void close() throws Exception + { + } + + public void createExchange(Exchange exchange) throws AMQException + { + if (exchange.isDurable()) + { + this.exchange.add(exchange); + } + } + + public void removeExchange(Exchange exchange) throws AMQException + { + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + if (exchange.isDurable() && queue.isDurable()) + { + bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args)); + } + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + } + + public void createQueue(AMQQueue queue) throws AMQException + { + createQueue(queue, null); + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + { + if (queue.isDurable()) + { + this.queue.add(new CreateQueueTuple(queue, arguments)); + } + } + + public void removeQueue(AMQQueue queue) throws AMQException + { + } + + private class CreateQueueTuple + { + public AMQQueue queue; + public FieldTable arguments; + + public CreateQueueTuple(AMQQueue queue, FieldTable arguments) + { + this.queue = queue; + this.arguments = arguments; + } + } + + private class CreateBindingTuple + { + public AMQQueue queue; + public FieldTable arguments; + public Exchange exchange; + public AMQShortString routingKey; + + public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + { + this.exchange = exchange; + this.routingKey = routingKey; + this.queue = queue; + arguments = args; + } + } + } } |
