summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-02-20 16:20:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-02-20 16:20:41 +0000
commitc46d62e6834205408502d89d99f73e47f3ca2eb8 (patch)
treea0cbe2baaaae2d6eab79e0a4491c7299a9bfedc8 /java/systests/src
parentbc6a142a055071e5b7025cd1022485f26a0011f2 (diff)
downloadqpid-python-c46d62e6834205408502d89d99f73e47f3ca2eb8.tar.gz
QPID-325 : Persist durable exchange information in the store
QPID-318 : Remove hardcoding of version numbers (as applies to store) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java40
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java103
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java17
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java39
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java33
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java40
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java27
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java75
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java5
9 files changed, 290 insertions, 89 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 10f5cd5667..9fcd88b1a8 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.ack;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -103,16 +105,32 @@ public class TxAckTest extends TestCase
for(int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
- // TODO: fix hardcoded protocol version data
- TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0), txnContext);
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
_map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
}
_acked = acked;
@@ -174,7 +192,7 @@ public class TxAckTest extends TestCase
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
{
super(messageId, publishBody, txnContext);
_tag = tag;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index a9d7299bec..6beeb92053 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -149,15 +150,97 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return headers;
}
- static BasicPublishBody getPublishRequest(String id)
+
+ static final class MessagePublishInfoImpl implements MessagePublishInfo
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,false,false,new AMQShortString(id),0);
-
+ private AMQShortString _exchange;
+ private boolean _immediate;
+ private boolean _mandatory;
+ private AMQShortString _routingKey;
+
+
+ public MessagePublishInfoImpl(AMQShortString routingKey)
+ {
+ _routingKey = routingKey;
+ }
+
+ public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
+ {
+ _exchange = exchange;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _routingKey = routingKey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public void setImmediate(boolean immediate)
+ {
+ _immediate = immediate;
+ }
+
+ public void setMandatory(boolean mandatory)
+ {
+ _mandatory = mandatory;
+ }
+
+ public void setRoutingKey(AMQShortString routingKey)
+ {
+ _routingKey = routingKey;
+ }
+ }
+
+ static MessagePublishInfo getPublishRequest(final String id)
+ {
+ MessagePublishInfo request = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString(id);
+ }
+ };
+
return request;
}
@@ -221,7 +304,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
this(getPublishRequest(id), getContentHeader(headers), null);
}
- private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
{
super(_messageStore.getNewMessageId(), publish, _txnContext, header);
}
@@ -265,7 +348,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
try
{
- return getPublishBody().routingKey;
+ return getMessagePublishInfo().getRoutingKey();
}
catch (AMQException e)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 70da7d1692..eca642b556 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.framing.BasicPublishBody;
@@ -55,13 +54,13 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
Message m7 = new Message("Message7", "XXXXX");
- BasicPublishBody pb7 = m7.getPublishBody();
- pb7.mandatory = true;
+ MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
+ pb7.setMandatory(true);
routeAndTest(m7,true);
Message m8 = new Message("Message8", "F0000");
- BasicPublishBody pb8 = m8.getPublishBody();
- pb8.mandatory = true;
+ MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
+ pb8.setMandatory(true);
routeAndTest(m8,false,q1);
@@ -88,10 +87,10 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
bindDefault("F0000");
Message m1 = new Message("Message1", "XXXXX");
Message m2 = new Message("Message2", "F0000");
- BasicPublishBody pb1 = m1.getPublishBody();
- pb1.mandatory = true;
- BasicPublishBody pb2 = m2.getPublishBody();
- pb2.mandatory = true;
+ MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
+ pb1.setMandatory(true);
+ MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
+ pb2.setMandatory(true);
routeAndTest(m1,true);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index c35d38e4ab..2d0315d7f5 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -22,6 +22,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -164,20 +165,32 @@ public class AMQQueueMBeanTest extends TestCase
}
}
- private AMQMessage message(boolean immediate) throws AMQException
+ private AMQMessage message(final boolean immediate) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- immediate,
- false,
- null,
- 0);
-
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = 1000; // in bytes
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index 93050af2b7..ae2209c629 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
@@ -98,15 +99,29 @@ public class AckTest extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publishBody = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- new AMQShortString("someExchange"),
- false,
- false,
- new AMQShortString("rk"),
- 0);
+ MessagePublishInfo publishBody = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("someExchange");
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("rk");
+ }
+ };
AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
if (persistent)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
index cf5baa77bd..03a56df487 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -57,20 +59,32 @@ class MessageTestHelper extends TestCase
return message(false);
}
- AMQMessage message(boolean immediate) throws AMQException
+ AMQMessage message(final boolean immediate) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- immediate,
- false,
- null,
- 0);
-
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
new ContentHeaderBody());
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 89889ca017..6ffa3e0e02 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -24,9 +24,12 @@ import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +59,26 @@ public class SkeletonMessageStore implements MessageStore
{
}
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void createQueue(AMQQueue queue) throws AMQException
{
}
@@ -87,7 +110,7 @@ public class SkeletonMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
}
@@ -102,7 +125,7 @@ public class SkeletonMessageStore implements MessageStore
return null;
}
- public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
{
return null;
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 6eacd5168f..2f0eaac29a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -50,16 +52,32 @@ public class TestReferenceCounting extends TestCase
public void testMessageGetsRemoved() throws AMQException
{
createPersistentContentHeader();
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0),
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
@@ -81,16 +99,33 @@ public class TestReferenceCounting extends TestCase
public void testMessageRemains() throws AMQException
{
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0),
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(),
+ info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 9a649421dd..79d428fee8 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -35,7 +36,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public TestableMemoryMessageStore()
{
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
@@ -43,7 +44,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
return _metaDataMap;
}
- public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap()
+ public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
return _contentBodyMap;
}