diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-02-13 11:24:44 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-13 11:24:44 +0000 |
| commit | fc9058fc3df68f6c8c0fae455f34f751b584698e (patch) | |
| tree | 949703225e5c97f96f6220376c8817aeed74ac0c /java/broker/src/test | |
| parent | 9b19b4a3d0b15ee73d7186302443c2fc4d8fab75 (diff) | |
| download | qpid-python-fc9058fc3df68f6c8c0fae455f34f751b584698e.tar.gz | |
QPID-1629 : Convered AMQMessage to Interface and created concrete Transient/PersistentAMQMessage implementations
Removed the use of WeakReferences from PersistentAMQMessage and therefore the need to have a StoreContext on get requests.
NOTE: this checking will break persistent recovery.
Coverted all uses of *MessageHandle to AMQMessage. A number of tests (SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message) still use a custom constructor on Transient/PersistentAMQMessage. This is because they have their own Message implemntations that are used for testing. However, I'm sure they could be modified to override the required functionality rather than attempt to use the existing Factory and Wrap the resulting Message. A new JIRA to address this QPID-1659.
QPID-1628 : The update to MessageFactory removes the commented out code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@744079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
19 files changed, 666 insertions, 463 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 5fbf9484f7..2a97db6066 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -22,14 +22,13 @@ package org.apache.qpid.server; import junit.framework.TestCase; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.queue.MockQueueEntry; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleQueueEntryList; import org.apache.qpid.server.queue.MockAMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.QueueEntryIterator; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.MockSubscription; @@ -38,7 +37,6 @@ import org.apache.qpid.AMQException; import java.util.Map; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.Iterator; /** * QPID-1385 : Race condition between added to unacked map and resending due to a rollback. @@ -62,7 +60,7 @@ public class ExtractResendAndRequeueTest extends TestCase UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; - private AMQQueue _queue = new MockAMQQueue(); + private AMQQueue _queue = new MockAMQQueue("ExtractResendAndRequeueTest"); private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); @Override diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index a705c8bbb4..228c99dcbd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -28,11 +28,11 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.AMQMessageHandle; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.TransientAMQMessage; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; @@ -113,6 +113,8 @@ public class TxAckTest extends TestCase private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; + private static final int MESSAGE_SIZE=100; + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), @@ -128,7 +130,12 @@ public class TxAckTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + message.setPublishAndContentHeaderBody(_storeContext, info, header); + _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); } _acked = acked; @@ -190,16 +197,15 @@ public class TxAckTest extends TestCase } } - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); + final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, + null, + false); try { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() + // Safe to use null here as we just created a TransientMessage above + amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody() { public int getSize() { @@ -213,11 +219,11 @@ public class TxAckTest extends TestCase } - return amqMessageHandle; + return amqMessage; } - private class TestMessage extends AMQMessage + private class TestMessage extends TransientAMQMessage { private final long _tag; private int _count; @@ -225,7 +231,7 @@ public class TxAckTest extends TestCase TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) throws AMQException { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + super(createMessage(messageId, publishBody)); _tag = tag; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 883a712bef..e0a4357990 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; @@ -54,7 +55,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase private StoreContext _storeContext = new StoreContext(); - private MessageHandleFactory _handleFactory = new MessageHandleFactory(); + private MessageFactory _handleFactory = new MessageFactory(); private int count; @@ -370,7 +371,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase /** * Just add some extra utility methods to AMQMessage to aid testing. */ - static class Message extends AMQMessage + static class Message extends PersistentAMQMessage { private class TestIncomingMessage extends IncomingMessage { @@ -392,14 +393,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase public ContentHeaderBody getContentHeaderBody() { - try - { - return Message.this.getContentHeaderBody(); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } + return Message.this.getContentHeaderBody(); } } @@ -407,10 +401,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase private static MessageStore _messageStore = new SkeletonMessageStore(); - private static StoreContext _storeContext = new StoreContext(); - - - private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, + private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, new LinkedList<RequiredDeliveryException>() ); @@ -422,7 +413,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase Message(String id, FieldTable headers) throws AMQException { - this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers)); } public IncomingMessage getIncomingMessage() @@ -432,42 +423,35 @@ public class AbstractHeadersExchangeTestBase extends TestCase private Message(long messageId, MessagePublishInfo publish, - ContentHeaderBody header, - List<ContentBody> bodies) throws AMQException - { - super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish); - - - - _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); - _incoming.setContentHeaderBody(header); - - - } - - private static AMQMessageHandle createMessageHandle(final long messageId, - final MessagePublishInfo publish, - final ContentHeaderBody header) + ContentHeaderBody header) throws AMQException { - - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - _messageStore, - true); + super(messageId, _messageStore); try { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header); + setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header); } catch (AMQException e) { - + } - return amqMessageHandle; + + _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); + _incoming.setContentHeaderBody(header); } private Message(AMQMessage msg) throws AMQException { - super(msg); + super(msg.getMessageId(), _messageStore); + + this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody()); + + Iterator<ContentChunk> iterator = msg.getContentBodyIterator(); + + while(iterator.hasNext()) + { + this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext()); + } } @@ -500,15 +484,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase private Object getKey() { - try - { - return getMessagePublishInfo().getRoutingKey(); - } - catch (AMQException e) - { - _log.error("Error getting routing key: " + e, e); - return null; - } + return getMessagePublishInfo().getRoutingKey(); } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index d1a69c9d3c..ddf177690c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -497,7 +497,7 @@ public class DestWildExchangeTest extends TestCase throws AMQException { _exchange.route(message); - message.routingComplete(_store, new MessageHandleFactory()); + message.routingComplete(_store, new MessageFactory()); message.deliverToQueues(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index aff7af6952..ffe858f517 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -25,10 +25,12 @@ import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; import junit.framework.AssertionFailedError; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { + private static final long MESSAGE_SIZE = 100L; @Override protected void setUp() throws Exception @@ -92,11 +94,18 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest protected AMQMessage createMessage(Long id, byte i) throws AMQException { - AMQMessage msg = super.createMessage(id); + AMQMessage message = super.createMessage(id); + + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + + //The createMessage above is for a Transient Message so it is safe to have no context. + message.setPublishAndContentHeaderBody(null, info, header); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); props.setPriority(i); - msg.getContentHeaderBody().properties = props; - return msg; + message.getContentHeaderBody().properties = props; + return message; } protected AMQMessage createMessage(Long id) throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index fba30528ea..b159e2cda5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -136,6 +136,7 @@ public class AMQQueueAlertTest extends TestCase while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH) { sendMessages(1, MAX_MESSAGE_SIZE); + System.err.println(_queue.getQueueDepth() + ":" + MAX_QUEUE_DEPTH); } Notification lastNotification = _queueMBean.getLastNotification(); @@ -307,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); messages[i].enqueue(qs); - messages[i].routingComplete(_messageStore, new MessageHandleFactory()); + messages[i].routingComplete(_messageStore, new MessageFactory()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 38f030f670..a5e2da7b36 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -221,7 +221,7 @@ public class AMQQueueMBeanTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, new MessageHandleFactory()); + msg.routingComplete(_messageStore, new MessageFactory()); msg.addContentBodyFrame(new ContentChunk() { @@ -305,7 +305,7 @@ public class AMQQueueMBeanTest extends TestCase currentMessage.enqueue(qs); // route header - currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); + currentMessage.routingComplete(_messageStore, new MessageFactory()); // Add the body so we have somthing to test later currentMessage.addContentBodyFrame( diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 01674c5b3d..cd1ee65c0c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -98,7 +98,7 @@ public class AckTest extends TestCase new LinkedList<RequiredDeliveryException>() ); _queue.registerSubscription(_subscription,false); - MessageHandleFactory factory = new MessageHandleFactory(); + MessageFactory factory = new MessageFactory(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java deleted file mode 100644 index cac84c01b4..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderProperties; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; - -public class InMemoryMessageHandleTest extends TestCase -{ - AMQMessageHandle _handle; - - protected AMQMessageHandle newHandle(Long id) - { - return new InMemoryMessageHandle(id); - } - - public void testMessageID() - { - Long id = 1L; - _handle = newHandle(id); - - assertEquals("Message not set value", id, _handle.getMessageId()); - } - - public void testInvalidContentChunk() - { - _handle = newHandle(1L); - - try - { - _handle.getContentChunk(null, 0); - fail("getContentChunk should not succeed"); - } - catch (RuntimeException e) - { - assertTrue(e.getMessage().equals("No ContentBody has been set")); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - ContentChunk cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, false); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - _handle.getContentChunk(null, -1); - fail("getContentChunk should not succeed"); - } - catch (IllegalArgumentException e) - { - assertTrue(e.getMessage().contains("out of valid range")); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - _handle.getContentChunk(null, 1); - fail("getContentChunk should not succeed"); - } - catch (IllegalArgumentException e) - { - assertTrue(e.getMessage().contains("out of valid range")); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - } - - public void testAddSingleContentChunk() - { - - _handle = newHandle(1L); - - ContentChunk cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, true); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect body count", 1, _handle.getBodyCount(null)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, true); - fail("Exception should prevent adding two final chunks"); - } - catch (UnsupportedOperationException e) - { - //normal path - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - } - - public void testAddMultipleContentChunk() - { - - _handle = newHandle(1L); - - ContentChunk cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, false); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect body count", 1, _handle.getBodyCount(null)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 0)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - cc = new MockContentChunk(null, 100); - - try - { - _handle.addContentBodyFrame(null, cc, true); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect body count", 2, _handle.getBodyCount(null)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - try - { - assertEquals("Incorrect ContentChunk returned.", cc, _handle.getContentChunk(null, 1)); - } - catch (AMQException e) - { - fail("AMQException thrown:" + e.getMessage()); - } - - } - - // todo Move test to QueueEntry -// public void testRedelivered() -// { -// _handle = newHandle(1L); -// -// assertFalse("New message should not be redelivered", _handle.isRedelivered()); -// -// _handle.setRedelivered(true); -// -// assertTrue("New message should not be redelivered", _handle.isRedelivered()); -// } - - public void testInitialArrivalTime() - { - _handle = newHandle(1L); - - assertEquals("Initial Arrival time should be 0L", 0L, _handle.getArrivalTime()); - } - - public void testSetPublishAndContentHeaderBody_WithBody() - { - _handle = newHandle(1L); - - MessagePublishInfo mpi = new MessagePublishInfoImpl(); - int bodySize = 100; - - ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize); - - try - { - _handle.setPublishAndContentHeaderBody(null, mpi, chb); - - assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null)); - } - catch (AMQException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - public void testSetPublishAndContentHeaderBody_Empty() - { - _handle = newHandle(1L); - - MessagePublishInfo mpi = new MessagePublishInfoImpl(); - int bodySize = 0; - - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - - props.setAppId("HandleTest"); - - ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); - - try - { - _handle.setPublishAndContentHeaderBody(null, mpi, chb); - - assertEquals("BodySize not returned correctly. ", bodySize, _handle.getBodySize(null)); - - ContentHeaderBody retreived_chb = _handle.getContentHeaderBody(null); - - ContentHeaderProperties chp = retreived_chb.properties; - - assertEquals("ContentHeaderBody not correct", chb, retreived_chb); - - assertEquals("AppID not correctly retreived", "HandleTest", - ((BasicContentHeaderProperties) chp).getAppIdAsString()); - - MessagePublishInfo retreived_mpi = _handle.getMessagePublishInfo(null); - - assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi); - - - } - catch (AMQException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - public void testIsPersistent() - { - _handle = newHandle(1L); - - assertFalse(_handle.isPersistent()); - } - -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java new file mode 100644 index 0000000000..582e2bfb00 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; + +public class MessageFactoryTest extends TestCase +{ + private MessageFactory _factory; + + public void setUp() + { + _factory = new MessageFactory(); + } + + public void testTransientMessageCreation() + { + AMQMessage message = _factory.createMessage(0L, null, false); + + assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass()); + } + + public void testPersistentMessageCreation() + { + AMQMessage message = _factory.createMessage(0L, null, true); + + assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass()); + } + +}
\ No newline at end of file diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index a05eb0892b..cc6c486e11 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -22,23 +22,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -public class MockAMQMessage extends AMQMessage +public class MockAMQMessage extends TransientAMQMessage { public MockAMQMessage(long messageId) throws AMQException { - super(new MockAMQMessageHandle(messageId) , - (StoreContext)null, - (MessagePublishInfo)new MessagePublishInfoImpl()); - } - - protected MockAMQMessage(AMQMessage msg) - throws AMQException - { - super(msg); + super(messageId); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 758c8ddb2e..5f1cc81772 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -27,19 +27,16 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; import java.util.List; import java.util.Set; -import java.util.Map; -import java.util.HashMap; -import java.util.LinkedList; public class MockAMQQueue implements AMQQueue { private boolean _deleted = false; + private int _queueCount; private AMQShortString _name; public MockAMQQueue(String name) @@ -47,11 +44,6 @@ public class MockAMQQueue implements AMQQueue _name = new AMQShortString(name); } - public MockAMQQueue() - { - - } - public AMQShortString getName() { return _name; @@ -134,7 +126,7 @@ public class MockAMQQueue implements AMQQueue public long getQueueDepth() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return _queueCount; } public long getReceivedMessageCount() @@ -159,6 +151,7 @@ public class MockAMQQueue implements AMQQueue public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException { + _queueCount++; return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -169,7 +162,7 @@ public class MockAMQQueue implements AMQQueue public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { - //To change body of implemented methods use File | Settings | File Templates. + _queueCount--; } public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java index ee85fecfa3..8a9d1ae771 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java @@ -20,14 +20,32 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.FixedSizeByteBufferAllocator; +import org.apache.qpid.framing.abstraction.ContentChunk; public class MockContentChunk implements ContentChunk { + public static final int DEFAULT_SIZE=0; + private ByteBuffer _bytebuffer; private int _size; + + + public MockContentChunk() + { + this(0); + } + + public MockContentChunk(int size) + { + FixedSizeByteBufferAllocator allocator = new FixedSizeByteBufferAllocator(); + _bytebuffer = allocator.allocate(size, false); + + _size = size; + } + public MockContentChunk(ByteBuffer bytebuffer, int size) { _bytebuffer = bytebuffer; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index c6e7e2ebe2..e213be7560 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -21,8 +21,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; -public class WeakMessageHandleTest extends InMemoryMessageHandleTest +public class PersistentMessageTest extends TransientMessageTest { private MemoryMessageStore _messageStore; @@ -30,19 +31,20 @@ public class WeakMessageHandleTest extends InMemoryMessageHandleTest { _messageStore = new MemoryMessageStore(); _messageStore.configure(); + _storeContext = new StoreContext(); } - protected AMQMessageHandle newHandle(Long id) + @Override + protected AMQMessage newMessage(Long id) { - return new WeakReferenceMessageHandle(id, _messageStore); + return new MessageFactory().createMessage(id, _messageStore, true); } @Override public void testIsPersistent() { - _handle = newHandle(1L); - assertTrue(_handle.isPersistent()); + _message = newMessage(1L); + assertTrue(_message.isPersistent()); } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java index bdb0707c27..f7cd860c22 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java @@ -20,18 +20,30 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.StoreContext; +import junit.framework.TestCase; -public class MockAMQMessageHandle extends InMemoryMessageHandle +public class QueueEntryImplTest extends TestCase { - public MockAMQMessageHandle(final Long messageId) - { - super(messageId); - } - @Override - public long getBodySize(StoreContext store) + /** + * Test the Redelivered state of a QueueEntryImpl + */ + public void testRedelivered() { - return 0l; + QueueEntry entry = new QueueEntryImpl(null, null); + + assertFalse("New message should not be redelivered", entry.isRedelivered()); + + entry.setRedelivered(true); + + assertTrue("New message should not be redelivered", entry.isRedelivered()); + + //Check we can revert it.. not that we ever should. + entry.setRedelivered(false); + + assertFalse("New message should not be redelivered", entry.isRedelivered()); + } + + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 500655c07c..2dcb081739 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -56,7 +56,8 @@ public class SimpleAMQQueueTest extends TestCase protected FieldTable _arguments = null; MessagePublishInfo info = new MessagePublishInfoImpl(); - + private static final long MESSAGE_SIZE = 100; + @Override protected void setUp() throws Exception { @@ -317,7 +318,7 @@ public class SimpleAMQQueueTest extends TestCase // Send persistent message qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_store, new MessageHandleFactory()); + msg.routingComplete(_store, new MessageFactory()); _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued @@ -326,9 +327,14 @@ public class SimpleAMQQueueTest extends TestCase // Dequeue message MockQueueEntry entry = new MockQueueEntry(); - AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); + AMQMessage message = new MessageFactory().createMessage(1L, _store, true); - entry.setMessage(amqmsg); + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + // This is a persist message but we are not in a transaction so create a new context for the message + message.setPublishAndContentHeaderBody(new StoreContext(), info, header); + + entry.setMessage(message); _queue.dequeue(null, entry); // Check that it is dequeued @@ -338,22 +344,19 @@ public class SimpleAMQQueueTest extends TestCase // FIXME: move this to somewhere useful - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); + final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false); try { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); + //Safe to use a null StoreContext as we have created a TransientMessage (see false param above) + amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); } catch (AMQException e) { @@ -361,18 +364,18 @@ public class SimpleAMQQueueTest extends TestCase } - return amqMessageHandle; + return amqMessage; } - public class TestMessage extends AMQMessage + public class TestMessage extends TransientAMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody) throws AMQException { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + super(createMessage(messageId, publishBody)); _tag = tag; } @@ -396,7 +399,8 @@ public class SimpleAMQQueueTest extends TestCase protected AMQMessage createMessage(Long id) throws AMQException { - AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); + + AMQMessage messageA = new TestMessage(id, id, info); return messageA; } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java new file mode 100644 index 0000000000..e37269526c --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java @@ -0,0 +1,467 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.store.StoreContext; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class TransientMessageTest extends TestCase +{ + AMQMessage _message; + StoreContext _storeContext = null; + + protected AMQMessage newMessage(Long id) + { + return new MessageFactory().createMessage(id, null, false); + } + + public void testMessageID() + { + Long id = 1L; + _message = newMessage(id); + + assertEquals("Message not set value", id, _message.getMessageId()); + } + + public void testInvalidContentChunk() + { + _message = newMessage(1L); + + try + { + _message.getContentChunk(0); + fail("getContentChunk should not succeed"); + } + catch (RuntimeException e) + { + assertTrue(e.getMessage().equals("No ContentBody has been set")); + } + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, false); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + try + { + _message.getContentChunk(-1); + fail("getContentChunk should not succeed"); + } + catch (IllegalArgumentException e) + { + assertTrue(e.getMessage().contains("out of valid range")); + } + + try + { + _message.getContentChunk(1); + fail("getContentChunk should not succeed"); + } + catch (IllegalArgumentException e) + { + assertTrue(e.getMessage().contains("out of valid range")); + } + } + + public void testAddSingleContentChunk() + { + + _message = newMessage(1L); + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 1, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0)); + + cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + fail("Exception should prevent adding two final chunks"); + } + catch (UnsupportedOperationException e) + { + //normal path + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + } + + public void testAddMultipleContentChunk() + { + + _message = newMessage(1L); + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, false); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 1, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0)); + + cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 2, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(1)); + + } + + public void testInitialArrivalTime() + { + _message = newMessage(1L); + + assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime()); + } + + public void testSetPublishAndContentHeaderBody_WithBody() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 100; + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testSetPublishAndContentHeaderBody_Null() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, null); + fail("setPublishAndContentHeaderBody with null ContentHeaederBody did not throw NPE."); + } + catch (NullPointerException npe) + { + assertEquals("HeaderBody cannot be null", npe.getMessage()); + } + catch (AMQException e) + { + fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage()); + } + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, null, chb); + fail("setPublishAndContentHeaderBody with null MessagePublishInfo did not throw NPE."); + } + catch (NullPointerException npe) + { + assertEquals("PublishInfo cannot be null", npe.getMessage()); + } + catch (AMQException e) + { + fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage()); + } + } + + public void testSetPublishAndContentHeaderBody_Empty() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize()); + + ContentHeaderBody retreived_chb = _message.getContentHeaderBody(); + + ContentHeaderProperties chp = retreived_chb.properties; + + assertEquals("ContentHeaderBody not correct", chb, retreived_chb); + + assertEquals("AppID not correctly retreived", "HandleTest", + ((BasicContentHeaderProperties) chp).getAppIdAsString()); + + MessagePublishInfo retreived_mpi = _message.getMessagePublishInfo(); + + assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testIsPersistent() + { + _message = newMessage(1L); + + assertFalse(_message.isPersistent()); + } + + public void testImmediateAndNotDelivered() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered()); + + assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer()); + + _message.setDeliveredToConsumer(); + + assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer()); + + assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testNotImmediateAndNotDelivered() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered()); + + assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer()); + + _message.setDeliveredToConsumer(); + + assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer()); + + assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testExpiry() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + try + { + _message.setExpiration(System.currentTimeMillis() + 10L); + + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("New messages should not be expired.", _message.expired()); + + final long MILLIS =1000000L; + long waitTime = 20 * MILLIS; + + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + assertTrue("After a sleep messages should now be expired.", _message.expired()); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + + public void testNoExpiry() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + try + { + + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("New messages should not be expired.", _message.expired()); + + final long MILLIS =1000000L; + long waitTime = 10 * MILLIS; + + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + assertFalse("After a sleep messages without an expiry should not expire.", _message.expired()); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 12ed928e7f..b4ed1f8709 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -389,7 +389,7 @@ public class MessageStoreTest extends TestCase try { - currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory()); + currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory()); } catch (AMQException e) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 51820f72dd..9a9fe3644c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -26,9 +26,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.AMQMessageHandle; /** * Tests that reference counting works correctly with AMQMessage and the message store @@ -56,10 +55,9 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); final long messageId = _store.getNewMessageId(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext,info); + + AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); @@ -88,18 +86,10 @@ public class TestReferenceCounting extends TestCase final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext, - info); - + AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); |
