diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-05 14:43:14 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-05 14:43:14 +0000 |
| commit | 93bd9b2405e5c8d5c4493d621297cc8765785f28 (patch) | |
| tree | 2263f8360596fa66dfc767acdbe74bafa0eda59f /qpid/java/broker | |
| parent | c24eccc88801b77b06842aa0686b6582040630a4 (diff) | |
| download | qpid-python-93bd9b2405e5c8d5c4493d621297cc8765785f28.tar.gz | |
Revision: 503646
Author: rgreig
Date: 11:28:57, 05 February 2007
Message:
(Submitted by Rupert Smith)
This local repository is no longer needed. JUnit-Toolkit snapshot repository is now hosted on sourceforge: http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/. A release is also in progress to the central maven repository.
----
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo
Revision: 503637
Author: rgreig
Date: 11:17:08, 05 February 2007
Message:
(Submitted by Rupert Smith)
Junit-toolkit has now fully migrated onto sourceforge. Snapshot repository location updated.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml
Revision: 503609
Author: ritchiem
Date: 09:49:59, 05 February 2007
Message:
Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Revision: 503604
Author: rgreig
Date: 09:40:04, 05 February 2007
Message:
QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file
----
Modified : /incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Revision: 503593
Author: ritchiem
Date: 08:58:30, 05 February 2007
Message:
Fixed bug in stop(). If a connection is opened not start()ed then closed a NullPointerException will be thrown as the Dispatcher has not been created.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Revision: 502655
Author: rgreig
Date: 16:59:14, 02 February 2007
Message:
(Submitted by Rupert Smith) Options moved to top of contructor. Were at bottom and not being used!
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Revision: 502627
Author: rgreig
Date: 15:31:30, 02 February 2007
Message:
(Submitted by Rupert Smith)
Fixed problem with losing message results. Was not passing in self generated message correlation id in the async test, to match up replies with.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Revision: 502620
Author: rgreig
Date: 15:09:08, 02 February 2007
Message:
(Submitted by Rupert Smith)
Perftests improved with better timeout handling. Shared/unique destinations to ping now an option.
TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchas all threads, then runs tear downs.
----
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5
Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5
Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
Revision: 502610
Author: bhupendrab
Date: 14:26:32, 02 February 2007
Message:
QPID-84
tests for FSContextFactory deleted.fscontext.jar is not part of apache svn.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
Deleted : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest
Revision: 502576
Author: ritchiem
Date: 11:13:13, 02 February 2007
Message:
QPID-343 Performance test suite doesn't output missing message count on failure.
Updated PingAsyncTestPerf to output missing messsage count.
Updated PingPongProducer so it doesn't use AMQShortStringx.
----
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@503703 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
14 files changed, 509 insertions, 835 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index a14ac8459b..28a572eac9 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -42,6 +42,10 @@ <priority value="info"/> </category> + <category name="org.apache.qpid.server.queue.AMQQueueMBean"> + <priority value="info"/> + </category> + <category name="org.apache.qpid"> <priority value="warn"/> </category> diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml index de6a8c0682..2a573535de 100644 --- a/qpid/java/broker/etc/virtualhosts.xml +++ b/qpid/java/broker/etc/virtualhosts.xml @@ -21,8 +21,79 @@ --> <virtualhosts> <virtualhost> - <path>/development</path> - <bind>direct://amq.direct//queue</bind> - <bind>direct://amq.direct//ping</bind> + <name>localhost</name> + + <localhost> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </localhost> + </virtualhost> + <virtualhost> + <name>development</name> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <development> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </development> + </virtualhost> + <virtualhost> + <name>test</name> + <minimumAlertRepeatGap>30000</minimumAlertRepeatGap> + <maximumMessageCount>5000</maximumMessageCount> + <test> + <queue> + <name>queue</name> + <queue> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </queue> + </queue> + <queue> + <name>ping</name> + <ping> + <exchange>amq.direct</exchange> + <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb --> + <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb --> + <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins --> + </ping> + </queue> + </test> </virtualhost> </virtualhosts> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 9ecbf3d31a..a433351509 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -32,9 +32,13 @@ import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.CompositeConfiguration; import java.util.Collection; +import java.util.List; +import java.util.Collections; public class VirtualHostConfiguration { @@ -42,11 +46,7 @@ public class VirtualHostConfiguration XMLConfiguration _config; - private static final String XML_VIRTUALHOST = "virtualhost"; - private static final String XML_PATH = "path"; - private static final String XML_BIND = "bind"; - private static final String XML_VIRTUALHOST_PATH = "virtualhost.path"; - private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind"; + private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost."; public VirtualHostConfiguration(String configFile) throws ConfigurationException @@ -55,135 +55,58 @@ public class VirtualHostConfiguration _config = new XMLConfiguration(configFile); - if (_config.getProperty(XML_VIRTUALHOST_PATH) == null) - { - throw new ConfigurationException( - "Virtualhost Configuration document does not contain a valid virtualhost."); - } } - public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException - { - Object prop = _config.getProperty(XML_VIRTUALHOST_PATH); - if (prop instanceof Collection) - { - _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size()); - int virtualhosts = ((Collection) prop).size(); - for (int vhost = 0; vhost < virtualhosts; vhost++) - { - loadVirtualHost(vhost); - } - } - else - { - loadVirtualHost(-1); - } - } - - private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException + private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException { - String path = XML_VIRTUALHOST; + _logger.debug("Loding configuration for virtaulhost: "+virtualHostName); - if (index != -1) + if(virtualHostName == null) { - path = path + "(" + index + ")"; - } - - Object prop = _config.getProperty(path + "." + XML_PATH); - - if (prop == null) - { - prop = _config.getProperty(path + "." + XML_BIND); - String error = "Virtual Host not defined for binding"; - - if (prop != null) - { - if (prop instanceof Collection) - { - error += "s"; - } - - error += ": " + prop; - } - - throw new ConfigurationException(error); + throw new ConfigurationException("Unknown virtual host: " + virtualHostName); } - _logger.info("VirtualHost:'" + prop + "'"); + List queueNames = configuration.getList("queue.name"); - prop = _config.getProperty(path + "." + XML_BIND); - if (prop instanceof Collection) + for(Object queueNameObj : queueNames) { - int bindings = ((Collection) prop).size(); - _logger.debug("Number of Bindings: " + bindings); - for (int dest = 0; dest < bindings; dest++) - { - loadBinding(path, dest); - } + String queueName = String.valueOf(queueNameObj); + configureQueue(queueName, configuration); } - else - { - loadBinding(path, -1); - } - } - private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException - { - String path = rootpath + "." + XML_BIND; - if (index != -1) - { - path = path + "(" + index + ")"; - } - - String bindingString = _config.getString(path); - - AMQBindingURL binding = new AMQBindingURL(bindingString); - - _logger.debug("Loaded Binding:" + binding); - - try - { - bind(binding); - } - catch (AMQException amqe) - { - _logger.info("Unable to bind url: " + binding); - throw amqe; - } } - private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException + private void configureQueue(String queueName, Configuration configuration) throws AMQException, ConfigurationException { + CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - String queueName = binding.getQueueName(); - - // This will occur if the URL is a Topic - if (queueName == null) - { - //todo register valid topic - ///queueName = binding.getDestinationName(); - throw new AMQException("Topics cannot be bound. TODO Register valid topic"); - } + queueConfiguration.addConfiguration(configuration.subset("queue."+ queueName)); + queueConfiguration.addConfiguration(configuration); - //Get references to Broker Registries QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore(); ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry(); + AMQQueue queue; + synchronized (queueRegistry) { - AMQQueue queue = queueRegistry.getQueue(queueName); + queue = queueRegistry.getQueue(queueName); if (queue == null) { - _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating."); + _logger.info("Creating queue '" + queueName + "' [on virtual host ]" ); + + boolean durable = queueConfiguration.getBoolean("durable" ,false); + boolean autodelete = queueConfiguration.getBoolean("autodelete", false); + String owner = queueConfiguration.getString("owner", null); queue = new AMQQueue(queueName, - Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)), - null /* These queues will have no owner */, - false /* Therefore autodelete makes no sence */, queueRegistry); + durable, + owner == null ? null : owner /* These queues will have no owner */, + autodelete /* Therefore autodelete makes no sence */, queueRegistry); if (queue.isDurable()) { @@ -194,27 +117,67 @@ public class VirtualHostConfiguration } else { - _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating."); + _logger.info("Queue '" + queueName + "' already exists [on virtual host ], not creating."); } - Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName()); - synchronized (defaultExchange) + String exchangeName = queueConfiguration.getString("exchange", null); + + Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : exchangeName); + + if(exchange == null) { - if (defaultExchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding); - } + exchange = exchangeRegistry.getDefaultExchange(); + } - defaultExchange.registerQueue(queue.getName(), queue, null); + if (exchange == null) + { + throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); + } - if (binding.getRoutingKey() == null || binding.getRoutingKey().equals("")) + synchronized (exchange) + { + List routingKeys = queueConfiguration.getList("routingKey"); + if(routingKeys == null || routingKeys.isEmpty()) { - throw new ConfigurationException("Unknown binding not specified on url:" + binding); + routingKeys = Collections.singletonList(queue.getName()); } - queue.bind(binding.getRoutingKey(), defaultExchange); + for(Object routingKey : routingKeys) + { + exchange.registerQueue((String)routingKey, queue, null); + + queue.bind((String)routingKey, exchange); + + _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); + } } - _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'"); + } + + + Configurator.configure(queue);//, queueConfiguration); } + + + public void performBindings() throws AMQException, ConfigurationException + { + List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name"); + + _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames); + + for(Object nameObject : virtualHostNames) + { + String name = String.valueOf(nameObject); + configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name)); + } + + if (virtualHostNames == null || virtualHostNames.isEmpty()) + { + throw new ConfigurationException( + "Virtualhost Configuration document does not contain a valid virtualhost."); + } + } + + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index eb9d1acb59..92dc0fa43e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.exchange; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.log4j.Logger; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -37,6 +37,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>(); + private Exchange _defaultExchange; + public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) { //create 'standard' exchanges: @@ -52,9 +54,23 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) { + if(_defaultExchange == null) + { + setDefaultExchange(exchange); + } _exchangeMap.put(exchange.getName(), exchange); } + public void setDefaultExchange(Exchange exchange) + { + _defaultExchange = exchange; + } + + public Exchange getDefaultExchange() + { + return _defaultExchange; + } + public void unregisterExchange(String name, boolean inUse) throws AMQException { // TODO: check inUse argument @@ -71,7 +87,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public Exchange getExchange(String name) { - return _exchangeMap.get(name); + + if(name == null || name.length() == 0) + { + return _defaultExchange; + } + else + { + return _exchangeMap.get(name); + } + } /** @@ -82,14 +107,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void routeContent(AMQMessage payload) throws AMQException { final String exchange = payload.getPublishBody().exchange; - final Exchange exch = _exchangeMap.get(exchange); + final Exchange exch = getExchange(exchange); // there is a small window of opportunity for the exchange to be deleted in between - // the JmsPublish being received (where the exchange is validated) and the final + // the BasicPublish being received (where the exchange is validated) and the final // content body being received (which triggers this method) + // TODO: check where the exchange is validated if (exch == null) { throw new AMQException("Exchange '" + exchange + "' does not exist"); } exch.route(payload); } -} +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 4a0a6a0ee1..1d32ee17a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -37,4 +37,6 @@ public interface ExchangeRegistry extends MessageRouter void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException; Exchange getExchange(String name); + + Exchange getDefaultExchange(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index afe4ea95b9..8603113c11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.message.MessageDecorator; import org.apache.qpid.server.message.jms.JMSMessage; import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class AMQMessage { + private static final Logger _log = Logger.getLogger(AMQMessage.class); + public static final String JMS_MESSAGE = "jms.message"; private final Set<Object> _tokens = new HashSet<Object>(); @@ -61,6 +64,8 @@ public class AMQMessage private final AtomicInteger _referenceCount = new AtomicInteger(1); + private long _arrivalTime; + /** * Keeps a track of how many bytes we have received in body frames */ @@ -157,20 +162,20 @@ public class AMQMessage public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) { - + AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. allFrames[0] = BasicDeliverBody.createAMQFrame(channel, - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - getExchangeName(), // exchange - _redelivered, // redelivered - getRoutingKey() // routingKey - ); + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + getExchangeName(), // exchange + _redelivered, // redelivered + getRoutingKey() // routingKey + ); allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 2; i < allFrames.length; i++) { @@ -201,6 +206,8 @@ public class AMQMessage public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { _contentHeaderBody = contentHeaderBody; + _arrivalTime = System.currentTimeMillis(); + if (_storeWhenComplete && isAllContentReceived()) { storeMessage(); @@ -223,6 +230,7 @@ public class AMQMessage _bodyLengthReceived += contentBody.getSize(); if (_storeWhenComplete && isAllContentReceived()) { + _arrivalTime = System.currentTimeMillis(); storeMessage(); } } @@ -263,6 +271,12 @@ public class AMQMessage _redelivered = redelivered; } + + public long getArrivalTime() + { + return _arrivalTime; + } + public long getMessageId() { return _messageId; @@ -299,6 +313,7 @@ public class AMQMessage throw new MessageCleanupException(_messageId, e); } } + } public void setPublisher(AMQProtocolSession publisher) @@ -367,11 +382,17 @@ public class AMQMessage return _txnBuffer; } + public long getSize() + { + return getContentHeaderBody().bodySize; + } + /** * Called to enforce the 'immediate' flag. + * * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * immediate delivery but has not been marked as delivered to a + * consumer */ public void checkDeliveredToConsumer() throws NoConsumersException { @@ -393,7 +414,8 @@ public class AMQMessage /** * Called selectors to determin if the message has already been sent - * @return _deliveredToConsumer + * + * @return _deliveredToConsumer */ public boolean getDeliveredToConsumer() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 101a2833a0..d8bacc8c7d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; @@ -90,22 +91,36 @@ public class AMQQueue implements Managable, Comparable /** * max allowed size(KB) of a single message */ - private long _maxMessageSize = 10000; + private long _maximumMessageSize = 10000; /** * max allowed number of messages on a queue. */ - private Integer _maxMessageCount = 10000; + @Configured(path = "maximumMessageCount", defaultValue = "0") + public int _maximumMessageCount; /** - * max queue depth(KB) for the queue + * max queue depth for the queue */ - private long _maxQueueDepth = 10000000; + @Configured(path = "maximumQueueDepth", defaultValue = "0") + public long _maximumQueueDepth = 10000000; + + /* + * maximum message age before alerts occur + */ + @Configured(path = "maximumMessageAge", defaultValue = "0") + public long _maximumMessageAge = 30000; //0 + + /* + * the minimum interval between sending out consequetive alerts of the same type + */ + @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") + public long _minimumAlertRepeatGap = 30000; /** * total messages received by the queue since startup. */ - private long _totalMessagesReceived = 0; + public long _totalMessagesReceived = 0; public int compareTo(Object o) { @@ -183,35 +198,13 @@ public class AMQQueue implements Managable, Comparable _autoDelete = autoDelete; _queueRegistry = queueRegistry; _asyncDelivery = asyncDelivery; + _managedObject = createMBean(); _managedObject.register(); + _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; - - //fixme - Make this configurable via the broker config.xml - if (System.getProperties().getProperty("deliverymanager") != null) - { - if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) - { - _logger.info("Using ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } - else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) - { - _logger.info("Using ConcurrentDeliveryManager"); - _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); - } - else - { - _logger.info("Using SynchronizedDeliveryManager"); - _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); - } - } - else - { - _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } private AMQQueueMBean createMBean() throws AMQException @@ -267,6 +260,11 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getMessages(); } + public long getQueueDepth() + { + return _deliveryMgr.getTotalMessageSize(); + } + /** * @param messageId * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. @@ -295,50 +293,55 @@ public class AMQQueue implements Managable, Comparable return _managedObject; } - public Long getMaximumMessageSize() + public long getMaximumMessageSize() { - return _maxMessageSize; + return _maximumMessageSize; } - public void setMaximumMessageSize(Long value) + public void setMaximumMessageSize(long value) { - _maxMessageSize = value; + _maximumMessageSize = value; } - public Integer getConsumerCount() + public int getConsumerCount() { return _subscribers.size(); } - public Integer getActiveConsumerCount() + public int getActiveConsumerCount() { return _subscribers.getWeight(); } - public Long getReceivedMessageCount() + public long getReceivedMessageCount() { return _totalMessagesReceived; } - public Integer getMaximumMessageCount() + public int getMaximumMessageCount() { - return _maxMessageCount; + return _maximumMessageCount; } - public void setMaximumMessageCount(Integer value) + public void setMaximumMessageCount(int value) { - _maxMessageCount = value; + _maximumMessageCount = value; } - public Long getMaximumQueueDepth() + public long getMaximumQueueDepth() { - return _maxQueueDepth; + return _maximumQueueDepth; } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(Long value) + public void setMaximumQueueDepth(long value) { - _maxQueueDepth = value; + _maximumQueueDepth = value; + } + + public long getOldestMessageArrivalTime() + { + return _deliveryMgr.getOldestMessageArrival(); } /** @@ -374,11 +377,11 @@ public class AMQQueue implements Managable, Comparable Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); - if(subscription.hasFilters()) + if (subscription.hasFilters()) { if (_deliveryMgr.hasQueuedMessages()) { - _deliveryMgr.populatePreDeliveryQueue(subscription); + _deliveryMgr.populatePreDeliveryQueue(subscription); } } @@ -551,6 +554,27 @@ public class AMQQueue implements Managable, Comparable } } + public long getMinimumAlertRepeatGap() + { + return _minimumAlertRepeatGap; + } + + public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) + { + _minimumAlertRepeatGap = minimumAlertRepeatGap; + } + + public long getMaximumMessageAge() + { + return _maximumMessageAge; + } + + public void setMaximumMessageAge(long maximumMessageAge) + { + _maximumMessageAge = maximumMessageAge; + } + + private class Deliver implements TxnOp { private final AMQMessage _msg; @@ -591,4 +615,5 @@ public class AMQQueue implements Managable, Comparable } } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index a914975e00..f5ecf6ba55 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; import javax.management.openmbean.*; import javax.management.JMException; @@ -41,8 +42,11 @@ import java.util.ArrayList; * for an AMQQueue. */ @MBeanDescription("Management Interface for AMQQueue") -public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue +public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + + private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + private AMQQueue _queue = null; private String _queueName = null; // OpenMBean data types for viewMessages method @@ -51,12 +55,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - + // OpenMBean data types for viewMessageContent method private static CompositeType _msgContentType = null; private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"}; private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; - + + private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -71,7 +77,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { init(); } - catch(JMException ex) + catch (JMException ex) { // It should never occur System.out.println(ex.getMessage()); @@ -88,7 +94,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, - _msgContentAttributes, _msgContentAttributeTypes); + _msgContentAttributes, _msgContentAttributeTypes); _msgAttributeTypes[0] = SimpleType.LONG; // For message id _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes @@ -215,35 +221,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue */ public void checkForNotification(AMQMessage msg) { - // Check for threshold message count - Integer msgCount = getMessageCount(); - if (msgCount >= getMaximumMessageCount()) - { - notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); - } + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); - // Check for threshold message size - long messageSize = getMessageSize(msg); - if (messageSize >= _queue.getMaximumMessageSize()) + for (NotificationCheck check : NotificationCheck.values()) { - notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); - } - - // Check for threshold queue depth in bytes - long queueDepth = getQueueDepth(); - if (queueDepth >= _queue.getMaximumQueueDepth()) - { - notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); + if (check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()] < thresholdTime) + { + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } + } } } /** * Sends the notification to the listeners */ - private void notifyClients(String notificationMsg) + public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg) { + // important : add log to the log file - monitoring tools may be looking for this + _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java deleted file mode 100644 index 022d3b9635..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ /dev/null @@ -1,357 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.server.configuration.Configurator; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicBoolean; - - -/** - * Manages delivery of messages on behalf of a queue - */ -public class ConcurrentDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); - - @Configured(path = "advanced.compressBufferOnQueue", - defaultValue = "false") - public boolean compressBufferOnQueue; - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - //private int _messageCount; - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - - /** - * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced - * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered - * via the async thread. - * <p/> - * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. - */ - private ReentrantLock _lock = new ReentrantLock(); - - - ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - - //Set values from configuration - Configurator.configure(this); - - if (compressBufferOnQueue) - { - _log.info("Compressing Buffers on queue."); - } - - _subscriptions = subscriptions; - _queue = queue; - } - - /** - * @return boolean if we are queueing - */ - private boolean queueing() - { - return hasQueuedMessages(); - } - - - /** - * @param msg to enqueue - * @return true if we are queue this message - */ - private boolean enqueue(AMQMessage msg) - { - if (msg.isImmediate()) - { - return false; - } - else - { - _lock.lock(); - try - { - if (queueing()) - { - return addMessageToQueue(msg); - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - } - - private void startQueueing(AMQMessage msg) - { - if (!msg.isImmediate()) - { - addMessageToQueue(msg); - } - } - - private boolean addMessageToQueue(AMQMessage msg) - { - // Shrink the ContentBodies to their actual size to save memory. - if (compressBufferOnQueue) - { - Iterator it = msg.getContentBodies().iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - cb.reduceBufferToFit(); - } - } - - _messages.offer(msg); - - return true; - } - - - public boolean hasQueuedMessages() - { - - _lock.lock(); - try - { - return !_messages.isEmpty(); - } - finally - { - _lock.unlock(); - } - - - } - - public int getQueueMessageCount() - { - return getMessageCount(); - } - - /** - * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. - * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. - * - * @return int the number of messages in the delivery queue. - */ - private int getMessageCount() - { - return _messages.size(); - } - - - public synchronized List<AMQMessage> getMessages() - { - return new ArrayList<AMQMessage>(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop() throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(_queue); - } - } - - public synchronized void clearAllMessages() throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(_queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - AMQMessage message = peek(); - - //While we have messages to send and subscribers to send them to. - while (message != null && hasSubscribers) - { - // _log.debug("Have messages(" + _messages.size() + ") and subscribers"); - Subscription next = _subscriptions.nextSubscriber(message); - //FIXME Is there still not the chance that this subscribe could be suspended between here and the send? - - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - next.send(message, _queue); - poll(); - message = peek(); - } - else - { - hasSubscribers = false; - } - } - } - catch (FailedDequeueException e) - { - _log.error("Unable to deliver message as dequeue failed: " + e, e); - } - finally - { - _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); - } - } - - private AMQMessage peek() - { - return _messages.peek(); - } - - private AMQMessage poll() - { - return _messages.poll(); - } - - Runner asyncDelivery = new Runner(); - - public void processAsync(Executor executor) - { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); - - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(asyncDelivery); - } - } - } - } - - public void deliver(String name, AMQMessage msg) throws FailedDequeueException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - // not queueing so deliver message to 'next' subscriber - _lock.lock(); - try - { - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - if (!msg.isImmediate()) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - } - else - { - s.send(msg, _queue); - } - } - finally - { - _lock.unlock(); - } - } - } - - private class Runner implements Runnable - { - public void run() - { - boolean running = true; - while (running) - { - processQueue(); - - //Check that messages have not been added since we did our last peek(); - // Synchronize with the thread that adds to the queue. - // If the queue is still empty then we can exit - _lock.lock(); - try - { - if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) - { - running = false; - _processing.set(false); - } - } - finally - { - _lock.unlock(); - } - } - } - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index f09e8213b1..9efeb8351c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -34,6 +34,7 @@ import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** @@ -76,7 +77,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. */ private ReentrantLock _lock = new ReentrantLock(); - + private AtomicLong _totalMessageSize = new AtomicLong(); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -109,6 +110,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _messages.offer(msg); + _totalMessageSize.addAndGet(msg.getSize()); + return true; } @@ -142,6 +145,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return _messages.size(); } + public long getTotalMessageSize() + { + return _totalMessageSize.get(); + } + + public long getOldestMessageArrival() + { + AMQMessage msg = _messages.peek(); + return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); + } + public synchronized List<AMQMessage> getMessages() { @@ -173,6 +187,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (msg != null) { msg.dequeue(_queue); + _totalMessageSize.addAndGet(-msg.getSize()); } } @@ -182,6 +197,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (msg != null) { msg.dequeue(_queue); + _totalMessageSize.set(0L); msg = poll(); } } @@ -222,6 +238,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //remove sent message from our queue. messageQueue.poll(); + _totalMessageSize.addAndGet(-message.getSize()); } catch (FailedDequeueException e) { @@ -308,7 +325,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -330,7 +347,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg); @@ -345,7 +362,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); } //Deliver the message diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index cac499587f..28386dfa45 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -64,7 +64,8 @@ interface DeliveryManager * * @param name the name of the entity on whose behalf we are delivering the message * @param msg the message to deliver - * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued + * @throws org.apache.qpid.server.queue.FailedDequeueException + * if the message could not be dequeued */ void deliver(String name, AMQMessage msg) throws FailedDequeueException; @@ -75,4 +76,8 @@ interface DeliveryManager List<AMQMessage> getMessages(); void populatePreDeliveryQueue(Subscription subscription); + + long getTotalMessageSize(); + + long getOldestMessageArrival(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java new file mode 100644 index 0000000000..8e9b3804f2 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -0,0 +1,135 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; + +public enum NotificationCheck +{ + + MESSAGE_COUNT_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + int msgCount = queue.getMessageCount(); + final int maximumMessageCount = queue.getMaximumMessageCount(); + if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount) + { + listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached."); + return true; + } + return false; + } + }, + MESSAGE_SIZE_ALERT(true) + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + final long maximumMessageSize = queue.getMaximumMessageSize(); + if(maximumMessageSize != 0) + { + // Check for threshold message size + long messageSize; +// try +// { + messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize; +// } +// catch (AMQException e) +// { +// messageSize = 0; +// } + + + if (messageSize >= maximumMessageSize) + { + listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]"); + return true; + } + } + return false; + } + + }, + QUEUE_DEPTH_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + // Check for threshold queue depth in bytes + final long maximumQueueDepth = queue.getMaximumQueueDepth(); + + if(maximumQueueDepth != 0) + { + final long queueDepth = queue.getQueueDepth(); + + if (queueDepth >= maximumQueueDepth) + { + listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached."); + return true; + } + } + return false; + } + + }, + MESSAGE_AGE_ALERT + { + boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) + { + + final long maxMessageAge = queue.getMaximumMessageAge(); + if(maxMessageAge != 0) + { + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - maxMessageAge; + final long firstArrivalTime = queue.getOldestMessageArrivalTime(); + + if(firstArrivalTime < thresholdTime) + { + long oldestAge = currentTime - firstArrivalTime; + listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached."); + + return true; + } + } + return false; + + } + + } + ; + + private final boolean _messageSpecific; + + NotificationCheck() + { + this(false); + } + + NotificationCheck(boolean messageSpecific) + { + _messageSpecific = messageSpecific; + } + + public boolean isMessageSpecific() + { + return _messageSpecific; + } + + abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java new file mode 100644 index 0000000000..bd9d7f6b11 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java @@ -0,0 +1,23 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +public interface QueueNotificationListener +{ + void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java deleted file mode 100644 index c967ea2cde..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Manages delivery of messages on behalf of a queue - */ -class SynchronizedDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class); - - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>(); - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * An indication of the mode we are in. If this is true then messages are - * being queued up in _messages for asynchronous delivery. If it is false - * then messages can be delivered directly as they come in. - */ - private volatile boolean _queueing; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - _subscriptions = subscriptions; - _queue = queue; - } - - private synchronized boolean enqueue(AMQMessage msg) - { - if (msg.isImmediate()) - { - return false; - } - else - { - if (_queueing) - { - _messages.offer(msg); - return true; - } - else - { - return false; - } - } - } - - private synchronized void startQueueing(AMQMessage msg) - { - _queueing = true; - enqueue(msg); - } - - /** - * Determines whether there are queued messages. Sets _queueing to false if - * there are no queued messages. This needs to be atomic. - * - * @return true if there are queued messages - */ - public synchronized boolean hasQueuedMessages() - { - boolean empty = _messages.isEmpty(); - if (empty) - { - _queueing = false; - } - return !empty; - } - - public synchronized int getQueueMessageCount() - { - return _messages.size(); - } - - public synchronized List<AMQMessage> getMessages() - { - return new ArrayList<AMQMessage>(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop() throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(_queue); - } - } - - public synchronized void clearAllMessages() throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(_queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - while (hasQueuedMessages() && hasSubscribers) - { - Subscription next = _subscriptions.nextSubscriber(peek()); - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - try - { - next.send(poll(), _queue); - } - catch (AMQException e) - { - _log.error("Unable to deliver message: " + e, e); - } - } - else - { - hasSubscribers = false; - } - } - } - finally - { - _processing.set(false); - } - } - - private synchronized AMQMessage peek() - { - return _messages.peek(); - } - - private synchronized AMQMessage poll() - { - return _messages.poll(); - } - - /** - * Requests that the delivery manager start processing the queue asynchronously - * if there is work that can be done (i.e. there are messages queued up and - * subscribers that can receive them. - * <p/> - * This should be called when subscribers are added, but only after the consume-ok - * message has been returned as message delivery may start immediately. It should also - * be called after unsuspending a client. - * <p/> - * - * @param executor the executor on which the delivery should take place - */ - public void processAsync(Executor executor) - { - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(new Runner()); - } - } - } - } - - /** - * Handles message delivery. The delivery manager is always in one of two modes; - * it is either queueing messages for asynchronous delivery or delivering - * directly. - * - * @param name the name of the entity on whose behalf we are delivering the message - * @param msg the message to deliver - * @throws NoConsumersException if there are no active subscribers to deliver - * the message to - */ - public void deliver(String name, AMQMessage msg) throws FailedDequeueException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - synchronized(this) - { - // not queueing so deliver message to 'next' subscriber - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - else - { - s.send(msg, _queue); - } - } - } - - } - - private class Runner implements Runnable - { - public void run() - { - processQueue(); - } - } -} |
