summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java13
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java25
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java12
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java47
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java15
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java15
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java13
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java17
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java15
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java13
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java14
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java14
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java13
23 files changed, 348 insertions, 24 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 7e5f5bbb3f..338882e6df 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1424,12 +1424,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore
storedSizeChangeOccurred(-delta);
}
+ @Override
+ public boolean isInMemory()
+ {
+ return _messageDataRef.isHardRef();
+ }
+
private boolean stored()
{
return !_messageDataRef.isHardRef();
}
@Override
+ public boolean flowToDisk()
+ {
+ flushToStore();
+ return true;
+ }
+
+ @Override
public String toString()
{
return this.getClass() + "[messageId=" + _messageId + "]";
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
index 74242df7c6..55805b5626 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
@@ -338,6 +338,18 @@ public class BDBHAReplicaVirtualHost extends AbstractConfiguredObject<BDBHARepli
}
@Override
+ public void setTargetSize(final long targetSize)
+ {
+
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return 0l;
+ }
+
+ @Override
public org.apache.qpid.server.security.SecurityManager getSecurityManager()
{
return null;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 4f2327adee..c3fd0ba428 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -20,14 +20,6 @@
*/
package org.apache.qpid.server.message.internal;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.util.ByteBufferInputStream;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -39,6 +31,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.util.ByteBufferInputStream;
+
public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
{
private final Object _messageBody;
@@ -239,6 +238,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
catch (IOException e)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 8c8c5d4b9d..7d49d0b85f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -56,6 +56,7 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
+ String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold";
String QPID_AMQP_PORT = "qpid.amqp_port";
String QPID_HTTP_PORT = "qpid.http_port";
@@ -74,6 +75,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = QPID_JMX_PORT)
public static final String DEFAULT_JMX_PORT_NUMBER = "9099";
+ @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
+ public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+
@DerivedAttribute
String getBuildVersion();
@@ -190,4 +194,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
AuthenticationProvider<?> getManagementModeAuthenticationProvider();
+ void assignTargetSizes();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index e98f6cd19b..4e216925e4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -49,6 +49,14 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
+ String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
+ @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
+ long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400l;
+
+ String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead";
+ @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
+ long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
+
@ManagedAttribute
Exchange getAlternateExchange();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 518141a21d..6714a53e9e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -172,4 +172,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
MessageStore getMessageStore();
String getType();
+
+ void setTargetSize(long targetSize);
+
+ long getTotalQueueDepthBytes();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 2d8a64b920..67c713e9d9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -384,6 +384,39 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return children;
}
+ @Override
+ public synchronized void assignTargetSizes()
+ {
+ long totalTarget = getContextValue(Long.class,BROKER_FLOW_TO_DISK_THRESHOLD);
+ long totalSize = 0l;
+ Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+ Map<VirtualHost<?,?,?>,Long> vhs = new HashMap<>();
+ for(VirtualHostNode<?> vhn : vhns)
+ {
+ VirtualHost<?, ?, ?> vh = vhn.getVirtualHost();
+ if(vh != null)
+ {
+ long totalQueueDepthBytes = vh.getTotalQueueDepthBytes();
+ vhs.put(vh,totalQueueDepthBytes);
+ totalSize += totalQueueDepthBytes;
+ }
+ }
+
+ for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet())
+ {
+
+ long size = (long) (entry.getValue().doubleValue() * ((double) totalTarget / (double) totalSize));
+ entry.getKey().setTargetSize(size);
+ }
+ }
+
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ assignTargetSizes();
+ }
+
public AuthenticationProvider<?> findAuthenticationProviderByName(String authenticationProviderName)
{
if (isManagementMode())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 03bba43d57..9cfa7dbcf3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -111,4 +111,8 @@ public interface AMQQueue<X extends AMQQueue<X>>
void completeRecovery();
void recover(ServerMessage<?> message);
+
+ void setTargetSize(long targetSize);
+
+ long getPotentialMemoryFootprint();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 11d5cc733f..c889fa7740 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -79,6 +79,7 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -113,6 +114,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
};
+ private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l;
+
private final VirtualHostImpl _virtualHost;
private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
@@ -130,6 +133,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
+ private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
+
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
private final AtomicLong _totalMessagesReceived = new AtomicLong();
@@ -924,6 +929,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
incrementQueueCount();
incrementQueueSize(message);
+ if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory())
+ {
+ message.getStoredMessage().flowToDisk();
+ }
+
_totalMessagesReceived.incrementAndGet();
if(_recovering.get())
@@ -1206,6 +1216,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
+ @Override
+ public void setTargetSize(final long targetSize)
+ {
+ _targetQueueSize.set(targetSize);
+ }
+
public long getTotalDequeuedMessages()
{
return _dequeueCount.get();
@@ -2188,6 +2204,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
QueueEntryIterator queueListIterator = getEntries().iterator();
+ long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
+ long targetSize = _targetQueueSize.get();
+
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
@@ -2210,8 +2229,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
// the time the check actually occurs. So verify we
// can actually get the message to perform the check.
ServerMessage msg = node.getMessage();
+
if (msg != null)
{
+ totalSize += msg.getSize();
+ StoredMessage storedMessage = msg.getStoredMessage();
+ if(totalSize > targetSize && storedMessage.isInMemory())
+ {
+ storedMessage.flowToDisk();
+ }
checkForNotification(msg);
}
}
@@ -2220,6 +2246,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ @Override
+ public long getPotentialMemoryFootprint()
+ {
+ return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT),
+ getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages());
+ }
+
public long getAlertRepeatGap()
{
return _alertRepeatGap;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 1a1085339d..bb7a726a0c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -1576,6 +1576,31 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
storedSizeChange(-delta);
}
+ @Override
+ public boolean isInMemory()
+ {
+ return _messageDataRef.isHardRef();
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ try(Connection conn = newConnection())
+ {
+ store(conn);
+ conn.commit();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+ finally
+ {
+
+ }
+ return true;
+ }
+
private synchronized Runnable store(final Connection conn) throws SQLException
{
if (!stored())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index e1043e8807..e8402c9268 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -130,4 +130,16 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
public void remove()
{
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 6beb74f4ae..7561b4a11c 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -35,4 +35,8 @@ public interface StoredMessage<M extends StorableMessageMetaData>
ByteBuffer getContent(int offsetInMessage, int size);
void remove();
+
+ boolean isInMemory();
+
+ boolean flowToDisk();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 707be9ed7b..f15f608907 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -130,6 +131,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final AtomicBoolean _deleted = new AtomicBoolean();
private final VirtualHostNode<?> _virtualHostNode;
+ private final AtomicLong _targetSize = new AtomicLong(1024*1024);
+
private MessageStoreLogSubject _messageStoreLogSubject;
@ManagedAttributeField
@@ -847,6 +850,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public void execute()
{
+ VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
+ Broker<?> broker = virtualHostNode.getParent(Broker.class);
+ broker.assignTargetSizes();
+
for (AMQQueue<?> q : getQueues())
{
if (q.getState() == State.ACTIVE)
@@ -1311,6 +1318,46 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public void setTargetSize(final long targetSize)
+ {
+ _targetSize.set(targetSize);
+ allocateTargetSizeToQueues();
+ }
+
+ private void allocateTargetSizeToQueues()
+ {
+ long targetSize = _targetSize.get();
+ Collection<AMQQueue<?>> queues = getQueues();
+ long totalSize = calculateTotalEnqueuedSize(queues);
+ if(targetSize > 0l)
+ {
+ for (AMQQueue<?> q : queues)
+ {
+ long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize))
+ * (double) targetSize);
+
+ q.setTargetSize(size);
+ }
+ }
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return calculateTotalEnqueuedSize(getQueues());
+ }
+
+ private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> queues)
+ {
+ long total = 0;
+ for(AMQQueue<?> queue : queues)
+ {
+ total += queue.getPotentialMemoryFootprint();
+ }
+ return total;
+ }
+
+ @Override
protected void onCreate()
{
super.onCreate();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 750efc23ae..899cfdcd6e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -147,6 +149,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
entry.getValue().release();
entry.setValue(null); // free up any memory associated with the reference object
}
+ final List<StoredMessage<?>> messagesToDelete = new ArrayList<>();
getStore().visitMessages(new MessageHandler()
{
@Override
@@ -156,12 +159,19 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
long messageNumber = storedMessage.getMessageNumber();
if(!_recoveredMessages.containsKey(messageNumber))
{
- _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing....");
- storedMessage.remove();
+ messagesToDelete.add(storedMessage);
}
return messageNumber <_maxMessageId-1;
}
});
+ for(StoredMessage<?> storedMessage : messagesToDelete)
+ {
+
+ _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
+ storedMessage.remove();
+ }
+
+ messagesToDelete.clear();
_recoveredMessages.clear();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index dfdc4e230c..69abcd7727 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -106,7 +105,19 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
{
throw new UnsupportedOperationException();
}
- };
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
+ };
}
private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index ad99d14170..209eae9ad1 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -31,7 +31,6 @@ import java.util.Map;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -106,7 +105,19 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
{
throw new UnsupportedOperationException();
}
- };
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
+ };
}
private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index b7d7e5b236..0f383c5ff0 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -37,7 +37,6 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -118,6 +117,18 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
index c4de7a252b..9689976e6f 100755
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
@@ -20,15 +20,14 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import java.nio.ByteBuffer;
-
public class MockStoredMessage implements StoredMessage<MessageMetaData>
{
private long _messageId;
@@ -107,4 +106,16 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
public void remove()
{
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index f6d849bf79..5b9bdc7244 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -37,7 +37,6 @@ import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -265,7 +264,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
{
throw new UnsupportedOperationException();
}
- };
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
+ };
}
protected Section getBodySection(final M serverMessage, final String mimeType)
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index bc2d3fe375..8d77a8cfaf 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -29,7 +29,6 @@ import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -115,6 +114,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 5412a09b4c..d3c1dae29b 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -36,7 +37,6 @@ import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -214,6 +214,18 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 46b7c322e6..fbc809305e 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -32,7 +33,6 @@ import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -103,6 +103,18 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 639c16a71a..5b1c25e879 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -36,7 +36,6 @@ import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -119,6 +118,18 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}