diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-02-25 11:31:18 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-25 11:31:18 +0000 |
| commit | 4a589006e8329957f13f221b924e3cc4bf9989ca (patch) | |
| tree | 585b0baf31e44871f58c2af98d51bd4c283d1131 /java | |
| parent | cfda2140c2a61a22b59be32f8bfb1aed10eefcf2 (diff) | |
| download | qpid-python-4a589006e8329957f13f221b924e3cc4bf9989ca.tar.gz | |
QPID-1633 : Added new properties to SimpleAMQQueue with appropriate getters/setters from the ServerConfiguration and Management Console.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747753 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
10 files changed, 292 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 90d6caec99..83fcfad1fd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -21,8 +21,10 @@ package org.apache.qpid.server.configuration; import java.util.List; +import java.io.File; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.registry.ApplicationRegistry; public class QueueConfiguration { @@ -31,13 +33,20 @@ public class QueueConfiguration private Configuration _config; private String _name; + private VirtualHostConfiguration _virtualHostConfiguration; - public QueueConfiguration(String name, Configuration config) + public QueueConfiguration(String name, Configuration config, VirtualHostConfiguration virtualHostConfiguration) { + _virtualHostConfiguration = virtualHostConfiguration; _config = config; _name = name; } + public VirtualHostConfiguration getVirtualHostConfiguration() + { + return _virtualHostConfiguration; + } + public boolean getDurable() { return _config.getBoolean("durable" ,false); @@ -103,4 +112,13 @@ public class QueueConfiguration return _config.getLong("minimumAlertRepeatGap", 0); } + public long getMemoryUsageMaximum() + { + return _config.getLong("maximumMemoryUsage", 0); + } + + public long getMemoryUsageMinimum() + { + return _config.getLong("minimumMemoryUsage", 0); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index b3c08a2a95..484a241fc0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -113,7 +113,7 @@ public class ServerConfiguration CompositeConfiguration mungedConf = new CompositeConfiguration(); mungedConf.addConfiguration(conf.subset("virtualhosts.virtualhost."+name)); mungedConf.addConfiguration(vhostConfiguration.subset("virtualhost." + name)); - VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(name, mungedConf); + VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(name, mungedConf, this); _virtualHosts.put(vhostConfig.getName(), vhostConfig); } } @@ -181,6 +181,11 @@ public class ServerConfiguration return conf; } + public String getQpidWork() + { + return System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")); + } + public void setJMXManagementPort(int mport) { _jmxPort = mport; diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 91d0b8d8da..2a235fe9a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -37,10 +37,12 @@ public class VirtualHostConfiguration private String _name; private Map<String, QueueConfiguration> _queues = new HashMap<String, QueueConfiguration>(); private Map<String, ExchangeConfiguration> _exchanges = new HashMap<String, ExchangeConfiguration>(); + private ServerConfiguration _serverConfiguration; - - public VirtualHostConfiguration(String name, Configuration config) throws ConfigurationException + public VirtualHostConfiguration(String name, Configuration config, + ServerConfiguration serverConfiguration) throws ConfigurationException { + _serverConfiguration = serverConfiguration; _config = config; _name = name; @@ -52,7 +54,7 @@ public class VirtualHostConfiguration CompositeConfiguration mungedConf = new CompositeConfiguration(); mungedConf.addConfiguration(_config.subset("queues.queue." + queueName)); mungedConf.addConfiguration(_config.subset("queues")); - _queues.put(queueName, new QueueConfiguration(queueName, mungedConf)); + _queues.put(queueName, new QueueConfiguration(queueName, mungedConf, this)); } i = _config.getList("exchanges.exchange.name").iterator(); @@ -67,6 +69,11 @@ public class VirtualHostConfiguration } } + public VirtualHostConfiguration(String name, Configuration mungedConf) throws ConfigurationException + { + this(name,mungedConf, null); + } + public String getName() { return _name; @@ -127,4 +134,25 @@ public class VirtualHostConfiguration return _queues.get(queueName); } + public long getMemoryUsageMaximum() + { + return _config.getLong("queues.maximumMemoryUsage", 0); + } + + public long getMemoryUsageMinimum() + { + return _config.getLong("queues.minimumMemoryUsage", 0); + } + + public ServerConfiguration getServerConfiguration() + { + return _serverConfiguration; + } + + public static final String FLOW_TO_DISK_PATH = "flowToDiskPath"; + public String getFlowToDiskLocation() + { + return _config.getString(FLOW_TO_DISK_PATH, getServerConfiguration().getQpidWork()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 9fadbb0cdc..43ec6c4d15 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -68,6 +68,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> boolean isEmpty(); + boolean isFlowed(); + int getMessageCount(); int getUndeliveredMessageCount(); @@ -111,7 +113,15 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext); + long getMemoryUsageMaximum(); + + void setMemoryUsageMaximum(long maximumMemoryUsage); + + long getMemoryUsageMinimum(); + + void setMemoryUsageMinimum(long minimumMemoryUsage); + long getMemoryUsageCurrent(); long getMaximumMessageSize(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index eb0a011e93..f977dc0449 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -85,6 +85,8 @@ public class AMQQueueFactory q.setMaximumMessageSize(config.getMaximumMessageSize()); q.setMaximumMessageCount(config.getMaximumMessageCount()); q.setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + q.setMemoryUsageMaximum(config.getMemoryUsageMaximum()); + q.setMemoryUsageMinimum(config.getMemoryUsageMinimum()); return q; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index a08719875d..b077b453cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -233,6 +233,36 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que _queue.setMaximumQueueDepth(value); } + public Long getMemoryUsageMaximum() + { + return _queue.getMemoryUsageMaximum(); + } + + public void setMemoryUsageMaximum(Long maximumMemoryUsage) + { + _queue.setMemoryUsageMaximum(maximumMemoryUsage); + } + + public Long getMemoryUsageMinimum() + { + return _queue.getMemoryUsageMinimum(); + } + + public void setMemoryUsageMinimum(Long minimumMemoryUsage) + { + _queue.setMemoryUsageMinimum(minimumMemoryUsage); + } + + public Long getMemoryUsageCurrent() + { + return _queue.getMemoryUsageCurrent(); + } + + public boolean isFlowed() + { + return _queue.isFlowed(); + } + /** * returns the size of messages(KB) in the queue. */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java index 2bc94995e9..21e99034fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java @@ -183,6 +183,63 @@ public interface ManagedQueue @MBeanAttribute(name="MaximumQueueDepth", description="The threshold high value(KB) for Queue Depth") void setMaximumQueueDepth(Long value) throws IOException; + //TODO change descriptions + /** + * View the limit on the memory that this queue will utilise. + * + * Used by Flow to Disk. + * + * @return The maximum memory(B) that the queue will occuy. + */ + public Long getMemoryUsageMaximum(); + + /** + * Place a limit on the memory that this queue will utilise. + * + * Used by Flow to Disk + * + * @param maximumMemoryUsage The new maximum memory(B) to be used by this queue + */ + @MBeanAttribute(name="MemoryUsageMaximum", description="The maximum memory(B) that the queue will occupy.") + public void setMemoryUsageMaximum(Long maximumMemoryUsage); + + /** + * View the minimum amount of memory that has been defined for this queue. + * + * Used by Flow to Disk + * + * @return The minimum amount of queue data(B) that the queue will attempt to keep in memory + */ + public Long getMemoryUsageMinimum(); + + /** + * Set the minimum amount of memory that has been defined for this queue. + * + * Used by Flow to Disk + * + * @param minimumMemoryUsage The new minimum memory(B) level to be used by this queue + */ + @MBeanAttribute(name="MemoryUsageMinimum", description="The minimum memory(B) that the queue will occupy.") + public void setMemoryUsageMinimum(Long minimumMemoryUsage); + + /** + * View the amount of memory(B) that this queue is using. + * + * @return The current memory(B) usage of this queue. + */ + @MBeanAttribute(name="MemoryUsageCurrent", description="The current amount of memory(B) used by this queue.") + public Long getMemoryUsageCurrent(); + + /** + * When a queue exceeds its MemoryUsageMaximum value then the Queue will start flowing to disk. + * + * This boolean is used to show that change in state. + * + * @return true if the Queue is currently flowing to disk + */ + @MBeanAttribute(name="isFlowed", description="true if the queue is currently flowing to disk.") + public boolean isFlowed(); + //********** Operations *****************// diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 501e90b4d7..a4945bc11a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -19,7 +19,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -77,6 +76,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicLong _atomicQueueSize = new AtomicLong(0L); + private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); + private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); protected final SubscriptionList _subscriptionList = new SubscriptionList(this); @@ -105,6 +106,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** the minimum interval between sending out consecutive alerts of the same type */ public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); + /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */ + private long _memoryUsageMaximum = 0; + + /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ + private long _memoryUsageMinimum = 0; + private static final int MAX_ASYNC_DELIVERIES = 10; private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); @@ -113,6 +120,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); + /** Control to determin if this queue is flowed or not. */ + protected AtomicBoolean _flowed = new AtomicBoolean(false); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -159,7 +168,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } resetNotifications(); + resetFlowToDisk(); + } + public void resetFlowToDisk() + { + setMemoryUsageMaximum(_memoryUsageMaximum); + setMemoryUsageMinimum(_memoryUsageMinimum); } public void resetNotifications() @@ -188,6 +203,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _autoDelete; } + public boolean isFlowed() + { + return _flowed.get(); + } + public AMQShortString getOwner() { return _owner; @@ -468,6 +488,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void incrementQueueSize(final AMQMessage message) { getAtomicQueueSize().addAndGet(message.getSize()); + getAtomicQueueInMemory().addAndGet(message.getSize()); } private void incrementQueueCount() @@ -607,6 +628,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void decrementQueueSize(final QueueEntry entry) { getAtomicQueueSize().addAndGet(-entry.getMessage().getSize()); + getAtomicQueueInMemory().addAndGet(-entry.getMessage().getSize()); } void decrementQueueCount() @@ -658,6 +680,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return getMessageCount() == 0; } + public long getMemoryUsageCurrent() + { + return getAtomicQueueInMemory().get(); + } + public int getMessageCount() { return getAtomicQueueCount().get(); @@ -751,6 +778,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _atomicQueueSize; } + public AtomicLong getAtomicQueueInMemory() + { + return _atomicQueueInMemory; + } + + private boolean isExclusiveSubscriber() { return _exclusiveSubscriber != null; @@ -1457,6 +1490,51 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + + public long getMemoryUsageMaximum() + { + return _memoryUsageMaximum; + } + + public void setMemoryUsageMaximum(long maximumMemoryUsage) + { + _memoryUsageMaximum = maximumMemoryUsage; + + // Don't attempt to start the inhaler/purger unless we have a minimum value specified. + if (_memoryUsageMaximum > 0) + { + // If we've increased the max memory above what we have in memory then we can inhale more + if (_memoryUsageMaximum > _atomicQueueInMemory.get()) + { + //TODO start inhaler + } + else // if we have now have to much memory in use we need to purge. + { + //TODO start purger + } + } + } + + public long getMemoryUsageMinimum() + { + return _memoryUsageMinimum; + } + + public void setMemoryUsageMinimum(long minimumMemoryUsage) + { + _memoryUsageMinimum = minimumMemoryUsage; + + // Don't attempt to start the inhaler unless we have a minimum value specified. + if (_memoryUsageMinimum > 0) + { + // If we've increased the minimum memory above what we have in memory then we need to inhale more + if (_memoryUsageMinimum >= _atomicQueueInMemory.get()) + { + //TODO start inhaler + } + } + } + public long getMinimumAlertRepeatGap() { return _minimumAlertRepeatGap; diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index ba504d3064..7239ec9303 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -120,5 +120,33 @@ public class VirtualHostConfigurationTest extends TestCase assertEquals(3, bTest.getMaximumMessageAge()); } + + public void testQueueMemoryValues() throws Exception + { + // Set up queue with 5 priorities + configXml.addProperty("virtualhost.test.queues.exchange", "amq.topic"); + configXml.addProperty("virtualhost.test.queues.maximumMemoryUsage", "11"); + configXml.addProperty("virtualhost.test.queues.minimumMemoryUsage", "22"); + + configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest"); + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct"); + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMemoryUsage", "44"); + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).minimumMemoryUsage", "55"); + + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest"); + + VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); + + // Check specifically configured values + AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + assertEquals(44, aTest.getMemoryUsageMaximum()); + assertEquals(55, aTest.getMemoryUsageMinimum()); + + // Check default values + AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest")); + assertEquals(11, bTest.getMemoryUsageMaximum()); + assertEquals(22, bTest.getMemoryUsageMinimum()); + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 39b78b99d1..d9e4cc9b70 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -115,6 +115,11 @@ public class MockAMQQueue implements AMQQueue return false; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isFlowed() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public int getMessageCount() { return 0; //To change body of implemented methods use File | Settings | File Templates. @@ -216,6 +221,26 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public long getMemoryUsageMaximum() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setMemoryUsageMaximum(long maximumMemoryUsage) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getMemoryUsageMinimum() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setMemoryUsageMinimum(long minimumMemoryUsage) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public long getMaximumMessageSize() { return 0; //To change body of implemented methods use File | Settings | File Templates. @@ -271,7 +296,6 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. @@ -302,6 +326,11 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public long getMemoryUsageCurrent() + { + return 0; + } + public ManagedObject getManagedObject() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -312,7 +341,6 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override public void setMinimumAlertRepeatGap(long value) { |
