summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-06 11:05:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-06 11:05:05 +0000
commit90c8a29045f18554fd4c2da5ad01dd00af11cae7 (patch)
tree6a057b68d8079713557182bff34a25b5e78372ee /qpid/java/broker-core
parent9c7fb20582566d7a53a78bdca9d46f5d5b0b3fb7 (diff)
downloadqpid-python-90c8a29045f18554fd4c2da5ad01dd00af11cae7.tar.gz
QPID-5965 : [Java Broker] flow transient messages to disk in low memory situations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616155 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java25
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java12
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java47
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java14
12 files changed, 208 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 4f2327adee..c3fd0ba428 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -20,14 +20,6 @@
*/
package org.apache.qpid.server.message.internal;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.util.ByteBufferInputStream;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -39,6 +31,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.util.ByteBufferInputStream;
+
public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
{
private final Object _messageBody;
@@ -239,6 +238,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
catch (IOException e)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 8c8c5d4b9d..7d49d0b85f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -56,6 +56,7 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
+ String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold";
String QPID_AMQP_PORT = "qpid.amqp_port";
String QPID_HTTP_PORT = "qpid.http_port";
@@ -74,6 +75,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = QPID_JMX_PORT)
public static final String DEFAULT_JMX_PORT_NUMBER = "9099";
+ @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
+ public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+
@DerivedAttribute
String getBuildVersion();
@@ -190,4 +194,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
AuthenticationProvider<?> getManagementModeAuthenticationProvider();
+ void assignTargetSizes();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index e98f6cd19b..4e216925e4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -49,6 +49,14 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
+ String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
+ @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
+ long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400l;
+
+ String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead";
+ @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
+ long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
+
@ManagedAttribute
Exchange getAlternateExchange();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 518141a21d..6714a53e9e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -172,4 +172,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
MessageStore getMessageStore();
String getType();
+
+ void setTargetSize(long targetSize);
+
+ long getTotalQueueDepthBytes();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 2d8a64b920..67c713e9d9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -384,6 +384,39 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return children;
}
+ @Override
+ public synchronized void assignTargetSizes()
+ {
+ long totalTarget = getContextValue(Long.class,BROKER_FLOW_TO_DISK_THRESHOLD);
+ long totalSize = 0l;
+ Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+ Map<VirtualHost<?,?,?>,Long> vhs = new HashMap<>();
+ for(VirtualHostNode<?> vhn : vhns)
+ {
+ VirtualHost<?, ?, ?> vh = vhn.getVirtualHost();
+ if(vh != null)
+ {
+ long totalQueueDepthBytes = vh.getTotalQueueDepthBytes();
+ vhs.put(vh,totalQueueDepthBytes);
+ totalSize += totalQueueDepthBytes;
+ }
+ }
+
+ for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet())
+ {
+
+ long size = (long) (entry.getValue().doubleValue() * ((double) totalTarget / (double) totalSize));
+ entry.getKey().setTargetSize(size);
+ }
+ }
+
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ assignTargetSizes();
+ }
+
public AuthenticationProvider<?> findAuthenticationProviderByName(String authenticationProviderName)
{
if (isManagementMode())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 03bba43d57..9cfa7dbcf3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -111,4 +111,8 @@ public interface AMQQueue<X extends AMQQueue<X>>
void completeRecovery();
void recover(ServerMessage<?> message);
+
+ void setTargetSize(long targetSize);
+
+ long getPotentialMemoryFootprint();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 11d5cc733f..c889fa7740 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -79,6 +79,7 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -113,6 +114,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
};
+ private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l;
+
private final VirtualHostImpl _virtualHost;
private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
@@ -130,6 +133,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
+ private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
+
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
private final AtomicLong _totalMessagesReceived = new AtomicLong();
@@ -924,6 +929,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
incrementQueueCount();
incrementQueueSize(message);
+ if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory())
+ {
+ message.getStoredMessage().flowToDisk();
+ }
+
_totalMessagesReceived.incrementAndGet();
if(_recovering.get())
@@ -1206,6 +1216,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
+ @Override
+ public void setTargetSize(final long targetSize)
+ {
+ _targetQueueSize.set(targetSize);
+ }
+
public long getTotalDequeuedMessages()
{
return _dequeueCount.get();
@@ -2188,6 +2204,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
QueueEntryIterator queueListIterator = getEntries().iterator();
+ long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
+ long targetSize = _targetQueueSize.get();
+
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
@@ -2210,8 +2229,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
// the time the check actually occurs. So verify we
// can actually get the message to perform the check.
ServerMessage msg = node.getMessage();
+
if (msg != null)
{
+ totalSize += msg.getSize();
+ StoredMessage storedMessage = msg.getStoredMessage();
+ if(totalSize > targetSize && storedMessage.isInMemory())
+ {
+ storedMessage.flowToDisk();
+ }
checkForNotification(msg);
}
}
@@ -2220,6 +2246,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ @Override
+ public long getPotentialMemoryFootprint()
+ {
+ return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT),
+ getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages());
+ }
+
public long getAlertRepeatGap()
{
return _alertRepeatGap;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 1a1085339d..bb7a726a0c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -1576,6 +1576,31 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
storedSizeChange(-delta);
}
+ @Override
+ public boolean isInMemory()
+ {
+ return _messageDataRef.isHardRef();
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ try(Connection conn = newConnection())
+ {
+ store(conn);
+ conn.commit();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+ finally
+ {
+
+ }
+ return true;
+ }
+
private synchronized Runnable store(final Connection conn) throws SQLException
{
if (!stored())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index e1043e8807..e8402c9268 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -130,4 +130,16 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
public void remove()
{
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 6beb74f4ae..7561b4a11c 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -35,4 +35,8 @@ public interface StoredMessage<M extends StorableMessageMetaData>
ByteBuffer getContent(int offsetInMessage, int size);
void remove();
+
+ boolean isInMemory();
+
+ boolean flowToDisk();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 707be9ed7b..f15f608907 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -130,6 +131,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final AtomicBoolean _deleted = new AtomicBoolean();
private final VirtualHostNode<?> _virtualHostNode;
+ private final AtomicLong _targetSize = new AtomicLong(1024*1024);
+
private MessageStoreLogSubject _messageStoreLogSubject;
@ManagedAttributeField
@@ -847,6 +850,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public void execute()
{
+ VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
+ Broker<?> broker = virtualHostNode.getParent(Broker.class);
+ broker.assignTargetSizes();
+
for (AMQQueue<?> q : getQueues())
{
if (q.getState() == State.ACTIVE)
@@ -1311,6 +1318,46 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public void setTargetSize(final long targetSize)
+ {
+ _targetSize.set(targetSize);
+ allocateTargetSizeToQueues();
+ }
+
+ private void allocateTargetSizeToQueues()
+ {
+ long targetSize = _targetSize.get();
+ Collection<AMQQueue<?>> queues = getQueues();
+ long totalSize = calculateTotalEnqueuedSize(queues);
+ if(targetSize > 0l)
+ {
+ for (AMQQueue<?> q : queues)
+ {
+ long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize))
+ * (double) targetSize);
+
+ q.setTargetSize(size);
+ }
+ }
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return calculateTotalEnqueuedSize(getQueues());
+ }
+
+ private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> queues)
+ {
+ long total = 0;
+ for(AMQQueue<?> queue : queues)
+ {
+ total += queue.getPotentialMemoryFootprint();
+ }
+ return total;
+ }
+
+ @Override
protected void onCreate()
{
super.onCreate();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 750efc23ae..899cfdcd6e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -147,6 +149,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
entry.getValue().release();
entry.setValue(null); // free up any memory associated with the reference object
}
+ final List<StoredMessage<?>> messagesToDelete = new ArrayList<>();
getStore().visitMessages(new MessageHandler()
{
@Override
@@ -156,12 +159,19 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
long messageNumber = storedMessage.getMessageNumber();
if(!_recoveredMessages.containsKey(messageNumber))
{
- _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing....");
- storedMessage.remove();
+ messagesToDelete.add(storedMessage);
}
return messageNumber <_maxMessageId-1;
}
});
+ for(StoredMessage<?> storedMessage : messagesToDelete)
+ {
+
+ _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
+ storedMessage.remove();
+ }
+
+ messagesToDelete.clear();
_recoveredMessages.clear();
}