summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-01-19 15:53:43 +0000
committerAidan Skinner <aidan@apache.org>2009-01-19 15:53:43 +0000
commit0269fb662e844aa90ec659288fde3cd86643e6e4 (patch)
treec5ad6c34d6a46507d46ddcfc4c3c6e9facf91613 /qpid/java/broker
parentb1f26965fd674c21cbbe5d7fa121d95d43c2aa39 (diff)
downloadqpid-python-0269fb662e844aa90ec659288fde3cd86643e6e4.tar.gz
QPID-1573: Move unit tests that were living in systests into appropriate module. Fix up a few bugs in other tests that this exposed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@735735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java91
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java275
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java562
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java106
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java54
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java124
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java86
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java297
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java420
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java262
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java155
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java51
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java169
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java306
15 files changed, 2961 insertions, 2 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
new file mode 100644
index 0000000000..b94f2ef76f
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import junit.framework.TestCase;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedBroker;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class AMQBrokerManagerMBeanTest extends TestCase
+{
+ private QueueRegistry _queueRegistry;
+ private ExchangeRegistry _exchangeRegistry;
+
+ public void testExchangeOperations() throws Exception
+ {
+ String exchange1 = "testExchange1_" + System.currentTimeMillis();
+ String exchange2 = "testExchange2_" + System.currentTimeMillis();
+ String exchange3 = "testExchange3_" + System.currentTimeMillis();
+
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
+
+ VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) vHost.getManagedObject());
+ mbean.createNewExchange(exchange1, "direct", false);
+ mbean.createNewExchange(exchange2, "topic", false);
+ mbean.createNewExchange(exchange3, "headers", false);
+
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) != null);
+
+ mbean.unregisterExchange(exchange1);
+ mbean.unregisterExchange(exchange2);
+ mbean.unregisterExchange(exchange3);
+
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null);
+ assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
+ }
+
+ public void testQueueOperations() throws Exception
+ {
+ String queueName = "testQueue_" + System.currentTimeMillis();
+ VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) vHost.getManagedObject());
+
+ assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
+
+ mbean.createNewQueue(queueName, "test", false);
+ assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null);
+
+ mbean.deleteQueue(queueName);
+ assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _queueRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry();
+ _exchangeRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry();
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
new file mode 100644
index 0000000000..aa7cbbdf3c
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -0,0 +1,275 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+
+import java.util.*;
+
+public class TxAckTest extends TestCase
+{
+ private Scenario individual;
+ private Scenario multiple;
+ private Scenario combined;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ //ack only 5th msg
+ individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
+ individual.update(5, false);
+
+ //ack all up to and including 5th msg
+ multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
+ multiple.update(5, true);
+
+ //leave only 8th and 9th unacked
+ combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
+ combined.update(3, false);
+ combined.update(5, true);
+ combined.update(7, true);
+ combined.update(2, true);//should be ignored
+ combined.update(1, false);//should be ignored
+ combined.update(10, false);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ individual.stop();
+ multiple.stop();
+ combined.stop();
+ }
+
+ public void testPrepare() throws AMQException
+ {
+ individual.prepare();
+ multiple.prepare();
+ combined.prepare();
+ }
+
+ public void testUndoPrepare() throws AMQException
+ {
+ individual.undoPrepare();
+ multiple.undoPrepare();
+ combined.undoPrepare();
+ }
+
+ public void testCommit() throws AMQException
+ {
+ individual.commit();
+ multiple.commit();
+ combined.commit();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TxAckTest.class);
+ }
+
+ private class Scenario
+ {
+ private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
+ private final TxAck _op = new TxAck(_map);
+ private final List<Long> _acked;
+ private final List<Long> _unacked;
+ private StoreContext _storeContext = new StoreContext();
+ private AMQQueue _queue;
+
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
+ {
+ TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
+ _storeContext, null,
+ new LinkedList<RequiredDeliveryException>()
+ );
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("test", new MemoryMessageStore()),
+ null);
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ long deliveryTag = i + 1;
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
+ }
+ _acked = acked;
+ _unacked = unacked;
+ }
+
+ void update(long deliverytag, boolean multiple)
+ {
+ _op.update(deliverytag, multiple);
+ }
+
+ private void assertCount(List<Long> tags, int expected)
+ {
+ for (long tag : tags)
+ {
+ QueueEntry u = _map.get(tag);
+ assertTrue("Message not found for tag " + tag, u != null);
+ ((TestMessage) u.getMessage()).assertCountEquals(expected);
+ }
+ }
+
+ void prepare() throws AMQException
+ {
+ _op.consolidate();
+ _op.prepare(_storeContext);
+
+ assertCount(_acked, -1);
+ assertCount(_unacked, 0);
+
+ }
+
+ void undoPrepare()
+ {
+ _op.consolidate();
+ _op.undoPrepare();
+
+ assertCount(_acked, 1);
+ assertCount(_unacked, 0);
+ }
+
+ void commit()
+ {
+ _op.consolidate();
+ _op.commit(_storeContext);
+
+ //check acked messages are removed from map
+ Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags());
+ keys.retainAll(_acked);
+ assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
+ //check unacked messages are still in map
+ keys = new HashSet<Long>(_unacked);
+ keys.removeAll(_map.getDeliveryTags());
+ assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
+ }
+
+ public void stop()
+ {
+ _queue.stop();
+ }
+ }
+
+ private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ {
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ null,
+ false);
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
+ publishBody,
+ new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ // won't happen
+ }
+
+
+ return amqMessageHandle;
+ }
+
+
+ private class TestMessage extends AMQMessage
+ {
+ private final long _tag;
+ private int _count;
+
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ throws AMQException
+ {
+ super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ _tag = tag;
+ }
+
+
+ public boolean incrementReference()
+ {
+ _count++;
+ return true;
+ }
+
+ public void decrementReference(StoreContext context)
+ {
+ _count--;
+ }
+
+ void assertCountEquals(int expected)
+ {
+ assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
new file mode 100644
index 0000000000..6dcb187a37
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -0,0 +1,562 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+public class AbstractHeadersExchangeTestBase extends TestCase
+{
+ private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
+
+ private final HeadersExchange exchange = new HeadersExchange();
+ protected final Set<TestQueue> queues = new HashSet<TestQueue>();
+
+ /**
+ * Not used in this test, just there to stub out the routing calls
+ */
+ private MessageStore _store = new MemoryMessageStore();
+
+ private StoreContext _storeContext = new StoreContext();
+
+ private MessageHandleFactory _handleFactory = new MessageHandleFactory();
+
+ private int count;
+
+ public void testDoNothing()
+ {
+ // this is here only to make junit under Eclipse happy
+ }
+
+ protected TestQueue bindDefault(String... bindings) throws AMQException
+ {
+ return bind("Queue" + (++count), bindings);
+ }
+
+ protected TestQueue bind(String queueName, String... bindings) throws AMQException
+ {
+ return bind(queueName, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(String queue, FieldTable bindings) throws AMQException
+ {
+ return bind(new TestQueue(new AMQShortString(queue)), bindings);
+ }
+
+ protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException
+ {
+ return bind(queue, getHeaders(bindings));
+ }
+
+ protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException
+ {
+ queues.add(queue);
+ exchange.registerQueue(null, queue, bindings);
+ return queue;
+ }
+
+
+ protected void route(Message m) throws AMQException
+ {
+ m.route(exchange);
+ m.getIncomingMessage().routingComplete(_store, _handleFactory);
+ if(m.getIncomingMessage().allContentReceived())
+ {
+ m.getIncomingMessage().deliverToQueues();
+ }
+ }
+
+ protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, false, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, expectReturn, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
+ {
+ routeAndTest(m, false, expected);
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
+ {
+ try
+ {
+ route(m);
+ assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ for (TestQueue q : queues)
+ {
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
+ }
+ }
+
+ catch (NoRouteException ex)
+ {
+ assertTrue("Expected "+m+" not to be returned",expectReturn);
+ }
+
+ }
+
+ static FieldTable getHeaders(String... entries)
+ {
+ FieldTable headers = FieldTableFactory.newFieldTable();
+ for (String s : entries)
+ {
+ String[] parts = s.split("=", 2);
+ headers.setObject(parts[0], parts.length > 1 ? parts[1] : "");
+ }
+ return headers;
+ }
+
+
+ static final class MessagePublishInfoImpl implements MessagePublishInfo
+ {
+ private AMQShortString _exchange;
+ private boolean _immediate;
+ private boolean _mandatory;
+ private AMQShortString _routingKey;
+
+ public MessagePublishInfoImpl(AMQShortString routingKey)
+ {
+ _routingKey = routingKey;
+ }
+
+ public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
+ {
+ _exchange = exchange;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _routingKey = routingKey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public void setImmediate(boolean immediate)
+ {
+ _immediate = immediate;
+ }
+
+ public void setMandatory(boolean mandatory)
+ {
+ _mandatory = mandatory;
+ }
+
+ public void setRoutingKey(AMQShortString routingKey)
+ {
+ _routingKey = routingKey;
+ }
+ }
+
+ static MessagePublishInfo getPublishRequest(final String id)
+ {
+ return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id));
+ }
+
+ static ContentHeaderBody getContentHeader(FieldTable headers)
+ {
+ ContentHeaderBody header = new ContentHeaderBody();
+ header.properties = getProperties(headers);
+ return header;
+ }
+
+ static BasicContentHeaderProperties getProperties(FieldTable headers)
+ {
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ properties.setHeaders(headers);
+ return properties;
+ }
+
+ static class TestQueue extends SimpleAMQQueue
+ {
+ final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+
+ public TestQueue(AMQShortString name) throws AMQException
+ {
+ super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
+ ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this);
+ }
+
+ /**
+ * We override this method so that the default behaviour, which attempts to use a delivery manager, is
+ * not invoked. It is unnecessary since for this test we only care to know whether the message was
+ * sent to the queue; the queue processing logic is not being tested.
+ * @param msg
+ * @throws AMQException
+ */
+ @Override
+ public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
+ {
+ messages.add( new HeadersExchangeTest.Message(msg));
+ return new QueueEntry()
+ {
+
+ public AMQQueue getQueue()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public AMQMessage getMessage()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getSize()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean getDeliveredToConsumer()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean expired() throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isAcquired()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean acquire()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean acquire(Subscription sub)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean delete()
+ {
+ return false;
+ }
+
+ public boolean isDeleted()
+ {
+ return false;
+ }
+
+ public boolean acquiredBySubscription()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setDeliveredToSubscription()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void release()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String debugIdentity()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean immediateAndNotDelivered()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setRedelivered(boolean b)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void reject()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void reject(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isRejectedBy(Subscription subscription)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void requeue(StoreContext storeContext) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void restoreCredit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isQueueDeleted()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addStateChangeListener(StateChangeListener listener)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean removeStateChangeListener(StateChangeListener listener)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int compareTo(final QueueEntry o)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
+ }
+
+ boolean isInQueue(Message msg)
+ {
+ return messages.contains(msg);
+ }
+
+ }
+
+ /**
+ * Just add some extra utility methods to AMQMessage to aid testing.
+ */
+ static class Message extends AMQMessage
+ {
+ private class TestIncomingMessage extends IncomingMessage
+ {
+
+ public TestIncomingMessage(final long messageId,
+ final MessagePublishInfo info,
+ final TransactionalContext txnContext,
+ final AMQProtocolSession publisher)
+ {
+ super(messageId, info, txnContext, publisher);
+ }
+
+
+ public AMQMessage getUnderlyingMessage()
+ {
+ return Message.this;
+ }
+
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ try
+ {
+ return Message.this.getContentHeaderBody();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private IncomingMessage _incoming;
+
+ private static MessageStore _messageStore = new SkeletonMessageStore();
+
+ private static StoreContext _storeContext = new StoreContext();
+
+
+ private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>()
+ );
+
+ Message(String id, String... headers) throws AMQException
+ {
+ this(id, getHeaders(headers));
+ }
+
+ Message(String id, FieldTable headers) throws AMQException
+ {
+ this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ }
+
+ public IncomingMessage getIncomingMessage()
+ {
+ return _incoming;
+ }
+
+ private Message(long messageId,
+ MessagePublishInfo publish,
+ ContentHeaderBody header,
+ List<ContentBody> bodies) throws AMQException
+ {
+ super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+
+
+
+ _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+ _incoming.setContentHeaderBody(header);
+
+
+ }
+
+ private static AMQMessageHandle createMessageHandle(final long messageId,
+ final MessagePublishInfo publish,
+ final ContentHeaderBody header)
+ {
+
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ _messageStore,
+ true);
+
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
+ }
+ catch (AMQException e)
+ {
+
+ }
+ return amqMessageHandle;
+ }
+
+ private Message(AMQMessage msg) throws AMQException
+ {
+ super(msg);
+ }
+
+
+
+ void route(Exchange exchange) throws AMQException
+ {
+ exchange.route(_incoming);
+ }
+
+
+ public int hashCode()
+ {
+ return getKey().hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o);
+ }
+
+ private boolean equals(HeadersExchangeTest.Message m)
+ {
+ return getKey().equals(m.getKey());
+ }
+
+ public String toString()
+ {
+ return getKey().toString();
+ }
+
+ private Object getKey()
+ {
+ try
+ {
+ return getMessagePublishInfo().getRoutingKey();
+ }
+ catch (AMQException e)
+ {
+ _log.error("Error getting routing key: " + e, e);
+ return null;
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
new file mode 100644
index 0000000000..fd11ddeae2
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
+
+public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
+ }
+
+ protected void tearDown()
+ {
+ ApplicationRegistry.remove(1);
+ }
+
+ public void testSimple() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ TestQueue q2 = bindDefault("F0000=Aardvark");
+ TestQueue q3 = bindDefault("F0001");
+ TestQueue q4 = bindDefault("F0001=Bear");
+ TestQueue q5 = bindDefault("F0000", "F0001");
+ TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
+ TestQueue q7 = bindDefault("F0000", "F0001=Bear");
+ TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
+
+ routeAndTest(new Message("Message1", "F0000"), q1);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
+ q1, q2, q3, q4, q5, q6, q7, q8);
+ routeAndTest(new Message("Message6", "F0002"));
+
+ Message m7 = new Message("Message7", "XXXXX");
+
+ MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
+ pb7.setMandatory(true);
+ routeAndTest(m7,true);
+
+ Message m8 = new Message("Message8", "F0000");
+ MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
+ pb8.setMandatory(true);
+ routeAndTest(m8,false,q1);
+
+
+ }
+
+ public void testAny() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any");
+ TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
+ TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
+ TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
+ TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
+
+ routeAndTest(new Message("Message1", "F0000"), q1, q3);
+ routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4);
+ routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
+ routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ public void testMandatory() throws AMQException
+ {
+ bindDefault("F0000");
+ Message m1 = new Message("Message1", "XXXXX");
+ Message m2 = new Message("Message2", "F0000");
+ MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
+ pb1.setMandatory(true);
+ MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
+ pb2.setMandatory(true);
+ routeAndTest(m1,true);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(HeadersExchangeTest.class);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java
new file mode 100644
index 0000000000..0762a7a561
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.plugins;
+
+import java.util.Map;
+
+import org.apache.qpid.server.exchange.ExchangeType;
+
+import junit.framework.TestCase;
+
+public class PluginTest extends TestCase
+{
+
+ private static final String TEST_EXCHANGE_CLASS = "org.apache.qpid.extras.exchanges.example.TestExchangeType";
+ private static final String PLUGIN_DIRECTORY = System.getProperty("example.plugin.target");
+
+ public void testLoadExchanges() throws Exception
+ {
+ PluginManager manager = new PluginManager(PLUGIN_DIRECTORY);
+ Map<String, ExchangeType<?>> exchanges = manager.getExchanges();
+ assertNotNull("No exchanges found in "+PLUGIN_DIRECTORY, exchanges);
+ assertEquals("Wrong number of exchanges found in "+PLUGIN_DIRECTORY,
+ 2, exchanges.size());
+ assertNotNull("Wrong exchange found in "+PLUGIN_DIRECTORY,
+ exchanges.get(TEST_EXCHANGE_CLASS));
+ }
+
+ public void testNoExchanges() throws Exception
+ {
+ PluginManager manager = new PluginManager("/path/to/nowhere");
+ Map<String, ExchangeType<?>> exchanges = manager.getExchanges();
+ assertNull("Exchanges found", exchanges);
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
new file mode 100644
index 0000000000..8e7038eec3
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+
+import javax.management.JMException;
+
+/**
+ * Test class to test MBean operations for AMQMinaProtocolSession.
+ */
+public class AMQProtocolSessionMBeanTest extends TestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
+
+ private MessageStore _messageStore = new SkeletonMessageStore();
+ private AMQMinaProtocolSession _protocolSession;
+ private AMQChannel _channel;
+ private AMQProtocolSessionMBean _mbean;
+
+ public void testChannels() throws Exception
+ {
+ // check the channel count is correct
+ int channelCount = _mbean.channels().size();
+ assertTrue(channelCount == 1);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()),
+ false,
+ new AMQShortString("test"),
+ true,
+ _protocolSession.getVirtualHost(), null);
+ AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
+ channel.setDefaultQueue(queue);
+ _protocolSession.addChannel(channel);
+ channelCount = _mbean.channels().size();
+ assertTrue(channelCount == 2);
+
+ // general properties test
+ _mbean.setMaximumNumberOfChannels(1000L);
+ assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
+
+ // check APIs
+ AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore);
+ channel3.setLocalTransactional();
+ _protocolSession.addChannel(channel3);
+ _mbean.rollbackTransactions(2);
+ _mbean.rollbackTransactions(3);
+ _mbean.commitTransactions(2);
+ _mbean.commitTransactions(3);
+
+ // This should throw exception, because the channel does't exist
+ try
+ {
+ _mbean.commitTransactions(4);
+ fail();
+ }
+ catch (JMException ex)
+ {
+ log.debug("expected exception is thrown :" + ex.getMessage());
+ }
+
+ // check if closing of session works
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore));
+ _mbean.closeConnection();
+ try
+ {
+ channelCount = _mbean.channels().size();
+ assertTrue(channelCount == 0);
+ // session is now closed so adding another channel should throw an exception
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore));
+ fail();
+ }
+ catch (AMQException ex)
+ {
+ log.debug("expected exception is thrown :" + ex.getMessage());
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _protocolSession =
+ new AMQMinaProtocolSession(new MockIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
+ null);
+ _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
+ _channel = new AMQChannel(_protocolSession, 1, _messageStore);
+ _protocolSession.addChannel(_channel);
+ _mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject();
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
new file mode 100644
index 0000000000..307dcf66fe
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/** Test class to test MBean operations for AMQMinaProtocolSession. */
+public class MaxChannelsTest extends TestCase
+{
+ private IApplicationRegistry _appRegistry;
+ private AMQMinaProtocolSession _session;
+
+ public void testChannels() throws Exception
+ {
+ _session = new AMQMinaProtocolSession(new MockIoSession(), _appRegistry
+ .getVirtualHostRegistry(), new AMQCodecFactory(true), null);
+ _session.setVirtualHost(_appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
+
+ // check the channel count is correct
+ int channelCount = _session.getChannels().size();
+ assertEquals("Initial channel count wrong", 0, channelCount);
+
+ long maxChannels = 10L;
+ _session.setMaximumNumberOfChannels(maxChannels);
+ assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels());
+
+
+ try
+ {
+ for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+ {
+ _session.addChannel(new AMQChannel(_session, (int) currentChannel, null));
+ }
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
+ }
+ assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
+ }
+
+ @Override
+ public void setUp()
+ {
+ _appRegistry = ApplicationRegistry.getInstance(1);
+ }
+
+ @Override
+ public void tearDown()
+ {
+ try {
+ _session.closeSession();
+ } catch (AMQException e) {
+ // Yikes
+ fail(e.getMessage());
+ }
+ ApplicationRegistry.remove(1);
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
new file mode 100644
index 0000000000..cf6366b513
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
@@ -0,0 +1,297 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultCloseFuture;
+import org.apache.mina.common.support.DefaultWriteFuture;
+
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+
+public class MockIoSession implements IoSession
+{
+ private AMQProtocolSession _protocolSession;
+
+ /**
+ * Stores the last response written
+ */
+ private Object _lastWrittenObject;
+
+ private boolean _closing;
+
+ public MockIoSession()
+ {
+ }
+
+ public Object getLastWrittenObject()
+ {
+ return _lastWrittenObject;
+ }
+
+ public IoService getService()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return null;
+ }
+
+ public IoHandler getHandler()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public WriteFuture write(Object message)
+ {
+ WriteFuture wf = new DefaultWriteFuture(null);
+ _lastWrittenObject = message;
+ return wf;
+ }
+
+ public CloseFuture close()
+ {
+ _closing = true;
+ CloseFuture cf = new DefaultCloseFuture(null);
+ cf.setClosed();
+ return cf;
+ }
+
+ public Object getAttachment()
+ {
+ return _protocolSession;
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ Object current = _protocolSession;
+ _protocolSession = (AMQProtocolSession) attachment;
+ return current;
+ }
+
+ public Object getAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object setAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Set getAttributeKeys()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TransportType getTransportType()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isConnected()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isClosing()
+ {
+ return _closing;
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void suspendWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeRead()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void resumeWrite()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getReadMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0L;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0; //TODO
+ }
+
+ public long getCreationTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIoTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastReadTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
new file mode 100644
index 0000000000..9c2932c5e2
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -0,0 +1,420 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.Collections;
+
+/**
+ * Tests that acknowledgements are handled correctly.
+ */
+public class AckTest extends TestCase
+{
+ private static final Logger _log = Logger.getLogger(AckTest.class);
+
+ private Subscription _subscription;
+
+ private MockProtocolSession _protocolSession;
+
+ private TestMemoryMessageStore _messageStore;
+
+ private StoreContext _storeContext = new StoreContext();
+
+ private AMQChannel _channel;
+
+ private AMQQueue _queue;
+
+ private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag");
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
+
+ _messageStore = new TestMemoryMessageStore();
+ _protocolSession = new MockProtocolSession(_messageStore);
+ _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
+
+ _protocolSession.addChannel(_channel);
+
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
+ null);
+ }
+
+ protected void tearDown()
+ {
+ ApplicationRegistry.remove(1);
+ }
+
+ private void publishMessages(int count) throws AMQException
+ {
+ publishMessages(count, false);
+ }
+
+ private void publishMessages(int count, boolean persistent) throws AMQException
+ {
+ TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
+ new LinkedList<RequiredDeliveryException>()
+ );
+ _queue.registerSubscription(_subscription,false);
+ MessageHandleFactory factory = new MessageHandleFactory();
+ for (int i = 1; i <= count; i++)
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Establish some way to determine the version for the test.
+ MessagePublishInfo publishBody = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("someExchange");
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("rk");
+ }
+ };
+ IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ //IncomingMessage msg2 = null;
+ if (persistent)
+ {
+ BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+ //This is DeliveryMode.PERSISTENT
+ b.setDeliveryMode((byte) 2);
+ ContentHeaderBody cb = new ContentHeaderBody();
+ cb.properties = b;
+ msg.setContentHeaderBody(cb);
+ }
+ else
+ {
+ msg.setContentHeaderBody(new ContentHeaderBody());
+ }
+ // we increment the reference here since we are not delivering the messaging to any queues, which is where
+ // the reference is normally incremented. The test is easier to construct if we have direct access to the
+ // subscription
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ msg.enqueue(qs);
+ msg.routingComplete(_messageStore, factory);
+ if(msg.allContentReceived())
+ {
+ msg.deliverToQueues();
+ }
+ // we manually send the message to the subscription
+ //_subscription.send(new QueueEntry(_queue,msg), _queue);
+ }
+ }
+
+ /**
+ * Tests that the acknowledgements are correctly associated with a channel and
+ * order is preserved when acks are enabled
+ */
+ public void testAckChannelAssociationTest() throws AMQException
+ {
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+ final int msgCount = 10;
+ publishMessages(msgCount, true);
+
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
+ int i = 1;
+ for (long deliveryTag : deliveryTagSet)
+ {
+ assertTrue(deliveryTag == i);
+ i++;
+ QueueEntry unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getQueue() == _queue);
+ }
+
+ assertTrue(map.size() == msgCount);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+ }
+
+ /**
+ * Tests that in no-ack mode no messages are retained
+ */
+ public void testNoAckMode() throws AMQException
+ {
+ // false arg means no acks expected
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+ assertTrue(_messageStore.getContentBodyMap().size() == 0);
+
+ }
+
+ /**
+ * Tests that in no-ack mode no messages are retained
+ */
+ public void testPersistentNoAckMode() throws AMQException
+ {
+ // false arg means no acks expected
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+ final int msgCount = 10;
+ publishMessages(msgCount, true);
+
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+ assertTrue(_messageStore.getContentBodyMap().size() == 0);
+
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ public void testSingleAckReceivedTest() throws AMQException
+ {
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, false);
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == msgCount - 1);
+
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
+ int i = 1;
+ for (long deliveryTag : deliveryTagSet)
+ {
+ assertTrue(deliveryTag == i);
+ QueueEntry unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getQueue() == _queue);
+ // 5 is the delivery tag of the message that *should* be removed
+ if (++i == 5)
+ {
+ ++i;
+ }
+ }
+ }
+
+ /**
+ * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
+ * set case)
+ */
+ public void testMultiAckReceivedTest() throws AMQException
+ {
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(5, true);
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
+ int i = 1;
+ for (long deliveryTag : deliveryTagSet)
+ {
+ assertTrue(deliveryTag == i + 5);
+ QueueEntry unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getQueue() == _queue);
+ ++i;
+ }
+ }
+
+ /**
+ * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
+ */
+ public void testMultiAckAllReceivedTest() throws AMQException
+ {
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+ final int msgCount = 10;
+ publishMessages(msgCount);
+
+ _channel.acknowledgeMessage(0, true);
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 0);
+
+ Set<Long> deliveryTagSet = map.getDeliveryTags();
+ int i = 1;
+ for (long deliveryTag : deliveryTagSet)
+ {
+ assertTrue(deliveryTag == i + 5);
+ QueueEntry unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getQueue() == _queue);
+ ++i;
+ }
+ }
+
+ /**
+ * A regression fixing QPID-1136 showed this up
+ *
+ * @throws Exception
+ */
+ public void testMessageDequeueRestoresCreditTest() throws Exception
+ {
+ // Send 10 messages
+ Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
+ DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+ final int msgCount = 1;
+ publishMessages(msgCount);
+
+ _queue.deliverAsync(_subscription);
+
+ _channel.acknowledgeMessage(1, false);
+
+ // Check credit available
+ assertTrue("No credit available", creditManager.hasCredit());
+
+ }
+
+
+/*
+ public void testPrefetchHighLow() throws AMQException
+ {
+ int lowMark = 5;
+ int highMark = 10;
+
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+ _channel.setPrefetchLowMarkCount(lowMark);
+ _channel.setPrefetchHighMarkCount(highMark);
+
+ assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
+ assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
+
+ publishMessages(highMark);
+
+ // at this point we should have sent out only highMark messages
+ // which have not bee received so will be queued up in the channel
+ // which should be suspended
+ assertTrue(_subscription.isSuspended());
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == highMark);
+
+ //acknowledge messages so we are just above lowMark
+ _channel.acknowledgeMessage(lowMark - 1, true);
+
+ //we should still be suspended
+ assertTrue(_subscription.isSuspended());
+ assertTrue(map.size() == lowMark + 1);
+
+ //acknowledge one more message
+ _channel.acknowledgeMessage(lowMark, true);
+
+ //and suspension should be lifted
+ assertTrue(!_subscription.isSuspended());
+
+ //pubilsh more msgs so we are just below the limit
+ publishMessages(lowMark - 1);
+
+ //we should not be suspended
+ assertTrue(!_subscription.isSuspended());
+
+ //acknowledge all messages
+ _channel.acknowledgeMessage(0, true);
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ //map will be empty
+ assertTrue(map.size() == 0);
+ }
+
+*/
+/*
+ public void testPrefetch() throws AMQException
+ {
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+ _channel.setMessageCredit(5);
+
+ assertTrue(_channel.getPrefetchCount() == 5);
+
+ final int msgCount = 5;
+ publishMessages(msgCount);
+
+ // at this point we should have sent out only 5 messages with a further 5 queued
+ // up in the channel which should now be suspended
+ assertTrue(_subscription.isSuspended());
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == 5);
+ _channel.acknowledgeMessage(5, true);
+ assertTrue(!_subscription.isSuspended());
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ assertTrue(map.size() == 0);
+ }
+
+*/
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(AckTest.class);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
new file mode 100644
index 0000000000..99c88fac3e
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.transport.Sender;
+
+import javax.security.sasl.SaslServer;
+import java.util.HashMap;
+import java.util.Map;
+import java.security.Principal;
+
+/**
+ * A protocol session that can be used for testing purposes.
+ */
+public class MockProtocolSession implements AMQProtocolSession
+{
+ private MessageStore _messageStore;
+
+ private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+
+ public MockProtocolSession(MessageStore messageStore)
+ {
+ _messageStore = messageStore;
+ }
+
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
+ {
+ }
+
+ public void writeFrame(AMQDataBlock frame)
+ {
+ }
+
+ public AMQShortString getContextKey()
+ {
+ return null;
+ }
+
+ public void setContextKey(AMQShortString contextKey)
+ {
+ }
+
+ public AMQChannel getChannel(int channelId)
+ {
+ AMQChannel channel = _channelMap.get(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Invalid channel id: " + channelId);
+ }
+ else
+ {
+ return channel;
+ }
+ }
+
+ public void addChannel(AMQChannel channel)
+ {
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Channel must not be null");
+ }
+ else
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+ }
+ }
+
+ public void closeChannel(int channelId) throws AMQException
+ {
+ }
+
+ public void closeChannelOk(int channelId)
+ {
+
+ }
+
+ public boolean channelAwaitingClosure(int channelId)
+ {
+ return false;
+ }
+
+ public void removeChannel(int channelId)
+ {
+ _channelMap.remove(channelId);
+ }
+
+ public void initHeartbeats(int delay)
+ {
+ }
+
+ public void closeSession() throws AMQException
+ {
+ }
+
+ public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException
+ {
+ }
+
+ public Object getKey()
+ {
+ return null;
+ }
+
+ public String getLocalFQDN()
+ {
+ return null;
+ }
+
+ public SaslServer getSaslServer()
+ {
+ return null;
+ }
+
+ public void setSaslServer(SaslServer saslServer)
+ {
+ }
+
+ public FieldTable getClientProperties()
+ {
+ return null;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ }
+
+ public Object getClientIdentifier()
+ {
+ return null;
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return ProtocolOutputConverterRegistry.getConverter(this);
+ }
+
+ public void setAuthorizedID(Principal authorizedID)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Principal getAuthorizedID()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void methodFrameReceived(int channelId, AMQMethodBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void contentBodyReceived(int channelId, ContentBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public MethodDispatcher getMethodDispatcher()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return null;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolVersion().getMajorVersion();
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolVersion().getMinorVersion();
+ }
+
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.getLatestSupportedVersion(); //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+ public VersionSpecificRegistry getRegistry()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
+ {
+ // FIXME AS TODO
+
+ }
+
+ public void init()
+ {
+ // TODO Auto-generated method stub
+
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index f76c652793..f45d887dec 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -35,6 +35,7 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
public void test() throws AMQException
{
+ assertEquals("References exist before start!", 0, ReferenceCountingExecutorService.getInstance().getReferenceCount());
VirtualHost test = ApplicationRegistry.getInstance(1).getVirtualHostRegistry().getVirtualHost("test");
try
@@ -43,8 +44,8 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
new AMQShortString("owner"),
false, test, null);
- assertTrue("Creation did not start Pool.", !ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
-
+ assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
+
queue.stop();
assertEquals("References still exist", 0, ReferenceCountingExecutorService.getInstance().getReferenceCount());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
new file mode 100644
index 0000000000..f08a15a8a7
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A message store that does nothing. Designed to be used in tests that do not want to use any message store
+ * functionality.
+ */
+public class SkeletonMessageStore implements MessageStore
+{
+ private final AtomicLong _messageId = new AtomicLong(1);
+
+ public void configure(String base, Configuration config) throws Exception
+ {
+ }
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void removeMessage(StoreContext s, Long messageId)
+ {
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
+ }
+
+ public void beginTran(StoreContext s) throws AMQException
+ {
+ }
+
+ public boolean inTran(StoreContext sc)
+ {
+ return false;
+ }
+
+ public void commitTran(StoreContext storeContext) throws AMQException
+ {
+ }
+
+ public void abortTran(StoreContext storeContext) throws AMQException
+ {
+ }
+
+ public List<AMQQueue> createQueues() throws AMQException
+ {
+ return null;
+ }
+
+ public Long getNewMessageId()
+ {
+ return _messageId.getAndIncrement();
+ }
+
+ public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+
+ }
+
+ public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException
+ {
+ return null;
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+ {
+ return null;
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ public void removeQueue(final AMQQueue queue) throws AMQException
+ {
+
+ }
+
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+
+ }
+
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
new file mode 100644
index 0000000000..4e48435962
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+
+/**
+ * Adds some extra methods to the memory message store for testing purposes.
+ */
+public class TestMemoryMessageStore extends MemoryMessageStore
+{
+ public TestMemoryMessageStore()
+ {
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+ }
+
+ public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ {
+ return _metaDataMap;
+ }
+
+ public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+ {
+ return _contentBodyMap;
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
new file mode 100644
index 0000000000..2346660d25
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+
+/**
+ * Tests that reference counting works correctly with AMQMessage and the message store
+ */
+public class TestReferenceCounting extends TestCase
+{
+ private TestMemoryMessageStore _store;
+
+ private StoreContext _storeContext = new StoreContext();
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _store = new TestMemoryMessageStore();
+ }
+
+ /**
+ * Check that when the reference count is decremented the message removes itself from the store
+ */
+ public void testMessageGetsRemoved() throws AMQException
+ {
+ ContentHeaderBody chb = createPersistentContentHeader();
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+
+ final long messageId = _store.getNewMessageId();
+ AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
+ messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
+ AMQMessage message = new AMQMessage(messageHandle,
+ _storeContext,info);
+
+ message = message.takeReference();
+
+ // we call routing complete to set up the handle
+ // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
+
+
+ assertEquals(1, _store.getMessageMetaDataMap().size());
+ message.decrementReference(_storeContext);
+ assertEquals(1, _store.getMessageMetaDataMap().size());
+ }
+
+ private ContentHeaderBody createPersistentContentHeader()
+ {
+ ContentHeaderBody chb = new ContentHeaderBody();
+ BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
+ bchp.setDeliveryMode((byte)2);
+ chb.properties = bchp;
+ return chb;
+ }
+
+ public void testMessageRemains() throws AMQException
+ {
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ final Long messageId = _store.getNewMessageId();
+ final ContentHeaderBody chb = createPersistentContentHeader();
+ AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
+ messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
+ AMQMessage message = new AMQMessage(messageHandle,
+ _storeContext,
+ info);
+
+
+ message = message.takeReference();
+ // we call routing complete to set up the handle
+ // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
+
+
+
+ assertEquals(1, _store.getMessageMetaDataMap().size());
+ message = message.takeReference();
+ message.decrementReference(_storeContext);
+ assertEquals(1, _store.getMessageMetaDataMap().size());
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestReferenceCounting.class);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
new file mode 100644
index 0000000000..84d3d313d1
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -0,0 +1,306 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+public class TxnBufferTest extends TestCase
+{
+ private final LinkedList<MockOp> ops = new LinkedList<MockOp>();
+
+ public void testCommit() throws AMQException
+ {
+ MockStore store = new MockStore();
+
+ TxnBuffer buffer = new TxnBuffer();
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ //check relative ordering
+ MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit();
+ buffer.enlist(op);
+ buffer.enlist(op);
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+
+ buffer.commit(null);
+
+ validateOps();
+ store.validate();
+ }
+
+ public void testRollback() throws AMQException
+ {
+ MockStore store = new MockStore();
+
+ TxnBuffer buffer = new TxnBuffer();
+ buffer.enlist(new MockOp().expectRollback());
+ buffer.enlist(new MockOp().expectRollback());
+ buffer.enlist(new MockOp().expectRollback());
+
+ buffer.rollback(null);
+
+ validateOps();
+ store.validate();
+ }
+
+ public void testCommitWithFailureDuringPrepare() throws AMQException
+ {
+ MockStore store = new MockStore();
+ store.beginTran(null);
+
+ TxnBuffer buffer = new TxnBuffer();
+ buffer.enlist(new StoreMessageOperation(store));
+ buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
+ buffer.enlist(new TxnTester(store));
+ buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
+ buffer.enlist(new FailedPrepare());
+ buffer.enlist(new MockOp());
+
+ try
+ {
+ buffer.commit(null);
+ }
+ catch (NoSuchElementException e)
+ {
+
+ }
+
+ validateOps();
+ store.validate();
+ }
+
+ public void testCommitWithPersistance() throws AMQException
+ {
+ MockStore store = new MockStore();
+ store.beginTran(null);
+ store.expectCommit();
+
+ TxnBuffer buffer = new TxnBuffer();
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ buffer.enlist(new MockOp().expectPrepare().expectCommit());
+ buffer.enlist(new StoreMessageOperation(store));
+ buffer.enlist(new TxnTester(store));
+
+ buffer.commit(null);
+ validateOps();
+ store.validate();
+ }
+
+ private void validateOps()
+ {
+ for (MockOp op : ops)
+ {
+ op.validate();
+ }
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TxnBufferTest.class);
+ }
+
+ class MockOp implements TxnOp
+ {
+ final Object PREPARE = "PREPARE";
+ final Object COMMIT = "COMMIT";
+ final Object UNDO_PREPARE = "UNDO_PREPARE";
+ final Object ROLLBACK = "ROLLBACK";
+
+ private final LinkedList expected = new LinkedList();
+
+ MockOp()
+ {
+ ops.add(this);
+ }
+
+ public void prepare(StoreContext context)
+ {
+ assertEquals(expected.removeLast(), PREPARE);
+ }
+
+ public void commit(StoreContext context)
+ {
+ assertEquals(expected.removeLast(), COMMIT);
+ }
+
+ public void undoPrepare()
+ {
+ assertEquals(expected.removeLast(), UNDO_PREPARE);
+ }
+
+ public void rollback(StoreContext context)
+ {
+ assertEquals(expected.removeLast(), ROLLBACK);
+ }
+
+ private MockOp expect(Object optype)
+ {
+ expected.addFirst(optype);
+ return this;
+ }
+
+ MockOp expectPrepare()
+ {
+ return expect(PREPARE);
+ }
+
+ MockOp expectCommit()
+ {
+ return expect(COMMIT);
+ }
+
+ MockOp expectUndoPrepare()
+ {
+ return expect(UNDO_PREPARE);
+ }
+
+ MockOp expectRollback()
+ {
+ return expect(ROLLBACK);
+ }
+
+ void validate()
+ {
+ assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
+ }
+
+ void clear()
+ {
+ expected.clear();
+ }
+ }
+
+ class MockStore extends TestMemoryMessageStore
+ {
+ final Object BEGIN = "BEGIN";
+ final Object ABORT = "ABORT";
+ final Object COMMIT = "COMMIT";
+
+ private final LinkedList expected = new LinkedList();
+ private boolean inTran;
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ inTran = true;
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ assertEquals(expected.removeLast(), COMMIT);
+ inTran = false;
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ assertEquals(expected.removeLast(), ABORT);
+ inTran = false;
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return inTran;
+ }
+
+ private MockStore expect(Object optype)
+ {
+ expected.addFirst(optype);
+ return this;
+ }
+
+ MockStore expectBegin()
+ {
+ return expect(BEGIN);
+ }
+
+ MockStore expectCommit()
+ {
+ return expect(COMMIT);
+ }
+
+ MockStore expectAbort()
+ {
+ return expect(ABORT);
+ }
+
+ void clear()
+ {
+ expected.clear();
+ }
+
+ void validate()
+ {
+ assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
+ }
+ }
+
+ class NullOp implements TxnOp
+ {
+ public void prepare(StoreContext context) throws AMQException
+ {
+ }
+ public void commit(StoreContext context)
+ {
+ }
+ public void undoPrepare()
+ {
+ }
+ public void rollback(StoreContext context)
+ {
+ }
+ }
+
+ class FailedPrepare extends NullOp
+ {
+ public void prepare() throws AMQException
+ {
+ throw new AMQException(null, "Fail!", null);
+ }
+ }
+
+ class TxnTester extends NullOp
+ {
+ private final MessageStore store;
+
+ private final StoreContext context = new StoreContext();
+
+ TxnTester(MessageStore store)
+ {
+ this.store = store;
+ }
+
+ public void prepare() throws AMQException
+ {
+ assertTrue("Expected prepare to be performed under txn", store.inTran(context));
+ }
+
+ public void commit()
+ {
+ assertTrue("Expected commit not to be performed under txn", !store.inTran(context));
+ }
+ }
+
+}