summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-27 13:37:03 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-27 13:37:03 +0000
commit2a2f31bf97366c0c67e7fe0d41135bd3e4c7b1ba (patch)
treecde292bc392d07d8fb48b1eee5e370f5dde644ec /java/broker/src/test
parent9f75e5deb5ef0ca80e77a4f11983bb6a44e10e50 (diff)
downloadqpid-python-2a2f31bf97366c0c67e7fe0d41135bd3e4c7b1ba.tar.gz
QPID-1635,QPID-1636,QPID-1638 : Updated QueueEntries to contain additional values from AMQMessage, _flags and expiry this allows the checking of immediate delivery and expiry on unloaded messages.
Updated nomenclature to use load/unload rather than the overloaded flow/recover. Created new FileQueueBackingStoreFactory to ensure that validates and creates initial flowToDiskLocation and creates a new BackingStore. Responsibility for FlowToDisk has been added to the QueueEntryLists. This will allow the easy unloading of the structure in the future. Inorder to do this the size,count and memory count properties had to be moved from the SimpleAMQQueue to the QueueEntryList. An Inhaler thread was created in addition to the synchronous loading of messages. This is initiated as a result of a flowed QEL dropping below the minimumMemory value. A test to ensure that the queue never exceeds its set memory usage and that the count does not go negative has been added to SimpleAMQQueueTest. The SimpleAMQQueue is responsible for deciding when a message can be unloaded after delivery takes place. The QEL cannot decide this as there is no state for a message being marked as sent to a consumer. Only Aquired and Dequeued. The unloaded message is only deleted after the QueueEntry is deleted from the QEL. This negates the need to recreated the data on disk if the message needs to be unloaded again. All files/directories relating to FtD are created as deleteOnExit files so that under clean shutdown the VM will ensure that the files are deleted. On startup the flowToDiskLocation is also purged to ensure a clean starting point. SAMQQueueThreadPoolTest was augmented to take in to account the new inhaler executor reference. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748519 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java30
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java70
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java29
6 files changed, 96 insertions, 50 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6021f100f5..4716f6691a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -240,6 +240,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isAvailable()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean acquire()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -346,12 +351,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void flow() throws UnableToFlowMessageException
+ public void unload() throws UnableToFlowMessageException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void recover()
+ public void load()
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index a11e60d7de..f73bafd3b4 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import java.util.ArrayList;
@@ -64,7 +63,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- ArrayList<QueueEntry> msgs = _subscription.getMessages();
+ ArrayList<QueueEntry> msgs = _subscription.getQueueEntries();
try
{
assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
index bb2a5f3d3b..d2cbd46e28 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -42,22 +42,26 @@ import java.io.File;
public class FileQueueBackingStoreTest extends TestCase
{
- FileQueueBackingStore _backing;
+ QueueBackingStore _backing;
private TransactionLog _transactionLog;
VirtualHost _vhost;
- VirtualHostConfiguration _vhostConfig;
+ VirtualHostConfiguration _vhostConfig;
+ FileQueueBackingStoreFactory _factory;
+ AMQQueue _queue;
public void setUp() throws Exception
{
- _backing = new FileQueueBackingStore();
+ _factory = new FileQueueBackingStoreFactory();
PropertiesConfiguration config = new PropertiesConfiguration();
config.addProperty("store.class", MemoryMessageStore.class.getName());
_vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", config);
_vhost = new VirtualHost(_vhostConfig);
_transactionLog = _vhost.getTransactionLog();
- _backing.configure(_vhost, _vhost.getConfiguration());
+ _factory.configure(_vhost, _vhost.getConfiguration());
+ _queue = new SimpleAMQQueue(new AMQShortString(this.getName()), false, null, false, _vhost);
+ _backing = _factory.createBacking(_queue);
}
private void resetBacking(Configuration configuration) throws Exception
@@ -67,9 +71,11 @@ public class FileQueueBackingStoreTest extends TestCase
_vhost = new VirtualHost(_vhostConfig);
_transactionLog = _vhost.getTransactionLog();
- _backing = new FileQueueBackingStore();
+ _factory = new FileQueueBackingStoreFactory();
+
+ _factory.configure(_vhost, _vhost.getConfiguration());
- _backing.configure(_vhost, _vhost.getConfiguration());
+ _backing = _factory.createBacking(_queue);
}
public void testInvalidSetupRootExistsIsFile() throws Exception
@@ -171,18 +177,18 @@ public class FileQueueBackingStoreTest extends TestCase
chb);
if (chb.bodySize > 0)
{
- ContentChunk chunk = new MockContentChunk((int) chb.bodySize/2);
+ ContentChunk chunk = new MockContentChunk((int) chb.bodySize / 2);
original.addContentBodyFrame(null, chunk, false);
- chunk = new MockContentChunk((int) chb.bodySize/2);
+ chunk = new MockContentChunk((int) chb.bodySize / 2);
- original.addContentBodyFrame(null, chunk, true);
+ original.addContentBodyFrame(null, chunk, true);
}
- _backing.flow(original);
+ _backing.unload(original);
- AMQMessage fromDisk = _backing.recover(original.getMessageId());
+ AMQMessage fromDisk = _backing.load(original.getMessageId());
assertEquals("Message IDs do not match", original.getMessageId(), fromDisk.getMessageId());
assertEquals("Message arrival times do not match", original.getArrivalTime(), fromDisk.getArrivalTime());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 9a5f7f20c6..0e2b17914c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.queue;
*
*/
+import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -29,6 +30,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -58,7 +60,7 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static final long MESSAGE_SIZE = 100;
+ private static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -68,7 +70,7 @@ public class SimpleAMQQueueTest extends TestCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _transactionLog);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -362,56 +364,69 @@ public class SimpleAMQQueueTest extends TestCase
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 500;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
//Set the Memory Usage to be very low
- _queue.setMemoryUsageMaximum(10);
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
- for (int msgCount = 0; msgCount < 10; msgCount++)
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
{
sendMessage(txnContext);
}
//Check that we can hold 10 messages without flowing
- assertEquals(10, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
assertTrue("Queue is flowed.", !_queue.isFlowed());
// Send anothe and ensure we are flowed
sendMessage(txnContext);
- assertEquals(11, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
- //send another 9 so there are 20msgs in total on the queue
- for (int msgCount = 0; msgCount < 9; msgCount++)
+ //send another 99 so there are 200msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
{
sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage,usage > 0);
+
}
- assertEquals(20, _queue.getMessageCount());
- assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
_queue.registerSubscription(_subscription, false);
- Thread.sleep(200);
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
//Ensure the messages are retreived
- assertEquals("Not all messages were received.", 20, _subscription.getMessages().size());
+ assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
- //Ensure we got the content
- for (int index = 0; index < 10; index++)
- {
- QueueEntry entry = _subscription.getMessages().get(index);
- assertNotNull("Message:" + index + " was null.", entry.getMessage());
- assertTrue(!entry.isFlowed());
- }
+ //Check the queue is still within it's limits.
+ assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+ _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
- //ensure we were received 10 flowed messages
- for (int index = 10; index < 20; index++)
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
{
- QueueEntry entry = _subscription.getMessages().get(index);
- assertNull("Message:" + index + " was not null.", entry.getMessage());
- assertTrue(entry.isFlowed());
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
}
+
}
private void sendMessage(TransactionalContext txnContext) throws AMQException
@@ -419,7 +434,8 @@ public class SimpleAMQQueueTest extends TestCase
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1;
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 832df80004..0c33b406e6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -46,7 +46,10 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
- assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+ //This is +2 because:
+ // 1 - asyncDelivery Thread
+ // 2 - queueInhalerThread
+ assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
queue.stop();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 33fd669d5c..ab0870144b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -30,10 +30,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import org.apache.log4j.Logger;
public class MockSubscription implements Subscription
{
+ private static final Logger _logger = Logger.getLogger(MockSubscription.class);
private boolean _closed = false;
private AMQShortString tag = new AMQShortString("mocktag");
@@ -41,8 +44,12 @@ public class MockSubscription implements Subscription
private StateListener _listener = null;
private QueueEntry lastSeen = null;
private State _state = State.ACTIVE;
- private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
+ private ArrayList<QueueEntry> _queueEntries = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private ArrayList<AMQMessage> _messages = new ArrayList<AMQMessage>();
+
+
+
public void close()
{
@@ -136,10 +143,14 @@ public class MockSubscription implements Subscription
{
}
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry) throws AMQException
{
- lastSeen = msg;
- messages.add(msg);
+ _logger.info("Sending Message(" + entry.debugIdentity() + ") to subscription:" + this);
+
+ lastSeen = entry;
+ _queueEntries.add(entry);
+ _messages.add(entry.getMessage());
+ entry.setDeliveredToSubscription();
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
@@ -173,8 +184,14 @@ public class MockSubscription implements Subscription
return false;
}
- public ArrayList<QueueEntry> getMessages()
+ public ArrayList<QueueEntry> getQueueEntries()
{
- return messages;
+ return _queueEntries;
}
+
+ public ArrayList<AMQMessage> getMessages()
+ {
+ return _messages;
+ }
+
}