diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-06 11:05:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-06 11:05:05 +0000 |
| commit | 90c8a29045f18554fd4c2da5ad01dd00af11cae7 (patch) | |
| tree | 6a057b68d8079713557182bff34a25b5e78372ee /qpid/java/broker-core | |
| parent | 9c7fb20582566d7a53a78bdca9d46f5d5b0b3fb7 (diff) | |
| download | qpid-python-90c8a29045f18554fd4c2da5ad01dd00af11cae7.tar.gz | |
QPID-5965 : [Java Broker] flow transient messages to disk in low memory situations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616155 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
12 files changed, 208 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 4f2327adee..c3fd0ba428 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -20,14 +20,6 @@ */ package org.apache.qpid.server.message.internal; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.util.ByteBufferInputStream; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; @@ -39,6 +31,13 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.AbstractServerMessageImpl; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.util.ByteBufferInputStream; + public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData> { private final Object _messageBody; @@ -239,6 +238,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, { throw new UnsupportedOperationException(); } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } }; } catch (IOException e) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 8c8c5d4b9d..7d49d0b85f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -56,6 +56,7 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay"; String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute"; + String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold"; String QPID_AMQP_PORT = "qpid.amqp_port"; String QPID_HTTP_PORT = "qpid.http_port"; @@ -74,6 +75,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL @ManagedContextDefault(name = QPID_JMX_PORT) public static final String DEFAULT_JMX_PORT_NUMBER = "9099"; + @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD) + public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory()); + @DerivedAttribute String getBuildVersion(); @@ -190,4 +194,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL AuthenticationProvider<?> getManagementModeAuthenticationProvider(); + void assignTargetSizes(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index e98f6cd19b..4e216925e4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -49,6 +49,14 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; + String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; + @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) + long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400l; + + String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead"; + @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) + long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l; + @ManagedAttribute Exchange getAlternateExchange(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 518141a21d..6714a53e9e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -172,4 +172,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, MessageStore getMessageStore(); String getType(); + + void setTargetSize(long targetSize); + + long getTotalQueueDepthBytes(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 2d8a64b920..67c713e9d9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -384,6 +384,39 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return children; } + @Override + public synchronized void assignTargetSizes() + { + long totalTarget = getContextValue(Long.class,BROKER_FLOW_TO_DISK_THRESHOLD); + long totalSize = 0l; + Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes(); + Map<VirtualHost<?,?,?>,Long> vhs = new HashMap<>(); + for(VirtualHostNode<?> vhn : vhns) + { + VirtualHost<?, ?, ?> vh = vhn.getVirtualHost(); + if(vh != null) + { + long totalQueueDepthBytes = vh.getTotalQueueDepthBytes(); + vhs.put(vh,totalQueueDepthBytes); + totalSize += totalQueueDepthBytes; + } + } + + for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet()) + { + + long size = (long) (entry.getValue().doubleValue() * ((double) totalTarget / (double) totalSize)); + entry.getKey().setTargetSize(size); + } + } + + @Override + protected void onOpen() + { + super.onOpen(); + assignTargetSizes(); + } + public AuthenticationProvider<?> findAuthenticationProviderByName(String authenticationProviderName) { if (isManagementMode()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 03bba43d57..9cfa7dbcf3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -111,4 +111,8 @@ public interface AMQQueue<X extends AMQQueue<X>> void completeRecovery(); void recover(ServerMessage<?> message); + + void setTargetSize(long targetSize); + + long getPotentialMemoryFootprint(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 11d5cc733f..c889fa7740 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -79,6 +79,7 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -113,6 +114,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } }; + private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l; + private final VirtualHostImpl _virtualHost; private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); @@ -130,6 +133,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final AtomicLong _atomicQueueSize = new AtomicLong(0L); + private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE); + private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); private final AtomicLong _totalMessagesReceived = new AtomicLong(); @@ -924,6 +929,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> incrementQueueCount(); incrementQueueSize(message); + if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory()) + { + message.getStoredMessage().flowToDisk(); + } + _totalMessagesReceived.incrementAndGet(); if(_recovering.get()) @@ -1206,6 +1216,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } + @Override + public void setTargetSize(final long targetSize) + { + _targetQueueSize.set(targetSize); + } + public long getTotalDequeuedMessages() { return _dequeueCount.get(); @@ -2188,6 +2204,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { QueueEntryIterator queueListIterator = getEntries().iterator(); + long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages(); + long targetSize = _targetQueueSize.get(); + while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); @@ -2210,8 +2229,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> // the time the check actually occurs. So verify we // can actually get the message to perform the check. ServerMessage msg = node.getMessage(); + if (msg != null) { + totalSize += msg.getSize(); + StoredMessage storedMessage = msg.getStoredMessage(); + if(totalSize > targetSize && storedMessage.isInMemory()) + { + storedMessage.flowToDisk(); + } checkForNotification(msg); } } @@ -2220,6 +2246,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } + @Override + public long getPotentialMemoryFootprint() + { + return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT), + getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages()); + } + public long getAlertRepeatGap() { return _alertRepeatGap; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 1a1085339d..bb7a726a0c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -1576,6 +1576,31 @@ public abstract class AbstractJDBCMessageStore implements MessageStore storedSizeChange(-delta); } + @Override + public boolean isInMemory() + { + return _messageDataRef.isHardRef(); + } + + @Override + public boolean flowToDisk() + { + try(Connection conn = newConnection()) + { + store(conn); + conn.commit(); + } + catch (SQLException e) + { + throw new StoreException(e); + } + finally + { + + } + return true; + } + private synchronized Runnable store(final Connection conn) throws SQLException { if (!stored()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index e1043e8807..e8402c9268 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -130,4 +130,16 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S public void remove() { } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 6beb74f4ae..7561b4a11c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -35,4 +35,8 @@ public interface StoredMessage<M extends StorableMessageMetaData> ByteBuffer getContent(int offsetInMessage, int size); void remove(); + + boolean isInMemory(); + + boolean flowToDisk(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 707be9ed7b..f15f608907 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -130,6 +131,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final AtomicBoolean _deleted = new AtomicBoolean(); private final VirtualHostNode<?> _virtualHostNode; + private final AtomicLong _targetSize = new AtomicLong(1024*1024); + private MessageStoreLogSubject _messageStoreLogSubject; @ManagedAttributeField @@ -847,6 +850,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public void execute() { + VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class); + Broker<?> broker = virtualHostNode.getParent(Broker.class); + broker.assignTargetSizes(); + for (AMQQueue<?> q : getQueues()) { if (q.getState() == State.ACTIVE) @@ -1311,6 +1318,46 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override + public void setTargetSize(final long targetSize) + { + _targetSize.set(targetSize); + allocateTargetSizeToQueues(); + } + + private void allocateTargetSizeToQueues() + { + long targetSize = _targetSize.get(); + Collection<AMQQueue<?>> queues = getQueues(); + long totalSize = calculateTotalEnqueuedSize(queues); + if(targetSize > 0l) + { + for (AMQQueue<?> q : queues) + { + long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize)) + * (double) targetSize); + + q.setTargetSize(size); + } + } + } + + @Override + public long getTotalQueueDepthBytes() + { + return calculateTotalEnqueuedSize(getQueues()); + } + + private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> queues) + { + long total = 0; + for(AMQQueue<?> queue : queues) + { + total += queue.getPotentialMemoryFootprint(); + } + return total; + } + + @Override protected void onCreate() { super.onCreate(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index 750efc23ae..899cfdcd6e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -147,6 +149,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer entry.getValue().release(); entry.setValue(null); // free up any memory associated with the reference object } + final List<StoredMessage<?>> messagesToDelete = new ArrayList<>(); getStore().visitMessages(new MessageHandler() { @Override @@ -156,12 +159,19 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer long messageNumber = storedMessage.getMessageNumber(); if(!_recoveredMessages.containsKey(messageNumber)) { - _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing...."); - storedMessage.remove(); + messagesToDelete.add(storedMessage); } return messageNumber <_maxMessageId-1; } }); + for(StoredMessage<?> storedMessage : messagesToDelete) + { + + _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } + + messagesToDelete.clear(); _recoveredMessages.clear(); } |
