diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2007-02-20 16:20:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2007-02-20 16:20:41 +0000 |
| commit | c46d62e6834205408502d89d99f73e47f3ca2eb8 (patch) | |
| tree | a0cbe2baaaae2d6eab79e0a4491c7299a9bfedc8 /java/systests/src | |
| parent | bc6a142a055071e5b7025cd1022485f26a0011f2 (diff) | |
| download | qpid-python-c46d62e6834205408502d89d99f73e47f3ca2eb8.tar.gz | |
QPID-325 : Persist durable exchange information in the store
QPID-318 : Remove hardcoding of version numbers (as applies to store)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
9 files changed, 290 insertions, 89 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 10f5cd5667..9fcd88b1a8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.ack; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -103,16 +105,32 @@ public class TxAckTest extends TestCase for(int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; - // TODO: fix hardcoded protocol version data - TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), txnContext); + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); } _acked = acked; @@ -174,7 +192,7 @@ public class TxAckTest extends TestCase private final long _tag; private int _count; - TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) { super(messageId, publishBody, txnContext); _tag = tag; diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index a9d7299bec..6beeb92053 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; @@ -149,15 +150,97 @@ public class AbstractHeadersExchangeTestBase extends TestCase return headers; } - static BasicPublishBody getPublishRequest(String id) + + static final class MessagePublishInfoImpl implements MessagePublishInfo { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null,false,false,new AMQShortString(id),0); - + private AMQShortString _exchange; + private boolean _immediate; + private boolean _mandatory; + private AMQShortString _routingKey; + + + public MessagePublishInfoImpl(AMQShortString routingKey) + { + _routingKey = routingKey; + } + + public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public boolean isImmediate() + { + return _immediate; + + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + + public void setExchange(AMQShortString exchange) + { + _exchange = exchange; + } + + public void setImmediate(boolean immediate) + { + _immediate = immediate; + } + + public void setMandatory(boolean mandatory) + { + _mandatory = mandatory; + } + + public void setRoutingKey(AMQShortString routingKey) + { + _routingKey = routingKey; + } + } + + static MessagePublishInfo getPublishRequest(final String id) + { + MessagePublishInfo request = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString(id); + } + }; + return request; } @@ -221,7 +304,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase this(getPublishRequest(id), getContentHeader(headers), null); } - private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException + private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException { super(_messageStore.getNewMessageId(), publish, _txnContext, header); } @@ -265,7 +348,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase { try { - return getPublishBody().routingKey; + return getMessagePublishInfo().getRoutingKey(); } catch (AMQException e) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 70da7d1692..eca642b556 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.framing.BasicPublishBody; @@ -55,13 +54,13 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase Message m7 = new Message("Message7", "XXXXX"); - BasicPublishBody pb7 = m7.getPublishBody(); - pb7.mandatory = true; + MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); + pb7.setMandatory(true); routeAndTest(m7,true); Message m8 = new Message("Message8", "F0000"); - BasicPublishBody pb8 = m8.getPublishBody(); - pb8.mandatory = true; + MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); + pb8.setMandatory(true); routeAndTest(m8,false,q1); @@ -88,10 +87,10 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase bindDefault("F0000"); Message m1 = new Message("Message1", "XXXXX"); Message m2 = new Message("Message2", "F0000"); - BasicPublishBody pb1 = m1.getPublishBody(); - pb1.mandatory = true; - BasicPublishBody pb2 = m2.getPublishBody(); - pb2.mandatory = true; + MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); + pb1.setMandatory(true); + MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); + pb2.setMandatory(true); routeAndTest(m1,true); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index c35d38e4ab..2d0315d7f5 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -22,6 +22,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -164,20 +165,32 @@ public class AMQQueueMBeanTest extends TestCase } } - private AMQMessage message(boolean immediate) throws AMQException + private AMQMessage message(final boolean immediate) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - immediate, - false, - null, - 0); - + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = 1000; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 93050af2b7..ae2209c629 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.ack.UnacknowledgedMessage; @@ -98,15 +99,29 @@ public class AckTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publishBody = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - new AMQShortString("someExchange"), - false, - false, - new AMQShortString("rk"), - 0); + MessagePublishInfo publishBody = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return new AMQShortString("someExchange"); + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString("rk"); + } + }; AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); if (persistent) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java index cf5baa77bd..03a56df487 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; @@ -57,20 +59,32 @@ class MessageTestHelper extends TestCase return message(false); } - AMQMessage message(boolean immediate) throws AMQException + AMQMessage message(final boolean immediate) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - immediate, - false, - null, - 0); - + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, new ContentHeaderBody()); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 89889ca017..6ffa3e0e02 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -24,9 +24,12 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -56,6 +59,26 @@ public class SkeletonMessageStore implements MessageStore { } + public void createExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void createQueue(AMQQueue queue) throws AMQException { } @@ -87,7 +110,7 @@ public class SkeletonMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException + public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { } @@ -102,7 +125,7 @@ public class SkeletonMessageStore implements MessageStore return null; } - public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException + public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException { return null; } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java index 6eacd5168f..2f0eaac29a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -50,16 +52,32 @@ public class TestReferenceCounting extends TestCase public void testMessageGetsRemoved() throws AMQException { createPersistentContentHeader(); - // TODO: fix hardcoded protocol version data - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + AMQMessage message = new AMQMessage(_store.getNewMessageId(), info, new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); @@ -81,16 +99,33 @@ public class TestReferenceCounting extends TestCase public void testMessageRemains() throws AMQException { - // TODO: fix hardcoded protocol version data - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + AMQMessage message = new AMQMessage(_store.getNewMessageId(), + info, new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 9a649421dd..79d428fee8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,7 +36,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public TestableMemoryMessageStore() { _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); } public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() @@ -43,7 +44,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return _metaDataMap; } - public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap() + public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { return _contentBodyMap; } |
