summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java305
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java166
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java168
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java13
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java30
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java70
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java29
20 files changed, 643 insertions, 292 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
index 7acb683e3b..4e3b2298d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
@@ -22,19 +22,12 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.util.FileUtils;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.ConfigurationException;
import java.io.File;
import java.io.FileInputStream;
@@ -43,219 +36,142 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
public class FileQueueBackingStore implements QueueBackingStore
{
private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class);
- private AtomicBoolean _closed = new AtomicBoolean(false);
private String _flowToDiskLocation;
- private static final String QUEUE_BACKING_DIR = "queueBacking";
- public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+ public FileQueueBackingStore(String location)
{
- setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+ _flowToDiskLocation = location;
}
- private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+ public AMQMessage load(Long messageId)
{
- if (vHostName == null)
- {
- throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
- }
+ _log.info("Loading Message (ID:" + messageId + ")");
- if (location == null)
- {
- throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
- }
+ MessageMetaData mmd;
- _flowToDiskLocation = location;
+ File handle = getFileHandle(messageId);
+ handle.deleteOnExit();
- _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+ ObjectInputStream input = null;
- File root = new File(location);
- if (!root.exists())
- {
- throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
- }
- else
+ Exception error = null;
+ try
{
+ input = new ObjectInputStream(new FileInputStream(handle));
- if (root.isFile())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:"+
- root.getAbsolutePath());
- }
+ long arrivaltime = input.readLong();
+
+ final AMQShortString exchange = new AMQShortString(input.readUTF());
+ final AMQShortString routingKey = new AMQShortString(input.readUTF());
+ final boolean mandatory = input.readBoolean();
+ final boolean immediate = input.readBoolean();
+
+ int bodySize = input.readInt();
+ byte[] underlying = new byte[bodySize];
+
+ input.readFully(underlying, 0, bodySize);
+
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
- if(!root.canWrite())
+ ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
+
+ int chunkCount = input.readInt();
+
+ // There are WAY to many annonymous MPIs in the code this should be made concrete.
+ MessagePublishInfo info = new MessagePublishInfo()
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:"+
- root.getAbsolutePath());
- }
- }
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+ public void setExchange(AMQShortString exchange)
+ {
- File store = new File(_flowToDiskLocation);
- if (store.exists())
- {
- if (!FileUtils.delete(store, true))
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+
+ mmd = new MessageMetaData(info, chb, chunkCount);
+ mmd.setArrivalTime(arrivaltime);
+
+ AMQMessage message;
+ if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
+ {
+ message = new PersistentAMQMessage(messageId, null);
+ }
+ else
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as directory already exsits:"
- + store.getAbsolutePath());
+ message = new TransientAMQMessage(messageId);
}
- if (store.isFile())
+ message.recoverFromMessageMetaData(mmd);
+
+ for (int chunk = 0; chunk < chunkCount; chunk++)
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:"+
- store.getAbsolutePath());
+ int length = input.readInt();
+
+ byte[] data = new byte[length];
+
+ input.readFully(data, 0, length);
+
+ try
+ {
+ message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
+ }
+ catch (AMQException e)
+ {
+ //ignore as this will not occur.
+ // It is thrown by the _transactionLog method in load on PersistentAMQMessage
+ // but we have created the message with a null log and will never call that method.
+ }
}
+ return message;
}
- else
+ catch (Exception e)
{
- if (!store.getParentFile().getParentFile().canWrite())
- {
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to parent location:"+
- store.getParentFile().getParentFile().getAbsolutePath());
- }
+ error = e;
}
-
-
- _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
- store.deleteOnExit();
- if (!store.mkdirs())
+ finally
{
- throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+ try
+ {
+ input.close();
+ // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it
+ // on disk until it has been deleted from the queue at that point we can be sure we won't need the data
+ //handle.delete();
+ }
+ catch (IOException e)
+ {
+ _log.info("Unable to close input on message(" + messageId + ") recovery due to:" + e.getMessage());
+ }
}
- }
-
-
- public AMQMessage recover(Long messageId)
- {
- MessageMetaData mmd;
- List<ContentChunk> contentBodies = new LinkedList<ContentChunk>();
-
- File handle = getFileHandle(messageId);
- handle.deleteOnExit();
-
- ObjectInputStream input = null;
-
- Exception error = null;
- try
- {
- input = new ObjectInputStream(new FileInputStream(handle));
-
- long arrivaltime = input.readLong();
-
- final AMQShortString exchange = new AMQShortString(input.readUTF());
- final AMQShortString routingKey = new AMQShortString(input.readUTF());
- final boolean mandatory = input.readBoolean();
- final boolean immediate = input.readBoolean();
-
- int bodySize = input.readInt();
- byte[] underlying = new byte[bodySize];
-
- input.readFully(underlying, 0, bodySize);
-
- ByteBuffer buf = ByteBuffer.wrap(underlying);
-
- ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
-
- int chunkCount = input.readInt();
-
- // There are WAY to many annonymous MPIs in the code this should be made concrete.
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- };
-
- mmd = new MessageMetaData(info, chb, chunkCount);
- mmd.setArrivalTime(arrivaltime);
-
- AMQMessage message;
- if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
- {
- message = new PersistentAMQMessage(messageId, null);
- }
- else
- {
- message = new TransientAMQMessage(messageId);
- }
-
- message.recoverFromMessageMetaData(mmd);
-
- for (int chunk = 0; chunk < chunkCount; chunk++)
- {
- int length = input.readInt();
-
- byte[] data = new byte[length];
-
- input.readFully(data, 0, length);
-
- // There are WAY to many annonymous CCs in the code this should be made concrete.
- try
- {
- message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
- }
- catch (AMQException e)
- {
- //ignore as this will not occur.
- // It is thrown by the _transactionLog method in recover on PersistentAMQMessage
- // but we have created the message with a null log and will never call that method.
- }
- }
-
- return message;
- }
- catch (Exception e)
- {
- error = e;
- }
- finally
- {
- try
- {
- input.close();
- }
- catch (IOException e)
- {
- _log.info("Unable to close input on message("+messageId+") recovery due to:"+e.getMessage());
- }
- }
throw new UnableToRecoverMessageException(error);
}
-
- public void flow(AMQMessage message) throws UnableToFlowMessageException
+ public void unload(AMQMessage message) throws UnableToFlowMessageException
{
long messageId = message.getMessageId();
@@ -264,10 +180,18 @@ public class FileQueueBackingStore implements QueueBackingStore
//If we have written the data once then we don't need to do it again.
if (handle.exists())
{
- _log.debug("Message(" + messageId + ") already flowed to disk.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message(ID:" + messageId + ") already unloaded.");
+ }
return;
}
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unloading Message (ID:" + messageId + ")");
+ }
+
handle.deleteOnExit();
ObjectOutputStream writer = null;
@@ -334,7 +258,7 @@ public class FileQueueBackingStore implements QueueBackingStore
if (error != null)
{
- _log.error("Unable to flow message(" + messageId + ") to disk, restoring state.");
+ _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
handle.delete();
throw new UnableToFlowMessageException(messageId, error);
}
@@ -358,7 +282,7 @@ public class FileQueueBackingStore implements QueueBackingStore
// grab the 8 LSB to give us 256 bins
long bin = messageId & 0xFFL;
- String bin_path =_flowToDiskLocation + File.separator + bin;
+ String bin_path = _flowToDiskLocation + File.separator + bin;
File bin_dir = new File(bin_path);
if (!bin_dir.exists())
@@ -379,7 +303,10 @@ public class FileQueueBackingStore implements QueueBackingStore
if (handle.exists())
{
- _log.debug("Message(" + messageId + ") delete flowToDisk.");
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Message(" + messageId + ") delete flowToDisk.");
+ }
if (!handle.delete())
{
throw new RuntimeException("Unable to delete flowToDisk data");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
new file mode 100644
index 0000000000..0cfa9d6b32
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStoreFactory.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.util.FileUtils;
+
+import java.io.File;
+
+public class FileQueueBackingStoreFactory implements QueueBackingStoreFactory
+{
+ private static final Logger _log = Logger.getLogger(FileQueueBackingStoreFactory.class);
+
+ private String _flowToDiskLocation;
+ private static final String QUEUE_BACKING_DIR = "queueBacking";
+
+ public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+ {
+ setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+ }
+
+ private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+ {
+ if (vHostName == null)
+ {
+ throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
+ }
+
+ if (location == null)
+ {
+ throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
+ }
+
+ _flowToDiskLocation = location;
+
+ _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+
+ File root = new File(location);
+ if (!root.exists())
+ {
+ throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
+ }
+ else
+ {
+
+ if (root.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:" +
+ root.getAbsolutePath());
+ }
+
+ if (!root.canWrite())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:" +
+ root.getAbsolutePath());
+ }
+
+ }
+
+ // if we don't mark QUEUE_BAKCING_DIR as a deleteOnExit it will remain.
+ File backingDir = new File(location + File.separator + QUEUE_BACKING_DIR);
+ if (backingDir.exists())
+ {
+ if (!FileUtils.delete(backingDir, true))
+ {
+ throw new ConfigurationException("Unable to delete existing Flow to Disk root at:"
+ + backingDir.getAbsolutePath());
+ }
+
+ if (backingDir.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk root as specified location is a file:" +
+ backingDir.getAbsolutePath());
+ }
+ }
+
+ backingDir.deleteOnExit();
+ if (!backingDir.mkdirs())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk root:" + location + File.separator + QUEUE_BACKING_DIR);
+ }
+
+
+ File store = new File(_flowToDiskLocation);
+ if (store.exists())
+ {
+ if (!FileUtils.delete(store, true))
+ {
+ throw new ConfigurationException("Unable to delete existing Flow to Disk store at:"
+ + store.getAbsolutePath());
+ }
+
+ if (store.isFile())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:" +
+ store.getAbsolutePath());
+ }
+
+ }
+
+ _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
+ store.deleteOnExit();
+
+ if(!store.mkdir())
+ {
+ throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+ }
+ }
+
+ public QueueBackingStore createBacking(AMQQueue queue)
+ {
+ return new FileQueueBackingStore(createStore(queue.getName().toString()));
+ }
+
+ private String createStore(String name)
+ {
+ return createStore(name, 0);
+ }
+
+ private String createStore(String name, int index)
+ {
+
+ String store = _flowToDiskLocation + File.separator + name;
+ if (index > 0)
+ {
+ store += "-" + index;
+ }
+
+ //TODO ensure store is safe for the OS
+
+ File storeFile = new File(store);
+
+ if (storeFile.exists())
+ {
+ return createStore(name, index + 1);
+ }
+
+ storeFile.mkdirs();
+
+ storeFile.deleteOnExit();
+
+ return store;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index 72ea5f2667..a4f80a44b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -20,16 +20,18 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.log4j.Logger;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
-/**
- * This is an abstract base class to handle
- */
+/** This is an abstract base class to handle */
public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
{
private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
@@ -43,9 +45,12 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
/** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
private long _memoryUsageMinimum = 0;
- private AtomicBoolean _flowed;
+ private volatile AtomicBoolean _flowed;
private QueueBackingStore _backingStore;
protected AMQQueue _queue;
+ private Executor _inhaler;
+ private AtomicBoolean _stopped;
+ private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
FlowableBaseQueueEntryList(AMQQueue queue)
{
@@ -54,15 +59,18 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
VirtualHost vhost = queue.getVirtualHost();
if (vhost != null)
{
- _backingStore = vhost.getQueueBackingStore();
+ _backingStore = vhost.getQueueBackingStoreFactory().createBacking(queue);
}
+
+ _stopped = new AtomicBoolean(false);
+ _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
}
public void setFlowed(boolean flowed)
{
if (_flowed.get() != flowed)
{
- _log.info("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+ _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
_flowed.set(flowed);
}
}
@@ -94,14 +102,15 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
// Don't attempt to start the inhaler/purger unless we have a minimum value specified.
if (_memoryUsageMaximum > 0)
{
- // If we've increased the max memory above what we have in memory then we can inhale more
- if (_memoryUsageMaximum > _atomicQueueInMemory.get())
+ if (_memoryUsageMinimum == 0)
{
- //TODO start inhaler
+ setMemoryUsageMinimum(_memoryUsageMaximum / 2);
}
- else // if we have now have to much memory in use we need to purge.
+
+ // if we have now have to much memory in use we need to purge.
+ if (_memoryUsageMaximum < _atomicQueueInMemory.get())
{
- //TODO start purger
+ startPurger();
}
}
}
@@ -118,19 +127,78 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
// Don't attempt to start the inhaler unless we have a minimum value specified.
if (_memoryUsageMinimum > 0)
{
- // If we've increased the minimum memory above what we have in memory then we need to inhale more
- if (_memoryUsageMinimum >= _atomicQueueInMemory.get())
+ checkAndStartLoader();
+ }
+ }
+
+ private void checkAndStartLoader()
+ {
+ // If we've increased the minimum memory above what we have in memory then we need to inhale more
+ if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+ {
+ startInhaler();
+ }
+ }
+
+ private void startInhaler()
+ {
+ if (_flowed.get())
+ {
+ MessageInhaler inhaler = new MessageInhaler();
+
+ if (_asynchronousInhaler.compareAndSet(null, inhaler))
{
- //TODO start inhaler
+ _inhaler.execute(inhaler);
}
}
}
+ private void startPurger()
+ {
+ //TODO create purger, used when maxMemory is reduced creating over memory situation.
+ _log.warn("Requested Purger Start.. purger TBC.");
+ //_purger.execute(new MessagePurger(this));
+ }
+
public long getMemoryUsageMinimum()
{
return _memoryUsageMinimum;
}
+ public void unloadEntry(QueueEntry queueEntry)
+ {
+ try
+ {
+ queueEntry.unload();
+ _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+ checkAndStartLoader();
+ }
+ catch (UnableToFlowMessageException e)
+ {
+ _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ }
+ }
+
+ public void loadEntry(QueueEntry queueEntry)
+ {
+ queueEntry.load();
+ if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ {
+ _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum);
+ }
+ }
+
+ public void stop()
+ {
+ if (!_stopped.getAndSet(true))
+ {
+ // The SimpleAMQQueue keeps running when stopped so we should just release the services
+ // rather than actively shutdown our threads.
+ //Shutdown thread for inhaler.
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ }
+ }
+
protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
{
return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
@@ -153,13 +221,14 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
/**
* Called when we are now flowing to disk
+ *
* @param queueEntry the entry that is being flowed to disk
*/
protected void flowingToDisk(QueueEntryImpl queueEntry)
{
try
{
- queueEntry.flow();
+ queueEntry.unload();
}
catch (UnableToFlowMessageException e)
{
@@ -182,4 +251,71 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
return _backingStore;
}
+ private class MessageInhaler implements Runnable
+ {
+ public void run()
+ {
+ String threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("Inhaler-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
+ try
+ {
+ inhaleList(this);
+ }
+ finally
+ {
+ Thread.currentThread().setName(threadName);
+ }
+ }
+ }
+
+ private void inhaleList(MessageInhaler messageInhaler)
+ {
+ _log.info("Inhaler Running");
+ // If in memory count is at or over max then we can't inhale
+ if (_atomicQueueInMemory.get() >= _memoryUsageMaximum)
+ {
+ _log.debug("Unable to start inhaling as we are already over quota:" +
+ _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+ return;
+ }
+
+ _asynchronousInhaler.compareAndSet(messageInhaler, null);
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+ {
+ QueueEntryIterator iterator = iterator();
+
+ while (!iterator.getNode().isAvailable() && iterator.advance())
+ {
+ //Find first AVAILABLE node
+ }
+
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail())
+ {
+ QueueEntry entry = iterator.getNode();
+
+ if (entry.isAvailable() && entry.isFlowed())
+ {
+ loadEntry(entry);
+ }
+
+ iterator.advance();
+ }
+
+ if (iterator.atTail())
+ {
+ setFlowed(false);
+ }
+
+ _asynchronousInhaler.set(null);
+ }
+
+ //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
+ if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+ {
+ _inhaler.execute(messageInhaler);
+
+ }
+
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
index 4e95978bf8..b547a41047 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.queue;
-public interface FlowableQueueEntryList
+import java.util.concurrent.atomic.AtomicLong;
+
+public interface FlowableQueueEntryList extends QueueEntryList
{
void setFlowed(boolean flowed);
@@ -38,5 +40,19 @@ public interface FlowableQueueEntryList
void setMemoryUsageMinimum(long minimumMemoryUsage);
- long getMemoryUsageMinimum();
+ long getMemoryUsageMinimum();
+
+ /**
+ * Immediately unload Entry
+ * @param queueEntry the entry to unload
+ */
+ public void unloadEntry(QueueEntry queueEntry);
+
+ /**
+ * Immediately load Entry
+ * @param queueEntry the entry to load
+ */
+ public void loadEntry(QueueEntry queueEntry);
+
+ void stop();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index d812b8ceca..23307d8acf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -22,10 +22,10 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.CommonContentHeaderProperties;
-public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
+public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
private final AMQQueue _queue;
- private final QueueEntryList[] _priorityLists;
+ private final FlowableQueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
@@ -33,7 +33,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
{
super(queue);
_queue = queue;
- _priorityLists = new QueueEntryList[priorities];
+ _priorityLists = new FlowableQueueEntryList[priorities];
_priorities = priorities;
_priorityOffset = 5-((priorities + 1)/2);
for(int i = 0; i < priorities; i++)
@@ -53,7 +53,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
}
public QueueEntry add(AMQMessage message)
- {
+ {
int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
if(index >= _priorities)
{
@@ -152,7 +152,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
_priorities = priorities;
}
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
{
return new PriorityQueueEntryList(queue, _priorities);
}
@@ -162,7 +162,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public int size()
{
int size=0;
- for (QueueEntryList queueEntryList : _priorityLists)
+ for (FlowableQueueEntryList queueEntryList : _priorityLists)
{
size += queueEntryList.size();
}
@@ -174,9 +174,6 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
@Override
protected void flowingToDisk(QueueEntryImpl queueEntry)
{
- //TODO this disables FTD for priority queues
- // As the incomming message isn't always the one to purge.
- // More logic is required up in the add() method here to determine if the
- // incomming message is at the 'front' or not.
+ //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
index 376e6f1b57..1f575d1e05 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
@@ -26,11 +26,9 @@ import org.apache.commons.configuration.ConfigurationException;
public interface QueueBackingStore
{
- void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
+ AMQMessage load(Long messageId);
- AMQMessage recover(Long messageId);
-
- void flow(AMQMessage message) throws UnableToFlowMessageException;
+ void unload(AMQMessage message) throws UnableToFlowMessageException;
void delete(Long messageId);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java
new file mode 100644
index 0000000000..3dd23a2f40
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStoreFactory.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+
+public interface QueueBackingStoreFactory
+{
+ void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
+
+ public QueueBackingStore createBacking(AMQQueue queue);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 25d41c8203..7e41cf53a2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -157,6 +157,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean isAcquired();
+ boolean isAvailable();
+
boolean acquire();
boolean acquire(Subscription sub);
@@ -219,9 +221,9 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean removeStateChangeListener(StateChangeListener listener);
- void flow() throws UnableToFlowMessageException;
+ void unload() throws UnableToFlowMessageException;
- void recover();
+ void load();
boolean isFlowed();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 3d464d01d3..8ee03d3d74 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -181,6 +181,11 @@ public class QueueEntryImpl implements QueueEntry
return _state.getState() == State.ACQUIRED;
}
+ public boolean isAvailable()
+ {
+ return _state.getState() == State.AVAILABLE;
+ }
+
public boolean acquire()
{
return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
@@ -220,7 +225,15 @@ public class QueueEntryImpl implements QueueEntry
public String debugIdentity()
{
- return getMessage().debugIdentity();
+ String entry="[State:"+_state.getState().name()+"]";
+ if (_message == null)
+ {
+ return entry+"(Message Unloaded ID:" + _messageId +")";
+ }
+ else
+ {
+ return entry+_message.debugIdentity();
+ }
}
@@ -380,25 +393,29 @@ public class QueueEntryImpl implements QueueEntry
return false;
}
- public void flow() throws UnableToFlowMessageException
+ public void unload() throws UnableToFlowMessageException
{
if (_message != null && _backingStore != null)
{
if(_log.isDebugEnabled())
{
- _log.debug("Flowing message:" + _message.debugIdentity());
+ _log.debug("Unloading:" + debugIdentity());
}
- _backingStore.flow(_message);
+ _backingStore.unload(_message);
_message = null;
- _flowed.getAndSet(true);
+ _flowed.getAndSet(true);
}
}
- public void recover()
+ public void load()
{
if (_messageId != null && _backingStore != null)
{
- _message = _backingStore.recover(_messageId);
+ _message = _backingStore.load(_messageId);
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Loading:" + debugIdentity());
+ }
_flowed.getAndSet(false);
}
}
@@ -471,5 +488,4 @@ public class QueueEntryImpl implements QueueEntry
return _queueEntryList;
}
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 72783e3f78..313e076f61 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryList extends FlowableQueueEntryList
+public interface QueueEntryList
{
AMQQueue getQueue();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
index 4dbce45f67..b4a868cf3c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
@@ -22,5 +22,5 @@ package org.apache.qpid.server.queue;
interface QueueEntryListFactory
{
- public QueueEntryList createQueueEntryList(AMQQueue queue);
+ public FlowableQueueEntryList createQueueEntryList(AMQQueue queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 7f46a6063a..fa67162228 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -79,7 +79,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private volatile Subscription _exclusiveSubscriber;
- protected final QueueEntryList _entries;
+ protected final FlowableQueueEntryList _entries;
private final AMQQueueMBean _managedObject;
private final Executor _asyncDelivery;
@@ -465,8 +465,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+
+ if (entry.isFlowed())
+ {
+ _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity());
+ _entries.loadEntry(entry);
+ }
+
sub.send(entry);
+ // We have delivered this message so we can unload it.
+ if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer())
+ {
+ _entries.unloadEntry(entry);
+ }
+
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -1101,6 +1114,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (!_stopped.getAndSet(true))
{
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ _entries.stop();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 10abdd8318..c72353db6e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
* under the License.
*
*/
-public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
+public class SimpleQueueEntryList extends FlowableBaseQueueEntryList
{
private final QueueEntryImpl _head;
@@ -172,7 +172,7 @@ public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements
static class Factory implements QueueEntryListFactory
{
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
{
return new SimpleQueueEntryList(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index c5b6eeca3e..db05c7b299 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -49,7 +49,8 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.QueueBackingStore;
-import org.apache.qpid.server.queue.FileQueueBackingStore;
+import org.apache.qpid.server.queue.FileQueueBackingStoreFactory;
+import org.apache.qpid.server.queue.QueueBackingStoreFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.security.access.ACLManager;
@@ -88,7 +89,7 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
- private QueueBackingStore _queueBackingStore;
+ private QueueBackingStoreFactory _queueBackingStoreFactory;
public void setAccessableName(String name)
{
@@ -116,9 +117,9 @@ public class VirtualHost implements Accessable
return _configuration ;
}
- public QueueBackingStore getQueueBackingStore()
+ public QueueBackingStoreFactory getQueueBackingStoreFactory()
{
- return _queueBackingStore;
+ return _queueBackingStoreFactory;
}
/**
@@ -194,8 +195,8 @@ public class VirtualHost implements Accessable
initialiseRoutingTable(hostConfig);
}
- _queueBackingStore = new FileQueueBackingStore();
- _queueBackingStore.configure(this,hostConfig);
+ _queueBackingStoreFactory = new FileQueueBackingStoreFactory();
+ _queueBackingStoreFactory.configure(this, hostConfig);
_exchangeFactory.initialise(hostConfig);
_exchangeRegistry.initialise();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6021f100f5..4716f6691a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -240,6 +240,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isAvailable()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean acquire()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -346,12 +351,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void flow() throws UnableToFlowMessageException
+ public void unload() throws UnableToFlowMessageException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void recover()
+ public void load()
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index a11e60d7de..f73bafd3b4 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import java.util.ArrayList;
@@ -64,7 +63,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- ArrayList<QueueEntry> msgs = _subscription.getMessages();
+ ArrayList<QueueEntry> msgs = _subscription.getQueueEntries();
try
{
assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
index bb2a5f3d3b..d2cbd46e28 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -42,22 +42,26 @@ import java.io.File;
public class FileQueueBackingStoreTest extends TestCase
{
- FileQueueBackingStore _backing;
+ QueueBackingStore _backing;
private TransactionLog _transactionLog;
VirtualHost _vhost;
- VirtualHostConfiguration _vhostConfig;
+ VirtualHostConfiguration _vhostConfig;
+ FileQueueBackingStoreFactory _factory;
+ AMQQueue _queue;
public void setUp() throws Exception
{
- _backing = new FileQueueBackingStore();
+ _factory = new FileQueueBackingStoreFactory();
PropertiesConfiguration config = new PropertiesConfiguration();
config.addProperty("store.class", MemoryMessageStore.class.getName());
_vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", config);
_vhost = new VirtualHost(_vhostConfig);
_transactionLog = _vhost.getTransactionLog();
- _backing.configure(_vhost, _vhost.getConfiguration());
+ _factory.configure(_vhost, _vhost.getConfiguration());
+ _queue = new SimpleAMQQueue(new AMQShortString(this.getName()), false, null, false, _vhost);
+ _backing = _factory.createBacking(_queue);
}
private void resetBacking(Configuration configuration) throws Exception
@@ -67,9 +71,11 @@ public class FileQueueBackingStoreTest extends TestCase
_vhost = new VirtualHost(_vhostConfig);
_transactionLog = _vhost.getTransactionLog();
- _backing = new FileQueueBackingStore();
+ _factory = new FileQueueBackingStoreFactory();
+
+ _factory.configure(_vhost, _vhost.getConfiguration());
- _backing.configure(_vhost, _vhost.getConfiguration());
+ _backing = _factory.createBacking(_queue);
}
public void testInvalidSetupRootExistsIsFile() throws Exception
@@ -171,18 +177,18 @@ public class FileQueueBackingStoreTest extends TestCase
chb);
if (chb.bodySize > 0)
{
- ContentChunk chunk = new MockContentChunk((int) chb.bodySize/2);
+ ContentChunk chunk = new MockContentChunk((int) chb.bodySize / 2);
original.addContentBodyFrame(null, chunk, false);
- chunk = new MockContentChunk((int) chb.bodySize/2);
+ chunk = new MockContentChunk((int) chb.bodySize / 2);
- original.addContentBodyFrame(null, chunk, true);
+ original.addContentBodyFrame(null, chunk, true);
}
- _backing.flow(original);
+ _backing.unload(original);
- AMQMessage fromDisk = _backing.recover(original.getMessageId());
+ AMQMessage fromDisk = _backing.load(original.getMessageId());
assertEquals("Message IDs do not match", original.getMessageId(), fromDisk.getMessageId());
assertEquals("Message arrival times do not match", original.getArrivalTime(), fromDisk.getArrivalTime());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 9a5f7f20c6..0e2b17914c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.queue;
*
*/
+import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -29,6 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -58,7 +60,7 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static final long MESSAGE_SIZE = 100;
+ private static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -68,7 +70,7 @@ public class SimpleAMQQueueTest extends TestCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _transactionLog);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -362,56 +364,69 @@ public class SimpleAMQQueueTest extends TestCase
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 500;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
//Set the Memory Usage to be very low
- _queue.setMemoryUsageMaximum(10);
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
- for (int msgCount = 0; msgCount < 10; msgCount++)
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
{
sendMessage(txnContext);
}
//Check that we can hold 10 messages without flowing
- assertEquals(10, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
assertTrue("Queue is flowed.", !_queue.isFlowed());
// Send anothe and ensure we are flowed
sendMessage(txnContext);
- assertEquals(11, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
- //send another 9 so there are 20msgs in total on the queue
- for (int msgCount = 0; msgCount < 9; msgCount++)
+ //send another 99 so there are 200msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
{
sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage,usage > 0);
+
}
- assertEquals(20, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
_queue.registerSubscription(_subscription, false);
- Thread.sleep(200);
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
//Ensure the messages are retreived
- assertEquals("Not all messages were received.", 20, _subscription.getMessages().size());
+ assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
- //Ensure we got the content
- for (int index = 0; index < 10; index++)
- {
- QueueEntry entry = _subscription.getMessages().get(index);
- assertNotNull("Message:" + index + " was null.", entry.getMessage());
- assertTrue(!entry.isFlowed());
- }
+ //Check the queue is still within it's limits.
+ assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+ _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
- //ensure we were received 10 flowed messages
- for (int index = 10; index < 20; index++)
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
{
- QueueEntry entry = _subscription.getMessages().get(index);
- assertNull("Message:" + index + " was not null.", entry.getMessage());
- assertTrue(entry.isFlowed());
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
}
+
}
private void sendMessage(TransactionalContext txnContext) throws AMQException
@@ -419,7 +434,8 @@ public class SimpleAMQQueueTest extends TestCase
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1;
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 832df80004..0c33b406e6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -46,7 +46,10 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
- assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+ //This is +2 because:
+ // 1 - asyncDelivery Thread
+ // 2 - queueInhalerThread
+ assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
queue.stop();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 33fd669d5c..ab0870144b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -30,10 +30,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import org.apache.log4j.Logger;
public class MockSubscription implements Subscription
{
+ private static final Logger _logger = Logger.getLogger(MockSubscription.class);
private boolean _closed = false;
private AMQShortString tag = new AMQShortString("mocktag");
@@ -41,8 +44,12 @@ public class MockSubscription implements Subscription
private StateListener _listener = null;
private QueueEntry lastSeen = null;
private State _state = State.ACTIVE;
- private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
+ private ArrayList<QueueEntry> _queueEntries = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private ArrayList<AMQMessage> _messages = new ArrayList<AMQMessage>();
+
+
+
public void close()
{
@@ -136,10 +143,14 @@ public class MockSubscription implements Subscription
{
}
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry) throws AMQException
{
- lastSeen = msg;
- messages.add(msg);
+ _logger.info("Sending Message(" + entry.debugIdentity() + ") to subscription:" + this);
+
+ lastSeen = entry;
+ _queueEntries.add(entry);
+ _messages.add(entry.getMessage());
+ entry.setDeliveredToSubscription();
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
@@ -173,8 +184,14 @@ public class MockSubscription implements Subscription
return false;
}
- public ArrayList<QueueEntry> getMessages()
+ public ArrayList<QueueEntry> getQueueEntries()
{
- return messages;
+ return _queueEntries;
}
+
+ public ArrayList<AMQMessage> getMessages()
+ {
+ return _messages;
+ }
+
}