diff options
Diffstat (limited to 'java')
20 files changed, 643 insertions, 292 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java index 7acb683e3b..4e3b2298d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java @@ -22,19 +22,12 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.util.FileUtils; -import org.apache.qpid.AMQException; -import org.apache.commons.configuration.ConfigurationException; import java.io.File; import java.io.FileInputStream; @@ -43,219 +36,142 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; public class FileQueueBackingStore implements QueueBackingStore { private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class); - private AtomicBoolean _closed = new AtomicBoolean(false); private String _flowToDiskLocation; - private static final String QUEUE_BACKING_DIR = "queueBacking"; - public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException + public FileQueueBackingStore(String location) { - setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation()); + _flowToDiskLocation = location; } - private void setFlowToDisk(String vHostName, String location) throws ConfigurationException + public AMQMessage load(Long messageId) { - if (vHostName == null) - { - throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified"); - } + _log.info("Loading Message (ID:" + messageId + ")"); - if (location == null) - { - throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified."); - } + MessageMetaData mmd; - _flowToDiskLocation = location; + File handle = getFileHandle(messageId); + handle.deleteOnExit(); - _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName; + ObjectInputStream input = null; - File root = new File(location); - if (!root.exists()) - { - throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath()); - } - else + Exception error = null; + try { + input = new ObjectInputStream(new FileInputStream(handle)); - if (root.isFile()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:"+ - root.getAbsolutePath()); - } + long arrivaltime = input.readLong(); + + final AMQShortString exchange = new AMQShortString(input.readUTF()); + final AMQShortString routingKey = new AMQShortString(input.readUTF()); + final boolean mandatory = input.readBoolean(); + final boolean immediate = input.readBoolean(); + + int bodySize = input.readInt(); + byte[] underlying = new byte[bodySize]; + + input.readFully(underlying, 0, bodySize); + + ByteBuffer buf = ByteBuffer.wrap(underlying); - if(!root.canWrite()) + ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize); + + int chunkCount = input.readInt(); + + // There are WAY to many annonymous MPIs in the code this should be made concrete. + MessagePublishInfo info = new MessagePublishInfo() { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:"+ - root.getAbsolutePath()); - } - } + public AMQShortString getExchange() + { + return exchange; + } + public void setExchange(AMQShortString exchange) + { - File store = new File(_flowToDiskLocation); - if (store.exists()) - { - if (!FileUtils.delete(store, true)) + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return mandatory; + } + + public AMQShortString getRoutingKey() + { + return routingKey; + } + }; + + mmd = new MessageMetaData(info, chb, chunkCount); + mmd.setArrivalTime(arrivaltime); + + AMQMessage message; + if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2) + { + message = new PersistentAMQMessage(messageId, null); + } + else { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store as directory already exsits:" - + store.getAbsolutePath()); + message = new TransientAMQMessage(messageId); } - if (store.isFile()) + message.recoverFromMessageMetaData(mmd); + + for (int chunk = 0; chunk < chunkCount; chunk++) { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:"+ - store.getAbsolutePath()); + int length = input.readInt(); + + byte[] data = new byte[length]; + + input.readFully(data, 0, length); + + try + { + message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount)); + } + catch (AMQException e) + { + //ignore as this will not occur. + // It is thrown by the _transactionLog method in load on PersistentAMQMessage + // but we have created the message with a null log and will never call that method. + } } + return message; } - else + catch (Exception e) { - if (!store.getParentFile().getParentFile().canWrite()) - { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to parent location:"+ - store.getParentFile().getParentFile().getAbsolutePath()); - } + error = e; } - - - _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath()); - store.deleteOnExit(); - if (!store.mkdirs()) + finally { - throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath()); + try + { + input.close(); + // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it + // on disk until it has been deleted from the queue at that point we can be sure we won't need the data + //handle.delete(); + } + catch (IOException e) + { + _log.info("Unable to close input on message(" + messageId + ") recovery due to:" + e.getMessage()); + } } - } - - - public AMQMessage recover(Long messageId) - { - MessageMetaData mmd; - List<ContentChunk> contentBodies = new LinkedList<ContentChunk>(); - - File handle = getFileHandle(messageId); - handle.deleteOnExit(); - - ObjectInputStream input = null; - - Exception error = null; - try - { - input = new ObjectInputStream(new FileInputStream(handle)); - - long arrivaltime = input.readLong(); - - final AMQShortString exchange = new AMQShortString(input.readUTF()); - final AMQShortString routingKey = new AMQShortString(input.readUTF()); - final boolean mandatory = input.readBoolean(); - final boolean immediate = input.readBoolean(); - - int bodySize = input.readInt(); - byte[] underlying = new byte[bodySize]; - - input.readFully(underlying, 0, bodySize); - - ByteBuffer buf = ByteBuffer.wrap(underlying); - - ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize); - - int chunkCount = input.readInt(); - - // There are WAY to many annonymous MPIs in the code this should be made concrete. - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return exchange; - } - - public void setExchange(AMQShortString exchange) - { - - } - - public boolean isImmediate() - { - return immediate; - } - - public boolean isMandatory() - { - return mandatory; - } - - public AMQShortString getRoutingKey() - { - return routingKey; - } - }; - - mmd = new MessageMetaData(info, chb, chunkCount); - mmd.setArrivalTime(arrivaltime); - - AMQMessage message; - if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2) - { - message = new PersistentAMQMessage(messageId, null); - } - else - { - message = new TransientAMQMessage(messageId); - } - - message.recoverFromMessageMetaData(mmd); - - for (int chunk = 0; chunk < chunkCount; chunk++) - { - int length = input.readInt(); - - byte[] data = new byte[length]; - - input.readFully(data, 0, length); - - // There are WAY to many annonymous CCs in the code this should be made concrete. - try - { - message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount)); - } - catch (AMQException e) - { - //ignore as this will not occur. - // It is thrown by the _transactionLog method in recover on PersistentAMQMessage - // but we have created the message with a null log and will never call that method. - } - } - - return message; - } - catch (Exception e) - { - error = e; - } - finally - { - try - { - input.close(); - } - catch (IOException e) - { - _log.info("Unable to close input on message("+messageId+") recovery due to:"+e.getMessage()); - } - } throw new UnableToRecoverMessageException(error); } - - public void flow(AMQMessage message) throws UnableToFlowMessageException + public void unload(AMQMessage message) throws UnableToFlowMessageException { long messageId = message.getMessageId(); @@ -264,10 +180,18 @@ public class FileQueueBackingStore implements QueueBackingStore //If we have written the data once then we don't need to do it again. if (handle.exists()) { - _log.debug("Message(" + messageId + ") already flowed to disk."); + if (_log.isDebugEnabled()) + { + _log.debug("Message(ID:" + messageId + ") already unloaded."); + } return; } + if (_log.isInfoEnabled()) + { + _log.info("Unloading Message (ID:" + messageId + ")"); + } + handle.deleteOnExit(); ObjectOutputStream writer = null; @@ -334,7 +258,7 @@ public class FileQueueBackingStore implements QueueBackingStore if (error != null) { - _log.error("Unable to flow message(" + messageId + ") to disk, restoring state."); + _log.error("Unable to unload message(" + messageId + ") to disk, restoring state."); handle.delete(); throw new UnableToFlowMessageException(messageId, error); } @@ -358,7 +282,7 @@ public class FileQueueBackingStore implements QueueBackingStore // grab the 8 LSB to give us 256 bins long bin = messageId & 0xFFL; - String bin_path =_flowToDiskLocation + File.separator + bin; + String bin_path = _flowToDiskLocation + File.separator + bin; File bin_dir = new File(bin_path); if (!bin_dir.exists()) @@ -379,7 +303,10 @@ public class FileQueueBackingStore implements QueueBackingStore if (handle.exists()) { - _log.debug("Message(" + messageId + ") delete flowToDisk."); + if (_log.isInfoEnabled()) + { + _log.info("Message(" + messageId + ") delete flowToDisk."); + } if (!handle.delete()) { throw new RuntimeException("Unable to delete flowToDisk data"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java new file mode 100644 index 0000000000..0cfa9d6b32 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java @@ -0,0 +1,166 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.util.FileUtils; + +import java.io.File; + +public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory +{ + private static final Logger _log = Logger.getLogger(FileQueueBackingStoreFactory.class); + + private String _flowToDiskLocation; + private static final String QUEUE_BACKING_DIR = "queueBacking"; + + public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException + { + setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation()); + } + + private void setFlowToDisk(String vHostName, String location) throws ConfigurationException + { + if (vHostName == null) + { + throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified"); + } + + if (location == null) + { + throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified."); + } + + _flowToDiskLocation = location; + + _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName; + + File root = new File(location); + if (!root.exists()) + { + throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath()); + } + else + { + + if (root.isFile()) + { + throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:" + + root.getAbsolutePath()); + } + + if (!root.canWrite()) + { + throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:" + + root.getAbsolutePath()); + } + + } + + // if we don't mark QUEUE_BAKCING_DIR as a deleteOnExit it will remain. + File backingDir = new File(location + File.separator + QUEUE_BACKING_DIR); + if (backingDir.exists()) + { + if (!FileUtils.delete(backingDir, true)) + { + throw new ConfigurationException("Unable to delete existing Flow to Disk root at:" + + backingDir.getAbsolutePath()); + } + + if (backingDir.isFile()) + { + throw new ConfigurationException("Unable to create Temporary Flow to Disk root as specified location is a file:" + + backingDir.getAbsolutePath()); + } + } + + backingDir.deleteOnExit(); + if (!backingDir.mkdirs()) + { + throw new ConfigurationException("Unable to create Temporary Flow to Disk root:" + location + File.separator + QUEUE_BACKING_DIR); + } + + + File store = new File(_flowToDiskLocation); + if (store.exists()) + { + if (!FileUtils.delete(store, true)) + { + throw new ConfigurationException("Unable to delete existing Flow to Disk store at:" + + store.getAbsolutePath()); + } + + if (store.isFile()) + { + throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:" + + store.getAbsolutePath()); + } + + } + + _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath()); + store.deleteOnExit(); + + if(!store.mkdir()) + { + throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath()); + } + } + + public QueueBackingStore createBacking(AMQQueue queue) + { + return new FileQueueBackingStore(createStore(queue.getName().toString())); + } + + private String createStore(String name) + { + return createStore(name, 0); + } + + private String createStore(String name, int index) + { + + String store = _flowToDiskLocation + File.separator + name; + if (index > 0) + { + store += "-" + index; + } + + //TODO ensure store is safe for the OS + + File storeFile = new File(store); + + if (storeFile.exists()) + { + return createStore(name, index + 1); + } + + storeFile.mkdirs(); + + storeFile.deleteOnExit(); + + return store; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index 72ea5f2667..a4f80a44b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -20,16 +20,18 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.log4j.Logger; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; -/** - * This is an abstract base class to handle - */ +/** This is an abstract base class to handle */ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList { private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); @@ -43,9 +45,12 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ private long _memoryUsageMinimum = 0; - private AtomicBoolean _flowed; + private volatile AtomicBoolean _flowed; private QueueBackingStore _backingStore; protected AMQQueue _queue; + private Executor _inhaler; + private AtomicBoolean _stopped; + private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null); FlowableBaseQueueEntryList(AMQQueue queue) { @@ -54,15 +59,18 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi VirtualHost vhost = queue.getVirtualHost(); if (vhost != null) { - _backingStore = vhost.getQueueBackingStore(); + _backingStore = vhost.getQueueBackingStoreFactory().createBacking(queue); } + + _stopped = new AtomicBoolean(false); + _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); } public void setFlowed(boolean flowed) { if (_flowed.get() != flowed) { - _log.info("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); + _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); _flowed.set(flowed); } } @@ -94,14 +102,15 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi // Don't attempt to start the inhaler/purger unless we have a minimum value specified. if (_memoryUsageMaximum > 0) { - // If we've increased the max memory above what we have in memory then we can inhale more - if (_memoryUsageMaximum > _atomicQueueInMemory.get()) + if (_memoryUsageMinimum == 0) { - //TODO start inhaler + setMemoryUsageMinimum(_memoryUsageMaximum / 2); } - else // if we have now have to much memory in use we need to purge. + + // if we have now have to much memory in use we need to purge. + if (_memoryUsageMaximum < _atomicQueueInMemory.get()) { - //TODO start purger + startPurger(); } } } @@ -118,19 +127,78 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi // Don't attempt to start the inhaler unless we have a minimum value specified. if (_memoryUsageMinimum > 0) { - // If we've increased the minimum memory above what we have in memory then we need to inhale more - if (_memoryUsageMinimum >= _atomicQueueInMemory.get()) + checkAndStartLoader(); + } + } + + private void checkAndStartLoader() + { + // If we've increased the minimum memory above what we have in memory then we need to inhale more + if (_atomicQueueInMemory.get() <= _memoryUsageMinimum) + { + startInhaler(); + } + } + + private void startInhaler() + { + if (_flowed.get()) + { + MessageInhaler inhaler = new MessageInhaler(); + + if (_asynchronousInhaler.compareAndSet(null, inhaler)) { - //TODO start inhaler + _inhaler.execute(inhaler); } } } + private void startPurger() + { + //TODO create purger, used when maxMemory is reduced creating over memory situation. + _log.warn("Requested Purger Start.. purger TBC."); + //_purger.execute(new MessagePurger(this)); + } + public long getMemoryUsageMinimum() { return _memoryUsageMinimum; } + public void unloadEntry(QueueEntry queueEntry) + { + try + { + queueEntry.unload(); + _atomicQueueInMemory.addAndGet(-queueEntry.getSize()); + checkAndStartLoader(); + } + catch (UnableToFlowMessageException e) + { + _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + } + } + + public void loadEntry(QueueEntry queueEntry) + { + queueEntry.load(); + if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + { + _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum); + } + } + + public void stop() + { + if (!_stopped.getAndSet(true)) + { + // The SimpleAMQQueue keeps running when stopped so we should just release the services + // rather than actively shutdown our threads. + //Shutdown thread for inhaler. + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + } + } + protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry) { return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum; @@ -153,13 +221,14 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi /** * Called when we are now flowing to disk + * * @param queueEntry the entry that is being flowed to disk */ protected void flowingToDisk(QueueEntryImpl queueEntry) { try { - queueEntry.flow(); + queueEntry.unload(); } catch (UnableToFlowMessageException e) { @@ -182,4 +251,71 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi return _backingStore; } + private class MessageInhaler implements Runnable + { + public void run() + { + String threadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Inhaler-" + _queue.getVirtualHost().getName() + "-" + _queue.getName()); + try + { + inhaleList(this); + } + finally + { + Thread.currentThread().setName(threadName); + } + } + } + + private void inhaleList(MessageInhaler messageInhaler) + { + _log.info("Inhaler Running"); + // If in memory count is at or over max then we can't inhale + if (_atomicQueueInMemory.get() >= _memoryUsageMaximum) + { + _log.debug("Unable to start inhaling as we are already over quota:" + + _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum); + return; + } + + _asynchronousInhaler.compareAndSet(messageInhaler, null); + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler)) + { + QueueEntryIterator iterator = iterator(); + + while (!iterator.getNode().isAvailable() && iterator.advance()) + { + //Find first AVAILABLE node + } + + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail()) + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && entry.isFlowed()) + { + loadEntry(entry); + } + + iterator.advance(); + } + + if (iterator.atTail()) + { + setFlowed(false); + } + + _asynchronousInhaler.set(null); + } + + //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. + if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) + { + _inhaler.execute(messageInhaler); + + } + + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java index 4e95978bf8..b547a41047 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.queue; -public interface FlowableQueueEntryList +import java.util.concurrent.atomic.AtomicLong; + +public interface FlowableQueueEntryList extends QueueEntryList { void setFlowed(boolean flowed); @@ -38,5 +40,19 @@ public interface FlowableQueueEntryList void setMemoryUsageMinimum(long minimumMemoryUsage); - long getMemoryUsageMinimum(); + long getMemoryUsageMinimum(); + + /** + * Immediately unload Entry + * @param queueEntry the entry to unload + */ + public void unloadEntry(QueueEntry queueEntry); + + /** + * Immediately load Entry + * @param queueEntry the entry to load + */ + public void loadEntry(QueueEntry queueEntry); + + void stop(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java index d812b8ceca..23307d8acf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java @@ -22,10 +22,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.CommonContentHeaderProperties; -public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList +public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList { private final AMQQueue _queue; - private final QueueEntryList[] _priorityLists; + private final FlowableQueueEntryList[] _priorityLists; private final int _priorities; private final int _priorityOffset; @@ -33,7 +33,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement { super(queue); _queue = queue; - _priorityLists = new QueueEntryList[priorities]; + _priorityLists = new FlowableQueueEntryList[priorities]; _priorities = priorities; _priorityOffset = 5-((priorities + 1)/2); for(int i = 0; i < priorities; i++) @@ -53,7 +53,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement } public QueueEntry add(AMQMessage message) - { + { int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; if(index >= _priorities) { @@ -152,7 +152,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement _priorities = priorities; } - public QueueEntryList createQueueEntryList(AMQQueue queue) + public FlowableQueueEntryList createQueueEntryList(AMQQueue queue) { return new PriorityQueueEntryList(queue, _priorities); } @@ -162,7 +162,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement public int size() { int size=0; - for (QueueEntryList queueEntryList : _priorityLists) + for (FlowableQueueEntryList queueEntryList : _priorityLists) { size += queueEntryList.size(); } @@ -174,9 +174,6 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement @Override protected void flowingToDisk(QueueEntryImpl queueEntry) { - //TODO this disables FTD for priority queues - // As the incomming message isn't always the one to purge. - // More logic is required up in the add() method here to determine if the - // incomming message is at the 'front' or not. + //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java index 376e6f1b57..1f575d1e05 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java @@ -26,11 +26,9 @@ import org.apache.commons.configuration.ConfigurationException; public interface QueueBackingStore { - void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException; + AMQMessage load(Long messageId); - AMQMessage recover(Long messageId); - - void flow(AMQMessage message) throws UnableToFlowMessageException; + void unload(AMQMessage message) throws UnableToFlowMessageException; void delete(Long messageId); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java new file mode 100644 index 0000000000..3dd23a2f40 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.commons.configuration.ConfigurationException; + +public interface QueueBackingStoreFactory +{ + void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException; + + public QueueBackingStore createBacking(AMQQueue queue); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 25d41c8203..7e41cf53a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -157,6 +157,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept boolean isAcquired(); + boolean isAvailable(); + boolean acquire(); boolean acquire(Subscription sub); @@ -219,9 +221,9 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept boolean removeStateChangeListener(StateChangeListener listener); - void flow() throws UnableToFlowMessageException; + void unload() throws UnableToFlowMessageException; - void recover(); + void load(); boolean isFlowed(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 3d464d01d3..8ee03d3d74 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -181,6 +181,11 @@ public class QueueEntryImpl implements QueueEntry return _state.getState() == State.ACQUIRED; } + public boolean isAvailable() + { + return _state.getState() == State.AVAILABLE; + } + public boolean acquire() { return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE); @@ -220,7 +225,15 @@ public class QueueEntryImpl implements QueueEntry public String debugIdentity() { - return getMessage().debugIdentity(); + String entry="[State:"+_state.getState().name()+"]"; + if (_message == null) + { + return entry+"(Message Unloaded ID:" + _messageId +")"; + } + else + { + return entry+_message.debugIdentity(); + } } @@ -380,25 +393,29 @@ public class QueueEntryImpl implements QueueEntry return false; } - public void flow() throws UnableToFlowMessageException + public void unload() throws UnableToFlowMessageException { if (_message != null && _backingStore != null) { if(_log.isDebugEnabled()) { - _log.debug("Flowing message:" + _message.debugIdentity()); + _log.debug("Unloading:" + debugIdentity()); } - _backingStore.flow(_message); + _backingStore.unload(_message); _message = null; - _flowed.getAndSet(true); + _flowed.getAndSet(true); } } - public void recover() + public void load() { if (_messageId != null && _backingStore != null) { - _message = _backingStore.recover(_messageId); + _message = _backingStore.load(_messageId); + if(_log.isDebugEnabled()) + { + _log.debug("Loading:" + debugIdentity()); + } _flowed.getAndSet(false); } } @@ -471,5 +488,4 @@ public class QueueEntryImpl implements QueueEntry return _queueEntryList; } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 72783e3f78..313e076f61 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -public interface QueueEntryList extends FlowableQueueEntryList +public interface QueueEntryList { AMQQueue getQueue(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java index 4dbce45f67..b4a868cf3c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java @@ -22,5 +22,5 @@ package org.apache.qpid.server.queue; interface QueueEntryListFactory { - public QueueEntryList createQueueEntryList(AMQQueue queue); + public FlowableQueueEntryList createQueueEntryList(AMQQueue queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 7f46a6063a..fa67162228 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -79,7 +79,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private volatile Subscription _exclusiveSubscriber; - protected final QueueEntryList _entries; + protected final FlowableQueueEntryList _entries; private final AMQQueueMBean _managedObject; private final Executor _asyncDelivery; @@ -465,8 +465,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { _deliveredMessages.incrementAndGet(); + + if (entry.isFlowed()) + { + _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity()); + _entries.loadEntry(entry); + } + sub.send(entry); + // We have delivered this message so we can unload it. + if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer()) + { + _entries.unloadEntry(entry); + } + } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) @@ -1101,6 +1114,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (!_stopped.getAndSet(true)) { ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + _entries.stop(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 10abdd8318..c72353db6e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; * under the License. * */ -public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList +public class SimpleQueueEntryList extends FlowableBaseQueueEntryList { private final QueueEntryImpl _head; @@ -172,7 +172,7 @@ public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements static class Factory implements QueueEntryListFactory { - public QueueEntryList createQueueEntryList(AMQQueue queue) + public FlowableQueueEntryList createQueueEntryList(AMQQueue queue) { return new SimpleQueueEntryList(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index c5b6eeca3e..db05c7b299 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -49,7 +49,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.QueueBackingStore; -import org.apache.qpid.server.queue.FileQueueBackingStore; +import org.apache.qpid.server.queue.FileQueueBackingStoreFactory; +import org.apache.qpid.server.queue.QueueBackingStoreFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.security.access.ACLManager; @@ -88,7 +89,7 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private VirtualHostConfiguration _configuration; - private QueueBackingStore _queueBackingStore; + private QueueBackingStoreFactory _queueBackingStoreFactory; public void setAccessableName(String name) { @@ -116,9 +117,9 @@ public class VirtualHost implements Accessable return _configuration ; } - public QueueBackingStore getQueueBackingStore() + public QueueBackingStoreFactory getQueueBackingStoreFactory() { - return _queueBackingStore; + return _queueBackingStoreFactory; } /** @@ -194,8 +195,8 @@ public class VirtualHost implements Accessable initialiseRoutingTable(hostConfig); } - _queueBackingStore = new FileQueueBackingStore(); - _queueBackingStore.configure(this,hostConfig); + _queueBackingStoreFactory = new FileQueueBackingStoreFactory(); + _queueBackingStoreFactory.configure(this, hostConfig); _exchangeFactory.initialise(hostConfig); _exchangeRegistry.initialise(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6021f100f5..4716f6691a 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -240,6 +240,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isAvailable() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean acquire() { return false; //To change body of implemented methods use File | Settings | File Templates. @@ -346,12 +351,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void flow() throws UnableToFlowMessageException + public void unload() throws UnableToFlowMessageException { //To change body of implemented methods use File | Settings | File Templates. } - public void recover() + public void load() { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index a11e60d7de..f73bafd3b4 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.queue; import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import java.util.ArrayList; @@ -64,7 +63,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest _queue.registerSubscription(_subscription, false); Thread.sleep(150); - ArrayList<QueueEntry> msgs = _subscription.getMessages(); + ArrayList<QueueEntry> msgs = _subscription.getQueueEntries(); try { assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java index bb2a5f3d3b..d2cbd46e28 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java @@ -21,9 +21,9 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; +import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -42,22 +42,26 @@ import java.io.File; public class FileQueueBackingStoreTest extends TestCase { - FileQueueBackingStore _backing; + QueueBackingStore _backing; private TransactionLog _transactionLog; VirtualHost _vhost; - VirtualHostConfiguration _vhostConfig; + VirtualHostConfiguration _vhostConfig; + FileQueueBackingStoreFactory _factory; + AMQQueue _queue; public void setUp() throws Exception { - _backing = new FileQueueBackingStore(); + _factory = new FileQueueBackingStoreFactory(); PropertiesConfiguration config = new PropertiesConfiguration(); config.addProperty("store.class", MemoryMessageStore.class.getName()); _vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", config); _vhost = new VirtualHost(_vhostConfig); _transactionLog = _vhost.getTransactionLog(); - _backing.configure(_vhost, _vhost.getConfiguration()); + _factory.configure(_vhost, _vhost.getConfiguration()); + _queue = new SimpleAMQQueue(new AMQShortString(this.getName()), false, null, false, _vhost); + _backing = _factory.createBacking(_queue); } private void resetBacking(Configuration configuration) throws Exception @@ -67,9 +71,11 @@ public class FileQueueBackingStoreTest extends TestCase _vhost = new VirtualHost(_vhostConfig); _transactionLog = _vhost.getTransactionLog(); - _backing = new FileQueueBackingStore(); + _factory = new FileQueueBackingStoreFactory(); + + _factory.configure(_vhost, _vhost.getConfiguration()); - _backing.configure(_vhost, _vhost.getConfiguration()); + _backing = _factory.createBacking(_queue); } public void testInvalidSetupRootExistsIsFile() throws Exception @@ -171,18 +177,18 @@ public class FileQueueBackingStoreTest extends TestCase chb); if (chb.bodySize > 0) { - ContentChunk chunk = new MockContentChunk((int) chb.bodySize/2); + ContentChunk chunk = new MockContentChunk((int) chb.bodySize / 2); original.addContentBodyFrame(null, chunk, false); - chunk = new MockContentChunk((int) chb.bodySize/2); + chunk = new MockContentChunk((int) chb.bodySize / 2); - original.addContentBodyFrame(null, chunk, true); + original.addContentBodyFrame(null, chunk, true); } - _backing.flow(original); + _backing.unload(original); - AMQMessage fromDisk = _backing.recover(original.getMessageId()); + AMQMessage fromDisk = _backing.load(original.getMessageId()); assertEquals("Message IDs do not match", original.getMessageId(), fromDisk.getMessageId()); assertEquals("Message arrival times do not match", original.getArrivalTime(), fromDisk.getArrivalTime()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 9a5f7f20c6..0e2b17914c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; * */ +import junit.framework.AssertionFailedError; import junit.framework.TestCase; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; @@ -29,6 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -58,7 +60,7 @@ public class SimpleAMQQueueTest extends TestCase protected FieldTable _arguments = null; MessagePublishInfo info = new MessagePublishInfoImpl(); - private static final long MESSAGE_SIZE = 100; + private static long MESSAGE_SIZE = 100; @Override protected void setUp() throws Exception @@ -68,7 +70,7 @@ public class SimpleAMQQueueTest extends TestCase ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1); PropertiesConfiguration env = new PropertiesConfiguration(); - _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _transactionLog); + _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); @@ -362,56 +364,69 @@ public class SimpleAMQQueueTest extends TestCase // Create IncomingMessage and nondurable queue NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + MESSAGE_SIZE = 1; + long MEMORY_MAX = 500; + int MESSAGE_COUNT = (int) MEMORY_MAX * 2; //Set the Memory Usage to be very low - _queue.setMemoryUsageMaximum(10); + _queue.setMemoryUsageMaximum(MEMORY_MAX); - for (int msgCount = 0; msgCount < 10; msgCount++) + for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) { sendMessage(txnContext); } //Check that we can hold 10 messages without flowing - assertEquals(10, _queue.getMessageCount()); - assertEquals(10, _queue.getMemoryUsageCurrent()); + assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); assertTrue("Queue is flowed.", !_queue.isFlowed()); // Send anothe and ensure we are flowed sendMessage(txnContext); - assertEquals(11, _queue.getMessageCount()); - assertEquals(10, _queue.getMemoryUsageCurrent()); + assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); assertTrue("Queue is not flowed.", _queue.isFlowed()); - //send another 9 so there are 20msgs in total on the queue - for (int msgCount = 0; msgCount < 9; msgCount++) + //send another 99 so there are 200msgs in total on the queue + for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++) { sendMessage(txnContext); + + long usage = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + usage, + usage <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + usage,usage > 0); + } - assertEquals(20, _queue.getMessageCount()); - assertEquals(10, _queue.getMemoryUsageCurrent()); + assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); assertTrue("Queue is not flowed.", _queue.isFlowed()); _queue.registerSubscription(_subscription, false); - Thread.sleep(200); + int slept = 0; + while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10) + { + Thread.sleep(500); + slept++; + } //Ensure the messages are retreived - assertEquals("Not all messages were received.", 20, _subscription.getMessages().size()); + assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); - //Ensure we got the content - for (int index = 0; index < 10; index++) - { - QueueEntry entry = _subscription.getMessages().get(index); - assertNotNull("Message:" + index + " was null.", entry.getMessage()); - assertTrue(!entry.isFlowed()); - } + //Check the queue is still within it's limits. + assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), + _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); - //ensure we were received 10 flowed messages - for (int index = 10; index < 20; index++) + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0); + + for (int index = 0; index < MESSAGE_COUNT; index++) { - QueueEntry entry = _subscription.getMessages().get(index); - assertNull("Message:" + index + " was not null.", entry.getMessage()); - assertTrue(entry.isFlowed()); + // Ensure that we have received the messages and it wasn't flushed to disk before we received it. + AMQMessage message = _subscription.getMessages().get(index); + assertNotNull("Message:" + message.debugIdentity() + " was null.", message); } + } private void sendMessage(TransactionalContext txnContext) throws AMQException @@ -419,7 +434,8 @@ public class SimpleAMQQueueTest extends TestCase IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = 1; + contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID; + contentHeaderBody.bodySize = MESSAGE_SIZE; contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 832df80004..0c33b406e6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -46,7 +46,10 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); - assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount()); + //This is +2 because: + // 1 - asyncDelivery Thread + // 2 - queueInhalerThread + assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount()); queue.stop(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 33fd669d5c..ab0870144b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -30,10 +30,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState; +import org.apache.log4j.Logger; public class MockSubscription implements Subscription { + private static final Logger _logger = Logger.getLogger(MockSubscription.class); private boolean _closed = false; private AMQShortString tag = new AMQShortString("mocktag"); @@ -41,8 +44,12 @@ public class MockSubscription implements Subscription private StateListener _listener = null; private QueueEntry lastSeen = null; private State _state = State.ACTIVE; - private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); + private ArrayList<QueueEntry> _queueEntries = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); + private ArrayList<AMQMessage> _messages = new ArrayList<AMQMessage>(); + + + public void close() { @@ -136,10 +143,14 @@ public class MockSubscription implements Subscription { } - public void send(QueueEntry msg) throws AMQException + public void send(QueueEntry entry) throws AMQException { - lastSeen = msg; - messages.add(msg); + _logger.info("Sending Message(" + entry.debugIdentity() + ") to subscription:" + this); + + lastSeen = entry; + _queueEntries.add(entry); + _messages.add(entry.getMessage()); + entry.setDeliveredToSubscription(); } public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) @@ -173,8 +184,14 @@ public class MockSubscription implements Subscription return false; } - public ArrayList<QueueEntry> getMessages() + public ArrayList<QueueEntry> getQueueEntries() { - return messages; + return _queueEntries; } + + public ArrayList<AMQMessage> getMessages() + { + return _messages; + } + } |
