summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
commit9eab96a9a3569486f6351c94abf4f95ed515e9b1 (patch)
treeae86cedd9fdcea4f49993e5a82954ccda53a1ed3 /qpid/java/broker
parent1427de0275b5db2c8619db9211435897123259d8 (diff)
downloadqpid-python-9eab96a9a3569486f6351c94abf4f95ed515e9b1.tar.gz
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java41
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java111
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java212
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java61
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java85
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java28
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java91
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java9
18 files changed, 637 insertions, 130 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8198cec821..4fd4e02220 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -32,10 +34,8 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
@@ -89,7 +89,6 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -157,7 +156,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
@@ -1357,9 +1356,34 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return _actor;
}
- public void block(AMQQueue queue)
+ public synchronized void block()
+ {
+ if(_blockingEntities.add(this))
+ {
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
+ flow(false);
+ }
+ }
+ }
+
+ public synchronized void unblock()
+ {
+ if(_blockingEntities.remove(this))
+ {
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+
+ flow(true);
+ }
+ }
+ }
+
+ public synchronized void block(AMQQueue queue)
{
- if(_blockingQueues.add(queue))
+ if(_blockingEntities.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -1370,11 +1394,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- public void unblock(AMQQueue queue)
+ public synchronized void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue))
+ if(_blockingEntities.remove(queue))
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
_actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
index 10e40151b0..aaa1766489 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
@@ -61,7 +61,6 @@ public class TopicConfig extends ConfigurationPlugin
throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName' element.");
}
- System.err.println("********* Created TC:"+this);
}
@@ -75,5 +74,5 @@ public class TopicConfig extends ConfigurationPlugin
}
return response;
- }
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 4a58314f51..09dc5a2473 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,6 +38,8 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
+ private final Collection<RegistryChangeListener> _listeners =
+ new ArrayList<RegistryChangeListener>();
public void initialise()
{
@@ -80,16 +83,48 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
public void registerConnection(AMQConnectionModel connnection)
{
- _registry.add(connnection);
+ synchronized (this)
+ {
+ _registry.add(connnection);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionRegistered(connnection);
+ }
+ }
+ }
}
public void deregisterConnection(AMQConnectionModel connnection)
{
- _registry.remove(connnection);
+ synchronized (this)
+ {
+ _registry.remove(connnection);
+
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionUnregistered(connnection);
+ }
+ }
+ }
+ }
+
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ synchronized (_listeners)
+ {
+ _listeners.add(listener);
+ }
}
public List<AMQConnectionModel> getConnections()
{
- return new ArrayList<AMQConnectionModel>(_registry);
+ synchronized (this)
+ {
+ return new ArrayList<AMQConnectionModel>(_registry);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index 954c448b72..76d97e3ad1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -44,4 +44,13 @@ public interface IConnectionRegistry
public void registerConnection(AMQConnectionModel connnection);
public void deregisterConnection(AMQConnectionModel connnection);
+
+ void addRegistryChangeListener(RegistryChangeListener listener);
+
+ interface RegistryChangeListener
+ {
+ void connectionRegistered(AMQConnectionModel connection);
+ void connectionUnregistered(AMQConnectionModel connection);
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
index 081f2bbca3..d3823a71a0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
@@ -26,3 +26,5 @@ RECOVERY_START = MST-1004 : Recovery Start
RECOVERED = MST-1005 : Recovered {0,number} messages
RECOVERY_COMPLETE = MST-1006 : Recovery Complete
PASSIVATE = MST-1007 : Store Passivated
+OVERFULL = MST-1008 : Store overfull, flow control will be enforced
+UNDERFULL = MST-1009 : Store overfull condition cleared
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 5af3899890..b7fd2387a5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -32,23 +32,27 @@ public interface AMQConnectionModel extends StatisticsGatherer
{
/**
* get a unique id for this connection.
- *
+ *
* @return a {@link UUID} representing the connection
*/
public UUID getId();
-
+
/**
* Close the underlying Connection
- *
+ *
* @param cause
* @param message
* @throws org.apache.qpid.AMQException
*/
public void close(AMQConstant cause, String message) throws AMQException;
+ public void block();
+
+ public void unblock();
+
/**
* Close the given requested Session
- *
+ *
* @param session
* @param cause
* @param message
@@ -57,10 +61,10 @@ public interface AMQConnectionModel extends StatisticsGatherer
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
public long getConnectionId();
-
+
/**
* Get a list of all sessions using this connection.
- *
+ *
* @return a list of {@link AMQSessionModel}s
*/
public List<AMQSessionModel> getSessionModels();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index b750b29952..ae5ede5e82 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,14 +20,46 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.JMException;
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -66,24 +98,6 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -160,6 +174,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private volatile boolean _deferFlush;
private long _lastReceivedTime;
+ private boolean _blocking;
public ManagedObject getManagedObject()
{
@@ -633,7 +648,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
else
{
- _channelMap.put(channel.getChannelId(), channel);
+ synchronized (_channelMap)
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+ }
}
if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
@@ -641,6 +659,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_cachedChannels[channelId] = channel;
}
+ if(_blocking)
+ {
+ channel.block();
+ }
+
checkForNotification();
}
@@ -735,10 +758,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*/
public void removeChannel(int channelId)
{
- _channelMap.remove(channelId);
- if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ synchronized (_channelMap)
{
- _cachedChannels[channelId] = null;
+ _channelMap.remove(channelId);
+
+ if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ {
+ _cachedChannels[channelId] = null;
+ }
}
}
@@ -767,8 +794,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
channel.close();
}
-
- _channelMap.clear();
+ synchronized (_channelMap)
+ {
+ _channelMap.clear();
+ }
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
_cachedChannels[i] = null;
@@ -1337,6 +1366,36 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
(Throwable) null));
}
+ public void block()
+ {
+ synchronized (_channelMap)
+ {
+ if(!_blocking)
+ {
+ _blocking = true;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.block();
+ }
+ }
+ }
+ }
+
+ public void unblock()
+ {
+ synchronized (_channelMap)
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.unblock();
+ }
+ }
+ }
+ }
+
public boolean isClosed()
{
return _closed;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index fa171815ca..0896499cda 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -42,21 +42,21 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
public AMQConnectionModel getConnectionModel();
public String getClientID();
-
+
public void close() throws AMQException;
public LogSubject getLogSubject();
-
+
/**
* This method is called from the housekeeping thread to check the status of
* transactions on this session and react appropriately.
- *
+ *
* If a transaction is open for too long or idle for too long then a warning
* is logged or the connection is closed, depending on the configuration. An open
* transaction is one that has recent activity. The transaction age is counted
- * from the time the transaction was started. An idle transaction is one that
+ * from the time the transaction was started. An idle transaction is one that
* has had no activity, such as publishing or acknowledgeing messages.
- *
+ *
* @param openWarn time in milliseconds before alerting on open transaction
* @param openClose time in milliseconds before closing connection with open transaction
* @param idleWarn time in milliseconds before alerting on idle transaction
@@ -68,6 +68,10 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
void unblock(AMQQueue queue);
+ void block();
+
+ void unblock();
+
boolean onSameConnection(InboundMessage inbound);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
index bbde11ab4c..9b5ceef35f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
@@ -28,5 +28,7 @@ public enum Event
BEFORE_PASSIVATE,
AFTER_PASSIVATE,
BEFORE_CLOSE,
- AFTER_CLOSE
+ AFTER_CLOSE,
+ PERSISTENT_MESSAGE_SIZE_OVERFULL,
+ PERSISTENT_MESSAGE_SIZE_UNDERFULL,
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
index caff17daa5..4ab1a3ab05 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
@@ -33,7 +33,14 @@ public class OperationalLoggingListener implements EventListener
private OperationalLoggingListener(final MessageStore store, LogSubject logSubject)
{
_logSubject = logSubject;
- store.addEventListener(this, Event.BEFORE_INIT, Event.AFTER_INIT, Event.BEFORE_ACTIVATE, Event.AFTER_ACTIVATE, Event.AFTER_CLOSE);
+ store.addEventListener(this,
+ Event.BEFORE_INIT,
+ Event.AFTER_INIT,
+ Event.BEFORE_ACTIVATE,
+ Event.AFTER_ACTIVATE,
+ Event.AFTER_CLOSE,
+ Event.PERSISTENT_MESSAGE_SIZE_OVERFULL,
+ Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
_store = store;
}
@@ -62,7 +69,13 @@ public class OperationalLoggingListener implements EventListener
case AFTER_CLOSE:
CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
break;
-
+ case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.OVERFULL());
+ break;
+ case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.UNDERFULL());
+ break;
+
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 0371cdcfcb..36351cc426 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -31,6 +31,7 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Blob;
+import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
@@ -89,6 +90,9 @@ public class DerbyMessageStore implements MessageStore
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
+ public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
+ public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
+
private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
@@ -234,6 +238,11 @@ public class DerbyMessageStore implements MessageStore
private final EventManager _eventManager = new EventManager();
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
@@ -308,7 +317,24 @@ public class DerbyMessageStore implements MessageStore
_storeLocation = databasePath;
+ _persistentSizeHighThreshold = storeConfiguration.getLong(OVERFULL_SIZE_PROPERTY, -1l);
+ _persistentSizeLowThreshold = storeConfiguration.getLong(UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
createOrOpenDatabase(name, databasePath);
+
+ Connection conn = newAutoCommitConnection();;
+ try
+ {
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+ finally
+ {
+ conn.close();
+ }
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -1921,6 +1947,7 @@ public class DerbyMessageStore implements MessageStore
private class DerbyTransaction implements Transaction
{
private final ConnectionWrapper _connWrapper;
+ private int _storeSizeIncrease;
private DerbyTransaction()
@@ -1938,18 +1965,19 @@ public class DerbyMessageStore implements MessageStore
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- if(message.getStoredMessage() instanceof StoredDerbyMessage)
+ final StoredMessage storedMessage = message.getStoredMessage();
+ if(storedMessage instanceof StoredDerbyMessage)
{
try
{
- ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+ ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
}
catch (SQLException e)
{
throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
}
}
-
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1964,12 +1992,15 @@ public class DerbyMessageStore implements MessageStore
public void commitTran() throws AMQStoreException
{
DerbyMessageStore.this.commitTran(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
}
@Override
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ return storeFuture;
}
@Override
@@ -2111,6 +2142,7 @@ public class DerbyMessageStore implements MessageStore
conn.commit();
conn.close();
+ storedSizeChange(getMetaData().getContentSize());
}
}
catch (SQLException e)
@@ -2150,7 +2182,9 @@ public class DerbyMessageStore implements MessageStore
@Override
public void remove()
{
+ int delta = getMetaData().getContentSize();
DerbyMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
}
}
@@ -2446,4 +2480,174 @@ public class DerbyMessageStore implements MessageStore
}
return results;
}
+
+ private synchronized void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized(this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 3*delta;
+
+ Connection conn = null;
+ try
+ {
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk(conn);
+
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Exception will processing store size change", e);
+ }
+ }
+ }
+ }
+
+ private void reduceSizeOnDisk(Connection conn)
+ {
+ CallableStatement cs = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ String tableQuery =
+ "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
+ stmt = conn.prepareStatement(tableQuery);
+ ResultSet rs = null;
+
+ List<String> schemas = new ArrayList<String>();
+ List<String> tables = new ArrayList<String>();
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ schemas.add(rs.getString(1));
+ tables.add(rs.getString(2));
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+
+ cs = conn.prepareCall
+ ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
+
+ for(int i = 0; i < schemas.size(); i++)
+ {
+ cs.setString(1, schemas.get(i));
+ cs.setString(2, tables.get(i));
+ cs.setShort(3, (short) 0);
+ cs.execute();
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error reducing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closePreparedStatement(cs);
+ }
+
+ }
+
+ private long getSizeOnDisk(Connection conn)
+ {
+ PreparedStatement stmt = null;
+ try
+ {
+ String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
+ " FROM " +
+ " SYS.SYSTABLES systabs," +
+ " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
+ " WHERE systabs.tabletype = 'T'";
+
+ stmt = conn.prepareStatement(sizeQuery);
+
+ ResultSet rs = null;
+ long size = 0l;
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ size = rs.getLong(1);
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+ return size;
+
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error establishing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ }
+
+ }
+
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index c38f3d0761..77d07e49f3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -74,7 +74,8 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
private ServerConnectionMBean _mBean;
private VirtualHost _virtualHost;
private AtomicLong _lastIoTime = new AtomicLong();
-
+ private boolean _blocking;
+
public ServerConnection(final long connectionId)
{
_connectionId = connectionId;
@@ -100,12 +101,12 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
protected void setState(State state)
{
super.setState(state);
-
+
if (state == State.OPEN)
{
if (_onOpenTask != null)
{
- _onOpenTask.run();
+ _onOpenTask.run();
}
_actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), true, true, true));
@@ -193,7 +194,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
((ServerSession)session).close();
}
-
+
public LogSubject getLogSubject()
{
return (LogSubject) this;
@@ -286,6 +287,46 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
close(replyCode, message);
}
+ public synchronized void block()
+ {
+ if(!_blocking)
+ {
+ _blocking = true;
+ for(AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.block();
+ }
+ }
+ }
+
+ public synchronized void unblock()
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ for(AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.unblock();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void registerSession(final Session ssn)
+ {
+ super.registerSession(ssn);
+ if(_blocking)
+ {
+ ((ServerSession)ssn).block();
+ }
+ }
+
+ @Override
+ public synchronized void removeSession(final Session ssn)
+ {
+ super.removeSession(ssn);
+ }
+
public List<AMQSessionModel> getSessionModels()
{
List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
@@ -315,27 +356,27 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
}
_virtualHost.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
@@ -348,7 +389,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
{
setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
_virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
_dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
_messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 5460c89eab..0d8036ec3a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -70,7 +70,7 @@ public class ServerConnectionDelegate extends ServerDelegate
String localFQDN)
{
super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
-
+
_appRegistry = appRegistry;
_localFQDN = localFQDN;
_maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -118,8 +118,8 @@ public class ServerConnectionDelegate extends ServerDelegate
{
final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
final ServerConnection sconn = (ServerConnection) conn;
-
-
+
+
if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
{
tuneAuthorizedConnection(sconn);
@@ -168,7 +168,7 @@ public class ServerConnectionDelegate extends ServerDelegate
vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
-
+
if(vhost != null)
{
sconn.setVirtualHost(vhost);
@@ -194,7 +194,7 @@ public class ServerConnectionDelegate extends ServerDelegate
sconn.setState(Connection.State.CLOSING);
sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
}
-
+
}
@Override
@@ -216,7 +216,7 @@ public class ServerConnectionDelegate extends ServerDelegate
setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
-
+
@Override
protected int getHeartbeatMax()
{
@@ -265,7 +265,9 @@ public class ServerConnectionDelegate extends ServerDelegate
if(isSessionNameUnique(atc.getName(), conn))
{
super.sessionAttach(conn, atc);
- ((ServerConnection)conn).checkForNotification();
+ final ServerConnection serverConnection = (ServerConnection) conn;
+
+ serverConnection.checkForNotification();
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 6f979e035e..d4631ae675 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.transport;
+import java.util.Collections;
+import java.util.HashSet;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData_0_10;
@@ -40,7 +42,6 @@ import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -90,12 +91,12 @@ import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session
- implements AuthorizationHolder, SessionConfig,
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-
+
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
@@ -105,7 +106,7 @@ public class ServerSession extends Session
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
@@ -167,9 +168,19 @@ public class ServerSession extends Session
if (state == State.OPEN)
{
_actor.message(ChannelMessages.CREATE());
+ if(_blocking.get())
+ {
+ invokeBlock();
+ }
}
}
+ private void invokeBlock()
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ }
+
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
@@ -455,7 +466,7 @@ public class ServerSession extends Session
{
return _transaction.isTransactional();
}
-
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -630,7 +641,7 @@ public class ServerSession extends Session
{
return getConnection().getAuthorizedPrincipal();
}
-
+
public Subject getAuthorizedSubject()
{
return getConnection().getAuthorizedSubject();
@@ -781,37 +792,65 @@ public class ServerSession extends Session
public void block(AMQQueue queue)
{
+ block(queue, queue.getName());
+ }
- if(_blockingQueues.add(queue))
- {
+ public void block()
+ {
+ block(this, "** All Queues **");
+ }
- if(_blocking.compareAndSet(false,true))
+
+ private void block(Object queue, String name)
+ {
+ synchronized (_blockingEntities)
+ {
+ if(_blockingEntities.add(queue))
{
- invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
- invoke(new MessageStop(""));
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
- }
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ if(getState() == State.OPEN)
+ {
+ invokeBlock();
+ }
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ }
+ }
}
}
public void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ unblock((Object)queue);
+ }
+
+ public void unblock()
+ {
+ unblock(this);
+ }
+
+ private void unblock(Object queue)
+ {
+ synchronized(_blockingEntities)
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
{
+ if(_blocking.compareAndSet(true,false) && !isClosing())
+ {
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+ }
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 79a8bc0e4c..85ea97c107 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -99,9 +99,9 @@ public class ServerSessionDelegate extends SessionDelegate
Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
if(newOutstanding == null || newOutstanding == asyncCommandMark)
{
- session.processed(method);
+ session.processed(method);
}
-
+
if(newOutstanding != null)
{
((ServerSession)session).completeAsyncCommands();
@@ -240,13 +240,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
-
+
FilterManager filterManager = null;
- try
+ try
{
filterManager = FilterManagerFactory.createManager(method.getArguments());
- }
- catch (AMQException amqe)
+ }
+ catch (AMQException amqe)
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
return;
@@ -257,7 +257,7 @@ public class ServerSessionDelegate extends SessionDelegate
method.getAcceptMode(),
method.getAcquireMode(),
MessageFlowMode.WINDOW,
- creditManager,
+ creditManager,
filterManager,
method.getArguments());
@@ -297,13 +297,13 @@ public class ServerSessionDelegate extends SessionDelegate
final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
-
+
if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
{
ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
String description = "Permission denied: exchange-name '" + exchange.getName() + "'";
exception(ssn, xfr, errorCode, description);
-
+
return;
}
@@ -807,7 +807,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
- // TODO decouple AMQException and AMQConstant error codes
+ // TODO decouple AMQException and AMQConstant error codes
private void exception(Session session, Method method, AMQException exception, String message)
{
ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
@@ -823,7 +823,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
String description = message + "': " + exception.getMessage();
-
+
exception(session, method, errorCode, description);
}
@@ -1349,9 +1349,9 @@ public class ServerSessionDelegate extends SessionDelegate
+ " as exclusive queue with same name "
+ "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
-
+
exception(session, method, errorCode, description);
-
+
return;
}
}
@@ -1436,7 +1436,7 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
VirtualHost virtualHost = getVirtualHost(session);
-
+
try
{
queue.delete();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 489b985222..c59016173a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -74,7 +74,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
int getHouseKeepingPoolSize();
- void setHouseKeepingPoolSize(int newSize);
+ void setHouseKeepingPoolSize(int newSize);
int getHouseKeepingActiveCount();
@@ -102,4 +102,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
State getState();
+
+ public void block();
+
+ public void unblock();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 9b113525d4..b05025467d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -80,7 +80,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-public class VirtualHostImpl implements VirtualHost
+public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
@@ -129,6 +129,7 @@ public class VirtualHostImpl implements VirtualHost
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+ private boolean _blocked;
public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
{
@@ -157,6 +158,7 @@ public class VirtualHostImpl implements VirtualHost
_securityManager.configureHostPlugins(_vhostConfig);
_connectionRegistry = new ConnectionRegistry();
+ _connectionRegistry.addRegistryChangeListener(this);
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
@@ -178,6 +180,9 @@ public class VirtualHostImpl implements VirtualHost
activateNonHAMessageStore();
initialiseStatistics();
+
+ _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
public IConnectionRegistry getConnectionRegistry()
@@ -558,7 +563,7 @@ public class VirtualHostImpl implements VirtualHost
{
return _bindingFactory;
}
-
+
public void registerMessageDelivered(long messageSize)
{
if (isStatisticsEnabled())
@@ -568,7 +573,7 @@ public class VirtualHostImpl implements VirtualHost
}
_appRegistry.registerMessageDelivered(messageSize);
}
-
+
public void registerMessageReceived(long messageSize, long timestamp)
{
if (isStatisticsEnabled())
@@ -578,34 +583,34 @@ public class VirtualHostImpl implements VirtualHost
}
_appRegistry.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
_dataDelivered.reset();
_messagesReceived.reset();
_dataReceived.reset();
-
+
for (AMQConnectionModel connection : _connectionRegistry.getConnections())
{
connection.resetStatistics();
@@ -616,7 +621,7 @@ public class VirtualHostImpl implements VirtualHost
{
setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
_appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
_dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
_messagesReceived = new StatisticsCounter("messages-received-" + getName());
@@ -699,18 +704,72 @@ public class VirtualHostImpl implements VirtualHost
return _dtxRegistry;
}
- @Override
public String toString()
{
return _name;
}
- @Override
public State getState()
{
return _state;
}
+ public void block()
+ {
+ synchronized (_connectionRegistry)
+ {
+ if(!_blocked)
+ {
+ _blocked = true;
+ for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+ {
+ conn.block();
+ }
+ }
+ }
+ }
+
+
+ public void unblock()
+ {
+ synchronized (_connectionRegistry)
+ {
+ if(_blocked)
+ {
+ _blocked = false;
+ for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+ {
+ conn.unblock();
+ }
+ }
+ }
+ }
+
+ public void connectionRegistered(final AMQConnectionModel connection)
+ {
+ if(_blocked)
+ {
+ connection.block();
+ }
+ }
+
+ public void connectionUnregistered(final AMQConnectionModel connection)
+ {
+ }
+
+ public void event(final Event event)
+ {
+ switch(event)
+ {
+ case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+ block();
+ break;
+ case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+ unblock();
+ break;
+ }
+ }
+
/**
* Virtual host JMX MBean class.
@@ -750,7 +809,8 @@ public class VirtualHostImpl implements VirtualHost
{
_exchangeRegistry.initialise();
initialiseModel(_vhostConfig);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new RuntimeException("Failed to initialise virtual host after state change", e);
}
@@ -766,7 +826,8 @@ public class VirtualHostImpl implements VirtualHost
try
{
_brokerMBean.register();
- } catch (JMException e)
+ }
+ catch (JMException e)
{
throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
}
@@ -777,8 +838,6 @@ public class VirtualHostImpl implements VirtualHost
public class BeforePassivationListener implements EventListener
{
-
- @Override
public void event(Event event)
{
_connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 91174c5d10..58c7625ad6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -282,9 +282,16 @@ public class MockVirtualHost implements VirtualHost
}
- @Override
public State getState()
{
return State.ACTIVE;
}
+
+ public void block()
+ {
+ }
+
+ public void unblock()
+ {
+ }
} \ No newline at end of file