diff options
Diffstat (limited to 'qpid/java')
23 files changed, 348 insertions, 24 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 7e5f5bbb3f..338882e6df 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -1424,12 +1424,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore storedSizeChangeOccurred(-delta); } + @Override + public boolean isInMemory() + { + return _messageDataRef.isHardRef(); + } + private boolean stored() { return !_messageDataRef.isHardRef(); } @Override + public boolean flowToDisk() + { + flushToStore(); + return true; + } + + @Override public String toString() { return this.getClass() + "[messageId=" + _messageId + "]"; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java index 74242df7c6..55805b5626 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java @@ -338,6 +338,18 @@ public class BDBHAReplicaVirtualHost extends AbstractConfiguredObject<BDBHARepli } @Override + public void setTargetSize(final long targetSize) + { + + } + + @Override + public long getTotalQueueDepthBytes() + { + return 0l; + } + + @Override public org.apache.qpid.server.security.SecurityManager getSecurityManager() { return null; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 4f2327adee..c3fd0ba428 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -20,14 +20,6 @@ */ package org.apache.qpid.server.message.internal; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.util.ByteBufferInputStream; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; @@ -39,6 +31,13 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.AbstractServerMessageImpl; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.util.ByteBufferInputStream; + public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData> { private final Object _messageBody; @@ -239,6 +238,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, { throw new UnsupportedOperationException(); } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } }; } catch (IOException e) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 8c8c5d4b9d..7d49d0b85f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -56,6 +56,7 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay"; String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute"; + String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold"; String QPID_AMQP_PORT = "qpid.amqp_port"; String QPID_HTTP_PORT = "qpid.http_port"; @@ -74,6 +75,9 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL @ManagedContextDefault(name = QPID_JMX_PORT) public static final String DEFAULT_JMX_PORT_NUMBER = "9099"; + @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD) + public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory()); + @DerivedAttribute String getBuildVersion(); @@ -190,4 +194,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL AuthenticationProvider<?> getManagementModeAuthenticationProvider(); + void assignTargetSizes(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index e98f6cd19b..4e216925e4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -49,6 +49,14 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; + String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; + @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) + long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400l; + + String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead"; + @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) + long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l; + @ManagedAttribute Exchange getAlternateExchange(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 518141a21d..6714a53e9e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -172,4 +172,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, MessageStore getMessageStore(); String getType(); + + void setTargetSize(long targetSize); + + long getTotalQueueDepthBytes(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 2d8a64b920..67c713e9d9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -384,6 +384,39 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return children; } + @Override + public synchronized void assignTargetSizes() + { + long totalTarget = getContextValue(Long.class,BROKER_FLOW_TO_DISK_THRESHOLD); + long totalSize = 0l; + Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes(); + Map<VirtualHost<?,?,?>,Long> vhs = new HashMap<>(); + for(VirtualHostNode<?> vhn : vhns) + { + VirtualHost<?, ?, ?> vh = vhn.getVirtualHost(); + if(vh != null) + { + long totalQueueDepthBytes = vh.getTotalQueueDepthBytes(); + vhs.put(vh,totalQueueDepthBytes); + totalSize += totalQueueDepthBytes; + } + } + + for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet()) + { + + long size = (long) (entry.getValue().doubleValue() * ((double) totalTarget / (double) totalSize)); + entry.getKey().setTargetSize(size); + } + } + + @Override + protected void onOpen() + { + super.onOpen(); + assignTargetSizes(); + } + public AuthenticationProvider<?> findAuthenticationProviderByName(String authenticationProviderName) { if (isManagementMode()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 03bba43d57..9cfa7dbcf3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -111,4 +111,8 @@ public interface AMQQueue<X extends AMQQueue<X>> void completeRecovery(); void recover(ServerMessage<?> message); + + void setTargetSize(long targetSize); + + long getPotentialMemoryFootprint(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 11d5cc733f..c889fa7740 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -79,6 +79,7 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -113,6 +114,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } }; + private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l; + private final VirtualHostImpl _virtualHost; private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); @@ -130,6 +133,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final AtomicLong _atomicQueueSize = new AtomicLong(0L); + private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE); + private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); private final AtomicLong _totalMessagesReceived = new AtomicLong(); @@ -924,6 +929,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> incrementQueueCount(); incrementQueueSize(message); + if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory()) + { + message.getStoredMessage().flowToDisk(); + } + _totalMessagesReceived.incrementAndGet(); if(_recovering.get()) @@ -1206,6 +1216,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } + @Override + public void setTargetSize(final long targetSize) + { + _targetQueueSize.set(targetSize); + } + public long getTotalDequeuedMessages() { return _dequeueCount.get(); @@ -2188,6 +2204,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { QueueEntryIterator queueListIterator = getEntries().iterator(); + long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages(); + long targetSize = _targetQueueSize.get(); + while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); @@ -2210,8 +2229,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> // the time the check actually occurs. So verify we // can actually get the message to perform the check. ServerMessage msg = node.getMessage(); + if (msg != null) { + totalSize += msg.getSize(); + StoredMessage storedMessage = msg.getStoredMessage(); + if(totalSize > targetSize && storedMessage.isInMemory()) + { + storedMessage.flowToDisk(); + } checkForNotification(msg); } } @@ -2220,6 +2246,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } + @Override + public long getPotentialMemoryFootprint() + { + return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT), + getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages()); + } + public long getAlertRepeatGap() { return _alertRepeatGap; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 1a1085339d..bb7a726a0c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -1576,6 +1576,31 @@ public abstract class AbstractJDBCMessageStore implements MessageStore storedSizeChange(-delta); } + @Override + public boolean isInMemory() + { + return _messageDataRef.isHardRef(); + } + + @Override + public boolean flowToDisk() + { + try(Connection conn = newConnection()) + { + store(conn); + conn.commit(); + } + catch (SQLException e) + { + throw new StoreException(e); + } + finally + { + + } + return true; + } + private synchronized Runnable store(final Connection conn) throws SQLException { if (!stored()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index e1043e8807..e8402c9268 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -130,4 +130,16 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S public void remove() { } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 6beb74f4ae..7561b4a11c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -35,4 +35,8 @@ public interface StoredMessage<M extends StorableMessageMetaData> ByteBuffer getContent(int offsetInMessage, int size); void remove(); + + boolean isInMemory(); + + boolean flowToDisk(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 707be9ed7b..f15f608907 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -130,6 +131,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final AtomicBoolean _deleted = new AtomicBoolean(); private final VirtualHostNode<?> _virtualHostNode; + private final AtomicLong _targetSize = new AtomicLong(1024*1024); + private MessageStoreLogSubject _messageStoreLogSubject; @ManagedAttributeField @@ -847,6 +850,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public void execute() { + VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class); + Broker<?> broker = virtualHostNode.getParent(Broker.class); + broker.assignTargetSizes(); + for (AMQQueue<?> q : getQueues()) { if (q.getState() == State.ACTIVE) @@ -1311,6 +1318,46 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override + public void setTargetSize(final long targetSize) + { + _targetSize.set(targetSize); + allocateTargetSizeToQueues(); + } + + private void allocateTargetSizeToQueues() + { + long targetSize = _targetSize.get(); + Collection<AMQQueue<?>> queues = getQueues(); + long totalSize = calculateTotalEnqueuedSize(queues); + if(targetSize > 0l) + { + for (AMQQueue<?> q : queues) + { + long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize)) + * (double) targetSize); + + q.setTargetSize(size); + } + } + } + + @Override + public long getTotalQueueDepthBytes() + { + return calculateTotalEnqueuedSize(getQueues()); + } + + private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> queues) + { + long total = 0; + for(AMQQueue<?> queue : queues) + { + total += queue.getPotentialMemoryFootprint(); + } + return total; + } + + @Override protected void onCreate() { super.onCreate(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index 750efc23ae..899cfdcd6e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -147,6 +149,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer entry.getValue().release(); entry.setValue(null); // free up any memory associated with the reference object } + final List<StoredMessage<?>> messagesToDelete = new ArrayList<>(); getStore().visitMessages(new MessageHandler() { @Override @@ -156,12 +159,19 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer long messageNumber = storedMessage.getMessageNumber(); if(!_recoveredMessages.containsKey(messageNumber)) { - _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing...."); - storedMessage.remove(); + messagesToDelete.add(storedMessage); } return messageNumber <_maxMessageId-1; } }); + for(StoredMessage<?> storedMessage : messagesToDelete) + { + + _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } + + messagesToDelete.clear(); _recoveredMessages.clear(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index dfdc4e230c..69abcd7727 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.DeliveryProperties; @@ -106,7 +105,19 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte { throw new UnsupportedOperationException(); } - }; + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } + }; } private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index ad99d14170..209eae9ad1 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -31,7 +31,6 @@ import java.util.Map; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -106,7 +105,19 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M { throw new UnsupportedOperationException(); } - }; + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } + }; } private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index b7d7e5b236..0f383c5ff0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -37,7 +37,6 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -118,6 +117,18 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter { throw new UnsupportedOperationException(); } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } }; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index c4de7a252b..9689976e6f 100755 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -20,15 +20,14 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.nio.ByteBuffer; + import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import java.nio.ByteBuffer; - public class MockStoredMessage implements StoredMessage<MessageMetaData> { private long _messageId; @@ -107,4 +106,16 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> public void remove() { } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index f6d849bf79..5b9bdc7244 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -37,7 +37,6 @@ import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -265,7 +264,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement { throw new UnsupportedOperationException(); } - }; + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } + }; } protected Section getBodySection(final M serverMessage, final String mimeType) diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index bc2d3fe375..8d77a8cfaf 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -29,7 +29,6 @@ import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0; import org.apache.qpid.server.protocol.v1_0.Message_1_0; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.DeliveryProperties; @@ -115,6 +114,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 { throw new UnsupportedOperationException(); } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } }; } diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index 5412a09b4c..d3c1dae29b 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.converter.v0_8_v0_10; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; + import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -36,7 +37,6 @@ import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.DeliveryProperties; @@ -214,6 +214,18 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra { throw new UnsupportedOperationException(); } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } }; } diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index 46b7c322e6..fbc809305e 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -24,6 +24,7 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -32,7 +33,6 @@ import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.DeliveryProperties; @@ -103,6 +103,18 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag { throw new UnsupportedOperationException(); } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } }; } diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index 639c16a71a..5b1c25e879 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -36,7 +36,6 @@ import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0; import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; import org.apache.qpid.server.protocol.v1_0.Message_1_0; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -119,6 +118,18 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ { throw new UnsupportedOperationException(); } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } }; } |
