summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-11-08 12:56:12 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-11-08 12:56:12 +0000
commitacdb57af871cbfef3a768b2dd765246063129d5d (patch)
treee1e498f39238251fc3878ca0308b4aed28244053 /qpid/java
parenta943841f59639388b526ae51629d2c0f32311670 (diff)
downloadqpid-python-acdb57af871cbfef3a768b2dd765246063129d5d.tar.gz
QPID-6221 : [Java Broker] Detect low disk space conditions and enforce flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1637558 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java90
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java4
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java6
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java6
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java7
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java7
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java7
19 files changed, 227 insertions, 32 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 5550381c9c..409679c2de 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
@@ -570,6 +569,12 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
}
@Override
+ public File getStoreLocationAsFile()
+ {
+ return new File(getStoreLocation());
+ }
+
+ @Override
protected long getPersistentSizeLowThreshold()
{
return _persistentSizeLowThreshold;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 66e98ac314..2f00d9852c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -165,4 +165,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore
{
return ((FileBasedSettings)_parent).getStorePath();
}
+
+ @Override
+ public File getStoreLocationAsFile()
+ {
+ return new File(getStoreLocation());
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
index 97b166a9d0..e7c247f1a5 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
@@ -413,18 +413,6 @@ public class BDBHAReplicaVirtualHost extends AbstractConfiguredObject<BDBHARepli
}
@Override
- public void block()
- {
- throwUnsupportedForReplica();
- }
-
- @Override
- public void unblock()
- {
- throwUnsupportedForReplica();
- }
-
- @Override
public boolean getDefaultDeadLetterQueueEnabled()
{
return false;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
index 98708270ab..d848008f0f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
@@ -145,7 +145,7 @@ public class PortMessages
/**
* Log a Port message of the Format:
- * <pre>PRT-1005 : Connection from {0} reject as connection limit reached</pre>
+ * <pre>PRT-1005 : Connection from {0} rejected</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
index a53927c283..7755b5ebc2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
@@ -47,6 +47,8 @@ public class VirtualHostMessages
public static final String CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.closed";
public static final String STATS_DATA_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_data";
public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_msgs";
+ public static final String FILESYSTEM_FULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_full";
+ public static final String FILESYSTEM_NOTFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_notfull";
public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.created";
public static final String ERRORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.errored";
@@ -56,6 +58,8 @@ public class VirtualHostMessages
Logger.getLogger(CLOSED_LOG_HIERARCHY);
Logger.getLogger(STATS_DATA_LOG_HIERARCHY);
Logger.getLogger(STATS_MSGS_LOG_HIERARCHY);
+ Logger.getLogger(FILESYSTEM_FULL_LOG_HIERARCHY);
+ Logger.getLogger(FILESYSTEM_NOTFULL_LOG_HIERARCHY);
Logger.getLogger(CREATED_LOG_HIERARCHY);
Logger.getLogger(ERRORED_LOG_HIERARCHY);
@@ -160,6 +164,70 @@ public class VirtualHostMessages
/**
* Log a VirtualHost message of the Format:
+ * <pre>VHT-1006 : Filesystem is over {0,number} per cent full, enforcing flow control.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FILESYSTEM_FULL(Number param1)
+ {
+ String rawMessage = _messages.getString("FILESYSTEM_FULL");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return FILESYSTEM_FULL_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a VirtualHost message of the Format:
+ * <pre>VHT-1007 : Filesystem is no longer over {0,number} per cent full.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FILESYSTEM_NOTFULL(Number param1)
+ {
+ String rawMessage = _messages.getString("FILESYSTEM_NOTFULL");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return FILESYSTEM_NOTFULL_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a VirtualHost message of the Format:
* <pre>VHT-1001 : Created : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
index 8cd15f0120..6bab8ecc3f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
@@ -25,4 +25,7 @@ CLOSED = VHT-1002 : Closed : {0}
STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total
STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total
-ERRORED = VHT-1005 : {0} Unexpected fatal error \ No newline at end of file
+ERRORED = VHT-1005 : {0} Unexpected fatal error
+
+FILESYSTEM_FULL = VHT-1006 : Filesystem is over {0,number} per cent full, enforcing flow control.
+FILESYSTEM_NOTFULL = VHT-1007 : Filesystem is no longer over {0,number} per cent full.
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 17a5bc3a04..9f15b234c3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -60,6 +60,8 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold";
String BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD = "broker.failStartupWithErroredChild";
+ String STORE_FILESYSTEM_MAX_USAGE_PERCENT = "store.filesystem.maxUsagePercent";
+
String QPID_AMQP_PORT = "qpid.amqp_port";
String QPID_HTTP_PORT = "qpid.http_port";
String QPID_RMI_PORT = "qpid.rmi_port";
@@ -83,6 +85,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)
long DEFAULT_CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = 5000l;
+ @ManagedContextDefault(name = STORE_FILESYSTEM_MAX_USAGE_PERCENT)
+ int DEFAULT_FILESYSTEM_MAX_USAGE_PERCENT = 90;
+
String BROKER_FRAME_SIZE = "qpid.broker_frame_size";
@ManagedContextDefault(name = BROKER_FRAME_SIZE)
int DEFAULT_FRAME_SIZE = 65535;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java
index a9a5ea8086..ae35816516 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java
@@ -22,5 +22,7 @@ package org.apache.qpid.server.store;
public enum Event
{
PERSISTENT_MESSAGE_SIZE_OVERFULL,
- PERSISTENT_MESSAGE_SIZE_UNDERFULL
+ PERSISTENT_MESSAGE_SIZE_UNDERFULL,
+ FILESYSTEM_OVERFULL,
+ FILESYSTEM_UNDERFULL
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java
index bf3de2611d..196ea05a43 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/EventManager.java
@@ -60,4 +60,9 @@ public class EventManager
}
}
}
+
+ public synchronized boolean hasListeners(Event event)
+ {
+ return _listeners.containsKey(event);
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index c108918d34..efe040fbb3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;
+import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -260,6 +261,12 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ public File getStoreLocationAsFile()
+ {
+ return null;
+ }
+
+ @Override
public void onDelete(ConfiguredObject<?> parent)
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 1629454cde..28a330b205 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.store;
+import java.io.File;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
@@ -36,6 +38,8 @@ public interface MessageStore
String getStoreLocation();
+ File getStoreLocationAsFile();
+
void addEventListener(EventListener eventListener, Event... events);
/**
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index b6f4ea52ce..83d61fc3fe 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.store;
+import java.io.File;
import java.util.UUID;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -117,6 +118,12 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
+ public File getStoreLocationAsFile()
+ {
+ return null;
+ }
+
+ @Override
public void onDelete(ConfiguredObject<?> parent)
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 52d779f877..4dc975ad80 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.io.File;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -92,6 +94,8 @@ import org.apache.qpid.server.util.MapValueConverter;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener
{
+ private static enum BlockingType { STORE, FILESYSTEM };
+
private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
@@ -116,7 +120,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
- private boolean _blocked;
+ private AtomicBoolean _blocked = new AtomicBoolean();
private final Map<String, MessageDestination> _systemNodeDestinations =
Collections.synchronizedMap(new HashMap<String,MessageDestination>());
@@ -134,6 +138,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private MessageStoreLogSubject _messageStoreLogSubject;
+ private final Set<BlockingType> _blockingReasons = Collections.synchronizedSet(EnumSet.noneOf(BlockingType.class));
+
+
@ManagedAttributeField
private boolean _queue_deadLetterQueueEnabled;
@@ -162,6 +169,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private MessageStore _messageStore;
private MessageStoreRecoverer _messageStoreRecoverer;
+ private final FileSystemSpaceChecker _fileSystemSpaceChecker = new FileSystemSpaceChecker();
+ private int _fileSystemMaxUsagePercent;
public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
@@ -285,6 +294,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
addChangeListener(new StoreUpdatingChangeListener());
+
+
+ _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
}
private void checkVHostStateIsActive()
@@ -694,6 +706,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
getMessageStore().closeMessageStore();
+
}
catch (StoreException e)
{
@@ -770,13 +783,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _dtxRegistry;
}
- public void block()
+ private void block(BlockingType blockingType)
{
synchronized (_connectionRegistry)
{
- if(!_blocked)
+ _blockingReasons.add(blockingType);
+ if(!_blocked.compareAndSet(false,true))
{
- _blocked = true;
for(AMQConnectionModel conn : _connectionRegistry.getConnections())
{
conn.block();
@@ -786,13 +799,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
- public void unblock()
+ private void unblock(BlockingType blockingType)
{
+
synchronized (_connectionRegistry)
{
- if(_blocked)
+ _blockingReasons.remove(blockingType);
+ if(_blockingReasons.isEmpty() && _blocked.compareAndSet(true,false))
{
- _blocked = false;
for(AMQConnectionModel conn : _connectionRegistry.getConnections())
{
conn.unblock();
@@ -803,7 +817,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public void connectionRegistered(final AMQConnectionModel connection)
{
- if(_blocked)
+ if(_blocked.get())
{
connection.block();
}
@@ -824,11 +838,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
switch(event)
{
case PERSISTENT_MESSAGE_SIZE_OVERFULL:
- block();
+ block(BlockingType.STORE);
_eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL());
break;
case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
- unblock();
+ unblock(BlockingType.STORE);
_eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL());
break;
}
@@ -1403,6 +1417,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
MessageStore messageStore = getMessageStore();
messageStore.openMessageStore(this);
+ startFileSystemSpaceChecking();
+
+
if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider))
{
getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CREATED());
@@ -1440,6 +1457,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ protected void startFileSystemSpaceChecking()
+ {
+ File storeLocationAsFile = _messageStore.getStoreLocationAsFile();
+ if (storeLocationAsFile != null && _fileSystemMaxUsagePercent > 0)
+ {
+ _fileSystemSpaceChecker.setFileSystem(storeLocationAsFile);
+
+ scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker);
+ }
+ }
+
@StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
private void onRestart()
{
@@ -1530,5 +1558,47 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ private class FileSystemSpaceChecker extends HouseKeepingTask
+ {
+ private boolean _fileSystemFull;
+ private File _fileSystem;
+
+ public FileSystemSpaceChecker()
+ {
+ super(AbstractVirtualHost.this);
+
+ }
+
+ @Override
+ public void execute()
+ {
+ long totalSpace = _fileSystem.getTotalSpace();
+ long freeSpace = _fileSystem.getFreeSpace();
+
+ long usagePercent = (100l * (totalSpace - freeSpace)) / totalSpace;
+
+ if (_fileSystemFull && (usagePercent < _fileSystemMaxUsagePercent))
+ {
+ _fileSystemFull = false;
+ getEventLogger().message(getMessageStoreLogSubject(), VirtualHostMessages.FILESYSTEM_NOTFULL(
+ _fileSystemMaxUsagePercent));
+ unblock(BlockingType.FILESYSTEM);
+ }
+ else if(!_fileSystemFull && usagePercent > _fileSystemMaxUsagePercent)
+ {
+ _fileSystemFull = true;
+ getEventLogger().message(getMessageStoreLogSubject(), VirtualHostMessages.FILESYSTEM_FULL(
+ _fileSystemMaxUsagePercent));
+ block(BlockingType.FILESYSTEM);
+ }
+
+ }
+
+ public void setFileSystem(final File fileSystem)
+ {
+ _fileSystem = fileSystem;
+ }
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 2ca1f1f5c8..29729b6c7d 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -104,10 +104,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
- public void block();
-
- public void unblock();
-
boolean getDefaultDeadLetterQueueEnabled();
EventLogger getEventLogger();
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
index 85960cd25b..2764597bd0 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
@@ -221,6 +221,12 @@ public class DerbyConfigurationStore extends AbstractJDBCConfigurationStore
}
@Override
+ public File getStoreLocationAsFile()
+ {
+ return DerbyUtils.isInMemoryDatabase(getStoreLocation()) ? null : new File(getStoreLocation());
+ }
+
+ @Override
protected Logger getLogger()
{
return DerbyConfigurationStore.this.getLogger();
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 8ba5af9130..7d1191da2c 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -109,5 +109,9 @@ public class DerbyMessageStore extends AbstractDerbyMessageStore
return ((FileBasedSettings)_parent).getStorePath();
}
-
+ @Override
+ public File getStoreLocationAsFile()
+ {
+ return DerbyUtils.isInMemoryDatabase(getStoreLocation()) ? null : new File(getStoreLocation());
+ }
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
index b3c449747b..df9c48d6f9 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
@@ -57,7 +57,7 @@ public class DerbyUtils
public static String createConnectionUrl(final String name, final String databasePath)
{
// Derby wont use an existing directory, so we append parent name
- if (MEMORY_STORE_LOCATION.equals(databasePath))
+ if (isInMemoryDatabase(databasePath))
{
return "jdbc:derby:" + MEMORY_STORE_LOCATION + "/" + name + ";create=true";
}
@@ -79,6 +79,11 @@ public class DerbyUtils
}
+ public static boolean isInMemoryDatabase(final String databasePath)
+ {
+ return MEMORY_STORE_LOCATION.equals(databasePath);
+ }
+
public static void shutdownDatabase(String connectionURL) throws SQLException
{
try
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
index 13d5c54ee4..3c3b1da880 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
@@ -19,6 +19,7 @@
package org.apache.qpid.server.store.jdbc;
+import java.io.File;
import java.nio.charset.Charset;
import java.security.PrivilegedAction;
import java.sql.Blob;
@@ -287,6 +288,12 @@ public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStor
}
@Override
+ public File getStoreLocationAsFile()
+ {
+ return null;
+ }
+
+ @Override
protected Logger getLogger()
{
return GenericJDBCConfigurationStore.this.getLogger();
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
index 59e165acc2..a75ca4d685 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store.jdbc;
+import java.io.File;
import java.security.PrivilegedAction;
import java.sql.Blob;
import java.sql.Connection;
@@ -177,6 +178,12 @@ public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore
return _connectionURL;
}
+ @Override
+ public File getStoreLocationAsFile()
+ {
+ return null;
+ }
+
protected String getPlainTextPassword(final JDBCSettings settings)
{
return Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<String>()