summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-05 14:43:14 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-05 14:43:14 +0000
commit93bd9b2405e5c8d5c4493d621297cc8765785f28 (patch)
tree2263f8360596fa66dfc767acdbe74bafa0eda59f /qpid/java/broker
parentc24eccc88801b77b06842aa0686b6582040630a4 (diff)
downloadqpid-python-93bd9b2405e5c8d5c4493d621297cc8765785f28.tar.gz
Revision: 503646
Author: rgreig Date: 11:28:57, 05 February 2007 Message: (Submitted by Rupert Smith) This local repository is no longer needed. JUnit-Toolkit snapshot repository is now hosted on sourceforge: http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/. A release is also in progress to the central maven repository. ---- Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo Revision: 503637 Author: rgreig Date: 11:17:08, 05 February 2007 Message: (Submitted by Rupert Smith) Junit-toolkit has now fully migrated onto sourceforge. Snapshot repository location updated. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml Revision: 503609 Author: ritchiem Date: 09:49:59, 05 February 2007 Message: Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Revision: 503604 Author: rgreig Date: 09:40:04, 05 February 2007 Message: QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file ---- Modified : /incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Revision: 503593 Author: ritchiem Date: 08:58:30, 05 February 2007 Message: Fixed bug in stop(). If a connection is opened not start()ed then closed a NullPointerException will be thrown as the Dispatcher has not been created. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Revision: 502655 Author: rgreig Date: 16:59:14, 02 February 2007 Message: (Submitted by Rupert Smith) Options moved to top of contructor. Were at bottom and not being used! ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Revision: 502627 Author: rgreig Date: 15:31:30, 02 February 2007 Message: (Submitted by Rupert Smith) Fixed problem with losing message results. Was not passing in self generated message correlation id in the async test, to match up replies with. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Revision: 502620 Author: rgreig Date: 15:09:08, 02 February 2007 Message: (Submitted by Rupert Smith) Perftests improved with better timeout handling. Shared/unique destinations to ping now an option. TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchas all threads, then runs tear downs. ---- Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Revision: 502610 Author: bhupendrab Date: 14:26:32, 02 February 2007 Message: QPID-84 tests for FSContextFactory deleted.fscontext.jar is not part of apache svn. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java Deleted : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest Revision: 502576 Author: ritchiem Date: 11:13:13, 02 February 2007 Message: QPID-343 Performance test suite doesn't output missing message count on failure. Updated PingAsyncTestPerf to output missing messsage count. Updated PingPongProducer so it doesn't use AMQShortStringx. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@503703 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/etc/log4j.xml4
-rw-r--r--qpid/java/broker/etc/virtualhosts.xml77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java197
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java119
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java357
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java135
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java264
14 files changed, 509 insertions, 835 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index a14ac8459b..28a572eac9 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -42,6 +42,10 @@
<priority value="info"/>
</category>
+ <category name="org.apache.qpid.server.queue.AMQQueueMBean">
+ <priority value="info"/>
+ </category>
+
<category name="org.apache.qpid">
<priority value="warn"/>
</category>
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml
index de6a8c0682..2a573535de 100644
--- a/qpid/java/broker/etc/virtualhosts.xml
+++ b/qpid/java/broker/etc/virtualhosts.xml
@@ -21,8 +21,79 @@
-->
<virtualhosts>
<virtualhost>
- <path>/development</path>
- <bind>direct://amq.direct//queue</bind>
- <bind>direct://amq.direct//ping</bind>
+ <name>localhost</name>
+
+ <localhost>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </localhost>
+ </virtualhost>
+ <virtualhost>
+ <name>development</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <development>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </development>
+ </virtualhost>
+ <virtualhost>
+ <name>test</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <test>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </test>
</virtualhost>
</virtualhosts>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 9ecbf3d31a..a433351509 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -32,9 +32,13 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.CompositeConfiguration;
import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
public class VirtualHostConfiguration
{
@@ -42,11 +46,7 @@ public class VirtualHostConfiguration
XMLConfiguration _config;
- private static final String XML_VIRTUALHOST = "virtualhost";
- private static final String XML_PATH = "path";
- private static final String XML_BIND = "bind";
- private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
- private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+ private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
public VirtualHostConfiguration(String configFile) throws ConfigurationException
@@ -55,135 +55,58 @@ public class VirtualHostConfiguration
_config = new XMLConfiguration(configFile);
- if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
- {
- throw new ConfigurationException(
- "Virtualhost Configuration document does not contain a valid virtualhost.");
- }
}
- public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException
- {
- Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
- if (prop instanceof Collection)
- {
- _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size());
- int virtualhosts = ((Collection) prop).size();
- for (int vhost = 0; vhost < virtualhosts; vhost++)
- {
- loadVirtualHost(vhost);
- }
- }
- else
- {
- loadVirtualHost(-1);
- }
- }
-
- private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException
+ private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
{
- String path = XML_VIRTUALHOST;
+ _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
- if (index != -1)
+ if(virtualHostName == null)
{
- path = path + "(" + index + ")";
- }
-
- Object prop = _config.getProperty(path + "." + XML_PATH);
-
- if (prop == null)
- {
- prop = _config.getProperty(path + "." + XML_BIND);
- String error = "Virtual Host not defined for binding";
-
- if (prop != null)
- {
- if (prop instanceof Collection)
- {
- error += "s";
- }
-
- error += ": " + prop;
- }
-
- throw new ConfigurationException(error);
+ throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
- _logger.info("VirtualHost:'" + prop + "'");
+ List queueNames = configuration.getList("queue.name");
- prop = _config.getProperty(path + "." + XML_BIND);
- if (prop instanceof Collection)
+ for(Object queueNameObj : queueNames)
{
- int bindings = ((Collection) prop).size();
- _logger.debug("Number of Bindings: " + bindings);
- for (int dest = 0; dest < bindings; dest++)
- {
- loadBinding(path, dest);
- }
+ String queueName = String.valueOf(queueNameObj);
+ configureQueue(queueName, configuration);
}
- else
- {
- loadBinding(path, -1);
- }
- }
- private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
- {
- String path = rootpath + "." + XML_BIND;
- if (index != -1)
- {
- path = path + "(" + index + ")";
- }
-
- String bindingString = _config.getString(path);
-
- AMQBindingURL binding = new AMQBindingURL(bindingString);
-
- _logger.debug("Loaded Binding:" + binding);
-
- try
- {
- bind(binding);
- }
- catch (AMQException amqe)
- {
- _logger.info("Unable to bind url: " + binding);
- throw amqe;
- }
}
- private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException
+ private void configureQueue(String queueName, Configuration configuration) throws AMQException, ConfigurationException
{
+ CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- String queueName = binding.getQueueName();
-
- // This will occur if the URL is a Topic
- if (queueName == null)
- {
- //todo register valid topic
- ///queueName = binding.getDestinationName();
- throw new AMQException("Topics cannot be bound. TODO Register valid topic");
- }
+ queueConfiguration.addConfiguration(configuration.subset("queue."+ queueName));
+ queueConfiguration.addConfiguration(configuration);
- //Get references to Broker Registries
QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore();
ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry();
+ AMQQueue queue;
+
synchronized (queueRegistry)
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating.");
+ _logger.info("Creating queue '" + queueName + "' [on virtual host ]" );
+
+ boolean durable = queueConfiguration.getBoolean("durable" ,false);
+ boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
+ String owner = queueConfiguration.getString("owner", null);
queue = new AMQQueue(queueName,
- Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
- null /* These queues will have no owner */,
- false /* Therefore autodelete makes no sence */, queueRegistry);
+ durable,
+ owner == null ? null : owner /* These queues will have no owner */,
+ autodelete /* Therefore autodelete makes no sence */, queueRegistry);
if (queue.isDurable())
{
@@ -194,27 +117,67 @@ public class VirtualHostConfiguration
}
else
{
- _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating.");
+ _logger.info("Queue '" + queueName + "' already exists [on virtual host ], not creating.");
}
- Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName());
- synchronized (defaultExchange)
+ String exchangeName = queueConfiguration.getString("exchange", null);
+
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : exchangeName);
+
+ if(exchange == null)
{
- if (defaultExchange == null)
- {
- throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding);
- }
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
- defaultExchange.registerQueue(queue.getName(), queue, null);
+ if (exchange == null)
+ {
+ throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
+ }
- if (binding.getRoutingKey() == null || binding.getRoutingKey().equals(""))
+ synchronized (exchange)
+ {
+ List routingKeys = queueConfiguration.getList("routingKey");
+ if(routingKeys == null || routingKeys.isEmpty())
{
- throw new ConfigurationException("Unknown binding not specified on url:" + binding);
+ routingKeys = Collections.singletonList(queue.getName());
}
- queue.bind(binding.getRoutingKey(), defaultExchange);
+ for(Object routingKey : routingKeys)
+ {
+ exchange.registerQueue((String)routingKey, queue, null);
+
+ queue.bind((String)routingKey, exchange);
+
+ _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
+ }
}
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+
}
+
+
+ Configurator.configure(queue);//, queueConfiguration);
}
+
+
+ public void performBindings() throws AMQException, ConfigurationException
+ {
+ List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
+
+ _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
+
+ for(Object nameObject : virtualHostNames)
+ {
+ String name = String.valueOf(nameObject);
+ configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
+ }
+
+ if (virtualHostNames == null || virtualHostNames.isEmpty())
+ {
+ throw new ConfigurationException(
+ "Virtualhost Configuration document does not contain a valid virtualhost.");
+ }
+ }
+
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index eb9d1acb59..92dc0fa43e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -37,6 +37,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
*/
private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
+ private Exchange _defaultExchange;
+
public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
{
//create 'standard' exchanges:
@@ -52,9 +54,23 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange)
{
+ if(_defaultExchange == null)
+ {
+ setDefaultExchange(exchange);
+ }
_exchangeMap.put(exchange.getName(), exchange);
}
+ public void setDefaultExchange(Exchange exchange)
+ {
+ _defaultExchange = exchange;
+ }
+
+ public Exchange getDefaultExchange()
+ {
+ return _defaultExchange;
+ }
+
public void unregisterExchange(String name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
@@ -71,7 +87,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public Exchange getExchange(String name)
{
- return _exchangeMap.get(name);
+
+ if(name == null || name.length() == 0)
+ {
+ return _defaultExchange;
+ }
+ else
+ {
+ return _exchangeMap.get(name);
+ }
+
}
/**
@@ -82,14 +107,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void routeContent(AMQMessage payload) throws AMQException
{
final String exchange = payload.getPublishBody().exchange;
- final Exchange exch = _exchangeMap.get(exchange);
+ final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
- // the JmsPublish being received (where the exchange is validated) and the final
+ // the BasicPublish being received (where the exchange is validated) and the final
// content body being received (which triggers this method)
+ // TODO: check where the exchange is validated
if (exch == null)
{
throw new AMQException("Exchange '" + exchange + "' does not exist");
}
exch.route(payload);
}
-}
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index 4a0a6a0ee1..1d32ee17a9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -37,4 +37,6 @@ public interface ExchangeRegistry extends MessageRouter
void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException;
Exchange getExchange(String name);
+
+ Exchange getDefaultExchange();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index afe4ea95b9..8603113c11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class AMQMessage
{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
public static final String JMS_MESSAGE = "jms.message";
private final Set<Object> _tokens = new HashSet<Object>();
@@ -61,6 +64,8 @@ public class AMQMessage
private final AtomicInteger _referenceCount = new AtomicInteger(1);
+ private long _arrivalTime;
+
/**
* Keeps a track of how many bytes we have received in body frames
*/
@@ -157,20 +162,20 @@ public class AMQMessage
public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
{
-
+
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- getExchangeName(), // exchange
- _redelivered, // redelivered
- getRoutingKey() // routingKey
- );
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ getExchangeName(), // exchange
+ _redelivered, // redelivered
+ getRoutingKey() // routingKey
+ );
allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
@@ -201,6 +206,8 @@ public class AMQMessage
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
+ _arrivalTime = System.currentTimeMillis();
+
if (_storeWhenComplete && isAllContentReceived())
{
storeMessage();
@@ -223,6 +230,7 @@ public class AMQMessage
_bodyLengthReceived += contentBody.getSize();
if (_storeWhenComplete && isAllContentReceived())
{
+ _arrivalTime = System.currentTimeMillis();
storeMessage();
}
}
@@ -263,6 +271,12 @@ public class AMQMessage
_redelivered = redelivered;
}
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
public long getMessageId()
{
return _messageId;
@@ -299,6 +313,7 @@ public class AMQMessage
throw new MessageCleanupException(_messageId, e);
}
}
+
}
public void setPublisher(AMQProtocolSession publisher)
@@ -367,11 +382,17 @@ public class AMQMessage
return _txnBuffer;
}
+ public long getSize()
+ {
+ return getContentHeaderBody().bodySize;
+ }
+
/**
* Called to enforce the 'immediate' flag.
+ *
* @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException
{
@@ -393,7 +414,8 @@ public class AMQMessage
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 101a2833a0..d8bacc8c7d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
@@ -90,22 +91,36 @@ public class AMQQueue implements Managable, Comparable
/**
* max allowed size(KB) of a single message
*/
- private long _maxMessageSize = 10000;
+ private long _maximumMessageSize = 10000;
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxMessageCount = 10000;
+ @Configured(path = "maximumMessageCount", defaultValue = "0")
+ public int _maximumMessageCount;
/**
- * max queue depth(KB) for the queue
+ * max queue depth for the queue
*/
- private long _maxQueueDepth = 10000000;
+ @Configured(path = "maximumQueueDepth", defaultValue = "0")
+ public long _maximumQueueDepth = 10000000;
+
+ /*
+ * maximum message age before alerts occur
+ */
+ @Configured(path = "maximumMessageAge", defaultValue = "0")
+ public long _maximumMessageAge = 30000; //0
+
+ /*
+ * the minimum interval between sending out consequetive alerts of the same type
+ */
+ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+ public long _minimumAlertRepeatGap = 30000;
/**
* total messages received by the queue since startup.
*/
- private long _totalMessagesReceived = 0;
+ public long _totalMessagesReceived = 0;
public int compareTo(Object o)
{
@@ -183,35 +198,13 @@ public class AMQQueue implements Managable, Comparable
_autoDelete = autoDelete;
_queueRegistry = queueRegistry;
_asyncDelivery = asyncDelivery;
+
_managedObject = createMBean();
_managedObject.register();
+
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
-
- //fixme - Make this configurable via the broker config.xml
- if (System.getProperties().getProperty("deliverymanager") != null)
- {
- if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
- {
- _logger.info("Using ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
- else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
- {
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
- }
- else
- {
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
- }
- }
- else
- {
- _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -267,6 +260,11 @@ public class AMQQueue implements Managable, Comparable
return _deliveryMgr.getMessages();
}
+ public long getQueueDepth()
+ {
+ return _deliveryMgr.getTotalMessageSize();
+ }
+
/**
* @param messageId
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
@@ -295,50 +293,55 @@ public class AMQQueue implements Managable, Comparable
return _managedObject;
}
- public Long getMaximumMessageSize()
+ public long getMaximumMessageSize()
{
- return _maxMessageSize;
+ return _maximumMessageSize;
}
- public void setMaximumMessageSize(Long value)
+ public void setMaximumMessageSize(long value)
{
- _maxMessageSize = value;
+ _maximumMessageSize = value;
}
- public Integer getConsumerCount()
+ public int getConsumerCount()
{
return _subscribers.size();
}
- public Integer getActiveConsumerCount()
+ public int getActiveConsumerCount()
{
return _subscribers.getWeight();
}
- public Long getReceivedMessageCount()
+ public long getReceivedMessageCount()
{
return _totalMessagesReceived;
}
- public Integer getMaximumMessageCount()
+ public int getMaximumMessageCount()
{
- return _maxMessageCount;
+ return _maximumMessageCount;
}
- public void setMaximumMessageCount(Integer value)
+ public void setMaximumMessageCount(int value)
{
- _maxMessageCount = value;
+ _maximumMessageCount = value;
}
- public Long getMaximumQueueDepth()
+ public long getMaximumQueueDepth()
{
- return _maxQueueDepth;
+ return _maximumQueueDepth;
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
+ public void setMaximumQueueDepth(long value)
{
- _maxQueueDepth = value;
+ _maximumQueueDepth = value;
+ }
+
+ public long getOldestMessageArrivalTime()
+ {
+ return _deliveryMgr.getOldestMessageArrival();
}
/**
@@ -374,11 +377,11 @@ public class AMQQueue implements Managable, Comparable
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
- if(subscription.hasFilters())
+ if (subscription.hasFilters())
{
if (_deliveryMgr.hasQueuedMessages())
{
- _deliveryMgr.populatePreDeliveryQueue(subscription);
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
}
}
@@ -551,6 +554,27 @@ public class AMQQueue implements Managable, Comparable
}
}
+ public long getMinimumAlertRepeatGap()
+ {
+ return _minimumAlertRepeatGap;
+ }
+
+ public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ {
+ _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _maximumMessageAge;
+ }
+
+ public void setMaximumMessageAge(long maximumMessageAge)
+ {
+ _maximumMessageAge = maximumMessageAge;
+ }
+
+
private class Deliver implements TxnOp
{
private final AMQMessage _msg;
@@ -591,4 +615,5 @@ public class AMQQueue implements Managable, Comparable
}
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index a914975e00..f5ecf6ba55 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
import javax.management.openmbean.*;
import javax.management.JMException;
@@ -41,8 +42,11 @@ import java.util.ArrayList;
* for an AMQQueue.
*/
@MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+
+ private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
@@ -51,12 +55,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
+
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
+
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -71,7 +77,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
// It should never occur
System.out.println(ex.getMessage());
@@ -88,7 +94,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
+ _msgContentAttributes, _msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
@@ -215,35 +221,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
*/
public void checkForNotification(AMQMessage msg)
{
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _queue.getMaximumMessageSize())
+ for (NotificationCheck check : NotificationCheck.values())
{
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
-
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
- if (queueDepth >= _queue.getMaximumQueueDepth())
- {
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ if (check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()] < thresholdTime)
+ {
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
}
}
/**
* Sends the notification to the listeners
*/
- private void notifyClients(String notificationMsg)
+ public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
+ // important : add log to the log file - monitoring tools may be looking for this
+ _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+
Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
deleted file mode 100644
index 022d3b9635..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.server.configuration.Configurator;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-/**
- * Manages delivery of messages on behalf of a queue
- */
-public class ConcurrentDeliveryManager implements DeliveryManager
-{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
-
- @Configured(path = "advanced.compressBufferOnQueue",
- defaultValue = "false")
- public boolean compressBufferOnQueue;
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
- //private int _messageCount;
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
- private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
- private final SubscriptionManager _subscriptions;
-
- /**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
- */
- private final AMQQueue _queue;
-
-
- /**
- * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
- * via the async thread.
- * <p/>
- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
- */
- private ReentrantLock _lock = new ReentrantLock();
-
-
- ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
- {
-
- //Set values from configuration
- Configurator.configure(this);
-
- if (compressBufferOnQueue)
- {
- _log.info("Compressing Buffers on queue.");
- }
-
- _subscriptions = subscriptions;
- _queue = queue;
- }
-
- /**
- * @return boolean if we are queueing
- */
- private boolean queueing()
- {
- return hasQueuedMessages();
- }
-
-
- /**
- * @param msg to enqueue
- * @return true if we are queue this message
- */
- private boolean enqueue(AMQMessage msg)
- {
- if (msg.isImmediate())
- {
- return false;
- }
- else
- {
- _lock.lock();
- try
- {
- if (queueing())
- {
- return addMessageToQueue(msg);
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
-
- private void startQueueing(AMQMessage msg)
- {
- if (!msg.isImmediate())
- {
- addMessageToQueue(msg);
- }
- }
-
- private boolean addMessageToQueue(AMQMessage msg)
- {
- // Shrink the ContentBodies to their actual size to save memory.
- if (compressBufferOnQueue)
- {
- Iterator it = msg.getContentBodies().iterator();
- while (it.hasNext())
- {
- ContentBody cb = (ContentBody) it.next();
- cb.reduceBufferToFit();
- }
- }
-
- _messages.offer(msg);
-
- return true;
- }
-
-
- public boolean hasQueuedMessages()
- {
-
- _lock.lock();
- try
- {
- return !_messages.isEmpty();
- }
- finally
- {
- _lock.unlock();
- }
-
-
- }
-
- public int getQueueMessageCount()
- {
- return getMessageCount();
- }
-
- /**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
- * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
- *
- * @return int the number of messages in the delivery queue.
- */
- private int getMessageCount()
- {
- return _messages.size();
- }
-
-
- public synchronized List<AMQMessage> getMessages()
- {
- return new ArrayList<AMQMessage>(_messages);
- }
-
- public void populatePreDeliveryQueue(Subscription subscription)
- {
- //no-op . This DM has no PreDeliveryQueues
- }
-
- public synchronized void removeAMessageFromTop() throws AMQException
- {
- AMQMessage msg = poll();
- if (msg != null)
- {
- msg.dequeue(_queue);
- }
- }
-
- public synchronized void clearAllMessages() throws AMQException
- {
- AMQMessage msg = poll();
- while (msg != null)
- {
- msg.dequeue(_queue);
- msg = poll();
- }
- }
-
- /**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
- */
- private void processQueue()
- {
- try
- {
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- AMQMessage message = peek();
-
- //While we have messages to send and subscribers to send them to.
- while (message != null && hasSubscribers)
- {
- // _log.debug("Have messages(" + _messages.size() + ") and subscribers");
- Subscription next = _subscriptions.nextSubscriber(message);
- //FIXME Is there still not the chance that this subscribe could be suspended between here and the send?
-
- //We don't synchronize access to subscribers so need to re-check
- if (next != null)
- {
- next.send(message, _queue);
- poll();
- message = peek();
- }
- else
- {
- hasSubscribers = false;
- }
- }
- }
- catch (FailedDequeueException e)
- {
- _log.error("Unable to deliver message as dequeue failed: " + e, e);
- }
- finally
- {
- _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
- }
- }
-
- private AMQMessage peek()
- {
- return _messages.peek();
- }
-
- private AMQMessage poll()
- {
- return _messages.poll();
- }
-
- Runner asyncDelivery = new Runner();
-
- public void processAsync(Executor executor)
- {
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
-
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
- {
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
- {
- // Do we need this?
- // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
- //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
- {
- executor.execute(asyncDelivery);
- }
- }
- }
- }
-
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
- {
- // first check whether we are queueing, and enqueue if we are
- if (!enqueue(msg))
- {
- // not queueing so deliver message to 'next' subscriber
- _lock.lock();
- try
- {
- Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null)
- {
- if (!msg.isImmediate())
- {
- // no subscribers yet so enter 'queueing' mode and queue this message
- startQueueing(msg);
- }
- }
- else
- {
- s.send(msg, _queue);
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
-
- private class Runner implements Runnable
- {
- public void run()
- {
- boolean running = true;
- while (running)
- {
- processQueue();
-
- //Check that messages have not been added since we did our last peek();
- // Synchronize with the thread that adds to the queue.
- // If the queue is still empty then we can exit
- _lock.lock();
- try
- {
- if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
- {
- running = false;
- _processing.set(false);
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f09e8213b1..9efeb8351c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -34,6 +34,7 @@ import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -76,7 +77,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
* Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
-
+ private AtomicLong _totalMessageSize = new AtomicLong();
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,6 +110,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_messages.offer(msg);
+ _totalMessageSize.addAndGet(msg.getSize());
+
return true;
}
@@ -142,6 +145,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return _messages.size();
}
+ public long getTotalMessageSize()
+ {
+ return _totalMessageSize.get();
+ }
+
+ public long getOldestMessageArrival()
+ {
+ AMQMessage msg = _messages.peek();
+ return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ }
+
public synchronized List<AMQMessage> getMessages()
{
@@ -173,6 +187,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.addAndGet(-msg.getSize());
}
}
@@ -182,6 +197,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.set(0L);
msg = poll();
}
}
@@ -222,6 +238,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//remove sent message from our queue.
messageQueue.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
}
catch (FailedDequeueException e)
{
@@ -308,7 +325,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to.");
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -330,7 +347,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg);
@@ -345,7 +362,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
//Deliver the message
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index cac499587f..28386dfa45 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -64,7 +64,8 @@ interface DeliveryManager
*
* @param name the name of the entity on whose behalf we are delivering the message
* @param msg the message to deliver
- * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
+ * @throws org.apache.qpid.server.queue.FailedDequeueException
+ * if the message could not be dequeued
*/
void deliver(String name, AMQMessage msg) throws FailedDequeueException;
@@ -75,4 +76,8 @@ interface DeliveryManager
List<AMQMessage> getMessages();
void populatePreDeliveryQueue(Subscription subscription);
+
+ long getTotalMessageSize();
+
+ long getOldestMessageArrival();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
new file mode 100644
index 0000000000..8e9b3804f2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+ MESSAGE_COUNT_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ int msgCount = queue.getMessageCount();
+ final int maximumMessageCount = queue.getMaximumMessageCount();
+ if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ {
+ listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+ return true;
+ }
+ return false;
+ }
+ },
+ MESSAGE_SIZE_ALERT(true)
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ final long maximumMessageSize = queue.getMaximumMessageSize();
+ if(maximumMessageSize != 0)
+ {
+ // Check for threshold message size
+ long messageSize;
+// try
+// {
+ messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+// }
+// catch (AMQException e)
+// {
+// messageSize = 0;
+// }
+
+
+ if (messageSize >= maximumMessageSize)
+ {
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ QUEUE_DEPTH_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ // Check for threshold queue depth in bytes
+ final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+ if(maximumQueueDepth != 0)
+ {
+ final long queueDepth = queue.getQueueDepth();
+
+ if (queueDepth >= maximumQueueDepth)
+ {
+ listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ MESSAGE_AGE_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+
+ final long maxMessageAge = queue.getMaximumMessageAge();
+ if(maxMessageAge != 0)
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - maxMessageAge;
+ final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+ if(firstArrivalTime < thresholdTime)
+ {
+ long oldestAge = currentTime - firstArrivalTime;
+ listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ }
+ ;
+
+ private final boolean _messageSpecific;
+
+ NotificationCheck()
+ {
+ this(false);
+ }
+
+ NotificationCheck(boolean messageSpecific)
+ {
+ _messageSpecific = messageSpecific;
+ }
+
+ public boolean isMessageSpecific()
+ {
+ return _messageSpecific;
+ }
+
+ abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
new file mode 100644
index 0000000000..bd9d7f6b11
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface QueueNotificationListener
+{
+ void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
deleted file mode 100644
index c967ea2cde..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Manages delivery of messages on behalf of a queue
- */
-class SynchronizedDeliveryManager implements DeliveryManager
-{
- private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
-
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
- private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
- private final SubscriptionManager _subscriptions;
-
- /**
- * An indication of the mode we are in. If this is true then messages are
- * being queued up in _messages for asynchronous delivery. If it is false
- * then messages can be delivered directly as they come in.
- */
- private volatile boolean _queueing;
-
- /**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
- */
- private final AMQQueue _queue;
-
- SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
- {
- _subscriptions = subscriptions;
- _queue = queue;
- }
-
- private synchronized boolean enqueue(AMQMessage msg)
- {
- if (msg.isImmediate())
- {
- return false;
- }
- else
- {
- if (_queueing)
- {
- _messages.offer(msg);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private synchronized void startQueueing(AMQMessage msg)
- {
- _queueing = true;
- enqueue(msg);
- }
-
- /**
- * Determines whether there are queued messages. Sets _queueing to false if
- * there are no queued messages. This needs to be atomic.
- *
- * @return true if there are queued messages
- */
- public synchronized boolean hasQueuedMessages()
- {
- boolean empty = _messages.isEmpty();
- if (empty)
- {
- _queueing = false;
- }
- return !empty;
- }
-
- public synchronized int getQueueMessageCount()
- {
- return _messages.size();
- }
-
- public synchronized List<AMQMessage> getMessages()
- {
- return new ArrayList<AMQMessage>(_messages);
- }
-
- public void populatePreDeliveryQueue(Subscription subscription)
- {
- //no-op . This DM has no PreDeliveryQueues
- }
-
- public synchronized void removeAMessageFromTop() throws AMQException
- {
- AMQMessage msg = poll();
- if (msg != null)
- {
- msg.dequeue(_queue);
- }
- }
-
- public synchronized void clearAllMessages() throws AMQException
- {
- AMQMessage msg = poll();
- while (msg != null)
- {
- msg.dequeue(_queue);
- msg = poll();
- }
- }
-
- /**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
- */
- private void processQueue()
- {
- try
- {
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- while (hasQueuedMessages() && hasSubscribers)
- {
- Subscription next = _subscriptions.nextSubscriber(peek());
- //We don't synchronize access to subscribers so need to re-check
- if (next != null)
- {
- try
- {
- next.send(poll(), _queue);
- }
- catch (AMQException e)
- {
- _log.error("Unable to deliver message: " + e, e);
- }
- }
- else
- {
- hasSubscribers = false;
- }
- }
- }
- finally
- {
- _processing.set(false);
- }
- }
-
- private synchronized AMQMessage peek()
- {
- return _messages.peek();
- }
-
- private synchronized AMQMessage poll()
- {
- return _messages.poll();
- }
-
- /**
- * Requests that the delivery manager start processing the queue asynchronously
- * if there is work that can be done (i.e. there are messages queued up and
- * subscribers that can receive them.
- * <p/>
- * This should be called when subscribers are added, but only after the consume-ok
- * message has been returned as message delivery may start immediately. It should also
- * be called after unsuspending a client.
- * <p/>
- *
- * @param executor the executor on which the delivery should take place
- */
- public void processAsync(Executor executor)
- {
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
- {
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
- {
- // Do we need this?
- // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
- //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
- {
- executor.execute(new Runner());
- }
- }
- }
- }
-
- /**
- * Handles message delivery. The delivery manager is always in one of two modes;
- * it is either queueing messages for asynchronous delivery or delivering
- * directly.
- *
- * @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
- * @throws NoConsumersException if there are no active subscribers to deliver
- * the message to
- */
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
- {
- // first check whether we are queueing, and enqueue if we are
- if (!enqueue(msg))
- {
- synchronized(this)
- {
- // not queueing so deliver message to 'next' subscriber
- Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null)
- {
- // no subscribers yet so enter 'queueing' mode and queue this message
- startQueueing(msg);
- }
- else
- {
- s.send(msg, _queue);
- }
- }
- }
-
- }
-
- private class Runner implements Runnable
- {
- public void run()
- {
- processQueue();
- }
- }
-}