diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-11-08 12:56:12 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-11-08 12:56:12 +0000 |
| commit | acdb57af871cbfef3a768b2dd765246063129d5d (patch) | |
| tree | e1e498f39238251fc3878ca0308b4aed28244053 /qpid/java/broker-core/src | |
| parent | a943841f59639388b526ae51629d2c0f32311670 (diff) | |
| download | qpid-python-acdb57af871cbfef3a768b2dd765246063129d5d.tar.gz | |
QPID-6221 : [Java Broker] Detect low disk space conditions and enforce flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1637558 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
11 files changed, 184 insertions, 17 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java index 98708270ab..d848008f0f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java @@ -145,7 +145,7 @@ public class PortMessages /** * Log a Port message of the Format: - * <pre>PRT-1005 : Connection from {0} reject as connection limit reached</pre> + * <pre>PRT-1005 : Connection from {0} rejected</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. * diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java index a53927c283..7755b5ebc2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java @@ -47,6 +47,8 @@ public class VirtualHostMessages public static final String CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.closed"; public static final String STATS_DATA_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_data"; public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_msgs"; + public static final String FILESYSTEM_FULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_full"; + public static final String FILESYSTEM_NOTFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_notfull"; public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.created"; public static final String ERRORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.errored"; @@ -56,6 +58,8 @@ public class VirtualHostMessages Logger.getLogger(CLOSED_LOG_HIERARCHY); Logger.getLogger(STATS_DATA_LOG_HIERARCHY); Logger.getLogger(STATS_MSGS_LOG_HIERARCHY); + Logger.getLogger(FILESYSTEM_FULL_LOG_HIERARCHY); + Logger.getLogger(FILESYSTEM_NOTFULL_LOG_HIERARCHY); Logger.getLogger(CREATED_LOG_HIERARCHY); Logger.getLogger(ERRORED_LOG_HIERARCHY); @@ -160,6 +164,70 @@ public class VirtualHostMessages /** * Log a VirtualHost message of the Format: + * <pre>VHT-1006 : Filesystem is over {0,number} per cent full, enforcing flow control.</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage FILESYSTEM_FULL(Number param1) + { + String rawMessage = _messages.getString("FILESYSTEM_FULL"); + + final Object[] messageArguments = {param1}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return FILESYSTEM_FULL_LOG_HIERARCHY; + } + }; + } + + /** + * Log a VirtualHost message of the Format: + * <pre>VHT-1007 : Filesystem is no longer over {0,number} per cent full.</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage FILESYSTEM_NOTFULL(Number param1) + { + String rawMessage = _messages.getString("FILESYSTEM_NOTFULL"); + + final Object[] messageArguments = {param1}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return FILESYSTEM_NOTFULL_LOG_HIERARCHY; + } + }; + } + + /** + * Log a VirtualHost message of the Format: * <pre>VHT-1001 : Created : {0}</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties index 8cd15f0120..6bab8ecc3f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties @@ -25,4 +25,7 @@ CLOSED = VHT-1002 : Closed : {0} STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total -ERRORED = VHT-1005 : {0} Unexpected fatal error
\ No newline at end of file +ERRORED = VHT-1005 : {0} Unexpected fatal error + +FILESYSTEM_FULL = VHT-1006 : Filesystem is over {0,number} per cent full, enforcing flow control. +FILESYSTEM_NOTFULL = VHT-1007 : Filesystem is no longer over {0,number} per cent full. diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 17a5bc3a04..9f15b234c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -60,6 +60,8 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold"; String BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD = "broker.failStartupWithErroredChild"; + String STORE_FILESYSTEM_MAX_USAGE_PERCENT = "store.filesystem.maxUsagePercent"; + String QPID_AMQP_PORT = "qpid.amqp_port"; String QPID_HTTP_PORT = "qpid.http_port"; String QPID_RMI_PORT = "qpid.rmi_port"; @@ -83,6 +85,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL @ManagedContextDefault(name = CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT) long DEFAULT_CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = 5000l; + @ManagedContextDefault(name = STORE_FILESYSTEM_MAX_USAGE_PERCENT) + int DEFAULT_FILESYSTEM_MAX_USAGE_PERCENT = 90; + String BROKER_FRAME_SIZE = "qpid.broker_frame_size"; @ManagedContextDefault(name = BROKER_FRAME_SIZE) int DEFAULT_FRAME_SIZE = 65535; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java index a9a5ea8086..ae35816516 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java @@ -22,5 +22,7 @@ package org.apache.qpid.server.store; public enum Event { PERSISTENT_MESSAGE_SIZE_OVERFULL, - PERSISTENT_MESSAGE_SIZE_UNDERFULL + PERSISTENT_MESSAGE_SIZE_UNDERFULL, + FILESYSTEM_OVERFULL, + FILESYSTEM_UNDERFULL } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java index bf3de2611d..196ea05a43 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java @@ -60,4 +60,9 @@ public class EventManager } } } + + public synchronized boolean hasListeners(Event event) + { + return _listeners.containsKey(event); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index c108918d34..efe040fbb3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store; +import java.io.File; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -260,6 +261,12 @@ public class MemoryMessageStore implements MessageStore } @Override + public File getStoreLocationAsFile() + { + return null; + } + + @Override public void onDelete(ConfiguredObject<?> parent) { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index 1629454cde..28a330b205 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.store; +import java.io.File; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; @@ -36,6 +38,8 @@ public interface MessageStore String getStoreLocation(); + File getStoreLocationAsFile(); + void addEventListener(EventListener eventListener, Event... events); /** diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index b6f4ea52ce..83d61fc3fe 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.store; +import java.io.File; import java.util.UUID; import org.apache.qpid.server.model.ConfiguredObject; @@ -117,6 +118,12 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override + public File getStoreLocationAsFile() + { + return null; + } + + @Override public void onDelete(ConfiguredObject<?> parent) { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 52d779f877..4dc975ad80 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.virtualhost; +import java.io.File; import java.security.AccessControlException; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -92,6 +94,8 @@ import org.apache.qpid.server.util.MapValueConverter; public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X> implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener { + private static enum BlockingType { STORE, FILESYSTEM }; + private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery"; public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; @@ -116,7 +120,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); - private boolean _blocked; + private AtomicBoolean _blocked = new AtomicBoolean(); private final Map<String, MessageDestination> _systemNodeDestinations = Collections.synchronizedMap(new HashMap<String,MessageDestination>()); @@ -134,6 +138,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private MessageStoreLogSubject _messageStoreLogSubject; + private final Set<BlockingType> _blockingReasons = Collections.synchronizedSet(EnumSet.noneOf(BlockingType.class)); + + @ManagedAttributeField private boolean _queue_deadLetterQueueEnabled; @@ -162,6 +169,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private MessageStore _messageStore; private MessageStoreRecoverer _messageStoreRecoverer; + private final FileSystemSpaceChecker _fileSystemSpaceChecker = new FileSystemSpaceChecker(); + private int _fileSystemMaxUsagePercent; public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { @@ -285,6 +294,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); addChangeListener(new StoreUpdatingChangeListener()); + + + _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT); } private void checkVHostStateIsActive() @@ -694,6 +706,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } getMessageStore().closeMessageStore(); + } catch (StoreException e) { @@ -770,13 +783,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _dtxRegistry; } - public void block() + private void block(BlockingType blockingType) { synchronized (_connectionRegistry) { - if(!_blocked) + _blockingReasons.add(blockingType); + if(!_blocked.compareAndSet(false,true)) { - _blocked = true; for(AMQConnectionModel conn : _connectionRegistry.getConnections()) { conn.block(); @@ -786,13 +799,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } - public void unblock() + private void unblock(BlockingType blockingType) { + synchronized (_connectionRegistry) { - if(_blocked) + _blockingReasons.remove(blockingType); + if(_blockingReasons.isEmpty() && _blocked.compareAndSet(true,false)) { - _blocked = false; for(AMQConnectionModel conn : _connectionRegistry.getConnections()) { conn.unblock(); @@ -803,7 +817,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public void connectionRegistered(final AMQConnectionModel connection) { - if(_blocked) + if(_blocked.get()) { connection.block(); } @@ -824,11 +838,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte switch(event) { case PERSISTENT_MESSAGE_SIZE_OVERFULL: - block(); + block(BlockingType.STORE); _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL()); break; case PERSISTENT_MESSAGE_SIZE_UNDERFULL: - unblock(); + unblock(BlockingType.STORE); _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL()); break; } @@ -1403,6 +1417,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte MessageStore messageStore = getMessageStore(); messageStore.openMessageStore(this); + startFileSystemSpaceChecking(); + + if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider)) { getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CREATED()); @@ -1440,6 +1457,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } + protected void startFileSystemSpaceChecking() + { + File storeLocationAsFile = _messageStore.getStoreLocationAsFile(); + if (storeLocationAsFile != null && _fileSystemMaxUsagePercent > 0) + { + _fileSystemSpaceChecker.setFileSystem(storeLocationAsFile); + + scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker); + } + } + @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) private void onRestart() { @@ -1530,5 +1558,47 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } + private class FileSystemSpaceChecker extends HouseKeepingTask + { + private boolean _fileSystemFull; + private File _fileSystem; + + public FileSystemSpaceChecker() + { + super(AbstractVirtualHost.this); + + } + + @Override + public void execute() + { + long totalSpace = _fileSystem.getTotalSpace(); + long freeSpace = _fileSystem.getFreeSpace(); + + long usagePercent = (100l * (totalSpace - freeSpace)) / totalSpace; + + if (_fileSystemFull && (usagePercent < _fileSystemMaxUsagePercent)) + { + _fileSystemFull = false; + getEventLogger().message(getMessageStoreLogSubject(), VirtualHostMessages.FILESYSTEM_NOTFULL( + _fileSystemMaxUsagePercent)); + unblock(BlockingType.FILESYSTEM); + } + else if(!_fileSystemFull && usagePercent > _fileSystemMaxUsagePercent) + { + _fileSystemFull = true; + getEventLogger().message(getMessageStoreLogSubject(), VirtualHostMessages.FILESYSTEM_FULL( + _fileSystemMaxUsagePercent)); + block(BlockingType.FILESYSTEM); + } + + } + + public void setFileSystem(final File fileSystem) + { + _fileSystem = fileSystem; + } + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 2ca1f1f5c8..29729b6c7d 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -104,10 +104,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); - public void block(); - - public void unblock(); - boolean getDefaultDeadLetterQueueEnabled(); EventLogger getEventLogger(); |
