diff options
| author | Aidan Skinner <aidan@apache.org> | 2009-01-19 15:53:43 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2009-01-19 15:53:43 +0000 |
| commit | 0269fb662e844aa90ec659288fde3cd86643e6e4 (patch) | |
| tree | c5ad6c34d6a46507d46ddcfc4c3c6e9facf91613 /qpid/java/broker | |
| parent | b1f26965fd674c21cbbe5d7fa121d95d43c2aa39 (diff) | |
| download | qpid-python-0269fb662e844aa90ec659288fde3cd86643e6e4.tar.gz | |
QPID-1573: Move unit tests that were living in systests into appropriate module. Fix up a few bugs in other tests that this exposed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@735735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
15 files changed, 2961 insertions, 2 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java new file mode 100644 index 0000000000..b94f2ef76f --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import junit.framework.TestCase; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.ManagedBroker; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class AMQBrokerManagerMBeanTest extends TestCase +{ + private QueueRegistry _queueRegistry; + private ExchangeRegistry _exchangeRegistry; + + public void testExchangeOperations() throws Exception + { + String exchange1 = "testExchange1_" + System.currentTimeMillis(); + String exchange2 = "testExchange2_" + System.currentTimeMillis(); + String exchange3 = "testExchange3_" + System.currentTimeMillis(); + + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); + + VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) vHost.getManagedObject()); + mbean.createNewExchange(exchange1, "direct", false); + mbean.createNewExchange(exchange2, "topic", false); + mbean.createNewExchange(exchange3, "headers", false); + + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) != null); + + mbean.unregisterExchange(exchange1); + mbean.unregisterExchange(exchange2); + mbean.unregisterExchange(exchange3); + + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null); + assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); + } + + public void testQueueOperations() throws Exception + { + String queueName = "testQueue_" + System.currentTimeMillis(); + VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) vHost.getManagedObject()); + + assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); + + mbean.createNewQueue(queueName, "test", false); + assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null); + + mbean.deleteQueue(queueName); + assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _queueRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry(); + _exchangeRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry(); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java new file mode 100644 index 0000000000..aa7cbbdf3c --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -0,0 +1,275 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.ack; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; + +import java.util.*; + +public class TxAckTest extends TestCase +{ + private Scenario individual; + private Scenario multiple; + private Scenario combined; + + protected void setUp() throws Exception + { + super.setUp(); + + //ack only 5th msg + individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l)); + individual.update(5, false); + + //ack all up to and including 5th msg + multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l)); + multiple.update(5, true); + + //leave only 8th and 9th unacked + combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l)); + combined.update(3, false); + combined.update(5, true); + combined.update(7, true); + combined.update(2, true);//should be ignored + combined.update(1, false);//should be ignored + combined.update(10, false); + } + + @Override + protected void tearDown() throws Exception + { + individual.stop(); + multiple.stop(); + combined.stop(); + } + + public void testPrepare() throws AMQException + { + individual.prepare(); + multiple.prepare(); + combined.prepare(); + } + + public void testUndoPrepare() throws AMQException + { + individual.undoPrepare(); + multiple.undoPrepare(); + combined.undoPrepare(); + } + + public void testCommit() throws AMQException + { + individual.commit(); + multiple.commit(); + combined.commit(); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TxAckTest.class); + } + + private class Scenario + { + private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000); + private final TxAck _op = new TxAck(_map); + private final List<Long> _acked; + private final List<Long> _unacked; + private StoreContext _storeContext = new StoreContext(); + private AMQQueue _queue; + + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception + { + TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), + _storeContext, null, + new LinkedList<RequiredDeliveryException>() + ); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("test", new MemoryMessageStore()), + null); + + for (int i = 0; i < messageCount; i++) + { + long deliveryTag = i + 1; + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); + _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); + } + _acked = acked; + _unacked = unacked; + } + + void update(long deliverytag, boolean multiple) + { + _op.update(deliverytag, multiple); + } + + private void assertCount(List<Long> tags, int expected) + { + for (long tag : tags) + { + QueueEntry u = _map.get(tag); + assertTrue("Message not found for tag " + tag, u != null); + ((TestMessage) u.getMessage()).assertCountEquals(expected); + } + } + + void prepare() throws AMQException + { + _op.consolidate(); + _op.prepare(_storeContext); + + assertCount(_acked, -1); + assertCount(_unacked, 0); + + } + + void undoPrepare() + { + _op.consolidate(); + _op.undoPrepare(); + + assertCount(_acked, 1); + assertCount(_unacked, 0); + } + + void commit() + { + _op.consolidate(); + _op.commit(_storeContext); + + //check acked messages are removed from map + Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags()); + keys.retainAll(_acked); + assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); + //check unacked messages are still in map + keys = new HashSet<Long>(_unacked); + keys.removeAll(_map.getDeliveryTags()); + assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); + } + + public void stop() + { + _queue.stop(); + } + } + + private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + { + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + null, + false); + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), + publishBody, + new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); + } + catch (AMQException e) + { + // won't happen + } + + + return amqMessageHandle; + } + + + private class TestMessage extends AMQMessage + { + private final long _tag; + private int _count; + + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + throws AMQException + { + super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + _tag = tag; + } + + + public boolean incrementReference() + { + _count++; + return true; + } + + public void decrementReference(StoreContext context) + { + _count--; + } + + void assertCountEquals(int expected) + { + assertEquals("Wrong count for message with tag " + _tag, expected, _count); + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java new file mode 100644 index 0000000000..6dcb187a37 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -0,0 +1,562 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.log4j.Logger; + +import java.util.*; + +public class AbstractHeadersExchangeTestBase extends TestCase +{ + private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); + + private final HeadersExchange exchange = new HeadersExchange(); + protected final Set<TestQueue> queues = new HashSet<TestQueue>(); + + /** + * Not used in this test, just there to stub out the routing calls + */ + private MessageStore _store = new MemoryMessageStore(); + + private StoreContext _storeContext = new StoreContext(); + + private MessageHandleFactory _handleFactory = new MessageHandleFactory(); + + private int count; + + public void testDoNothing() + { + // this is here only to make junit under Eclipse happy + } + + protected TestQueue bindDefault(String... bindings) throws AMQException + { + return bind("Queue" + (++count), bindings); + } + + protected TestQueue bind(String queueName, String... bindings) throws AMQException + { + return bind(queueName, getHeaders(bindings)); + } + + protected TestQueue bind(String queue, FieldTable bindings) throws AMQException + { + return bind(new TestQueue(new AMQShortString(queue)), bindings); + } + + protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException + { + return bind(queue, getHeaders(bindings)); + } + + protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException + { + queues.add(queue); + exchange.registerQueue(null, queue, bindings); + return queue; + } + + + protected void route(Message m) throws AMQException + { + m.route(exchange); + m.getIncomingMessage().routingComplete(_store, _handleFactory); + if(m.getIncomingMessage().allContentReceived()) + { + m.getIncomingMessage().deliverToQueues(); + } + } + + protected void routeAndTest(Message m, TestQueue... expected) throws AMQException + { + routeAndTest(m, false, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException + { + routeAndTest(m, expectReturn, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException + { + routeAndTest(m, false, expected); + } + + protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException + { + try + { + route(m); + assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + for (TestQueue q : queues) + { + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } + } + } + + catch (NoRouteException ex) + { + assertTrue("Expected "+m+" not to be returned",expectReturn); + } + + } + + static FieldTable getHeaders(String... entries) + { + FieldTable headers = FieldTableFactory.newFieldTable(); + for (String s : entries) + { + String[] parts = s.split("=", 2); + headers.setObject(parts[0], parts.length > 1 ? parts[1] : ""); + } + return headers; + } + + + static final class MessagePublishInfoImpl implements MessagePublishInfo + { + private AMQShortString _exchange; + private boolean _immediate; + private boolean _mandatory; + private AMQShortString _routingKey; + + public MessagePublishInfoImpl(AMQShortString routingKey) + { + _routingKey = routingKey; + } + + public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public boolean isImmediate() + { + return _immediate; + + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + + public void setExchange(AMQShortString exchange) + { + _exchange = exchange; + } + + public void setImmediate(boolean immediate) + { + _immediate = immediate; + } + + public void setMandatory(boolean mandatory) + { + _mandatory = mandatory; + } + + public void setRoutingKey(AMQShortString routingKey) + { + _routingKey = routingKey; + } + } + + static MessagePublishInfo getPublishRequest(final String id) + { + return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id)); + } + + static ContentHeaderBody getContentHeader(FieldTable headers) + { + ContentHeaderBody header = new ContentHeaderBody(); + header.properties = getProperties(headers); + return header; + } + + static BasicContentHeaderProperties getProperties(FieldTable headers) + { + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setHeaders(headers); + return properties; + } + + static class TestQueue extends SimpleAMQQueue + { + final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + + public TestQueue(AMQShortString name) throws AMQException + { + super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test")); + ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this); + } + + /** + * We override this method so that the default behaviour, which attempts to use a delivery manager, is + * not invoked. It is unnecessary since for this test we only care to know whether the message was + * sent to the queue; the queue processing logic is not being tested. + * @param msg + * @throws AMQException + */ + @Override + public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException + { + messages.add( new HeadersExchangeTest.Message(msg)); + return new QueueEntry() + { + + public AMQQueue getQueue() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public AMQMessage getMessage() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getSize() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean getDeliveredToConsumer() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean expired() throws AMQException + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isAcquired() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire(Subscription sub) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean delete() + { + return false; + } + + public boolean isDeleted() + { + return false; + } + + public boolean acquiredBySubscription() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setDeliveredToSubscription() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void release() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public String debugIdentity() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean immediateAndNotDelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setRedelivered(boolean b) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Subscription getDeliveredSubscription() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRejectedBy(Subscription subscription) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void requeue(StoreContext storeContext) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dequeue(final StoreContext storeContext) throws FailedDequeueException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dispose(final StoreContext storeContext) throws MessageCleanupException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void restoreCredit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isQueueDeleted() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void addStateChangeListener(StateChangeListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean removeStateChangeListener(StateChangeListener listener) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public int compareTo(final QueueEntry o) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + }; + } + + boolean isInQueue(Message msg) + { + return messages.contains(msg); + } + + } + + /** + * Just add some extra utility methods to AMQMessage to aid testing. + */ + static class Message extends AMQMessage + { + private class TestIncomingMessage extends IncomingMessage + { + + public TestIncomingMessage(final long messageId, + final MessagePublishInfo info, + final TransactionalContext txnContext, + final AMQProtocolSession publisher) + { + super(messageId, info, txnContext, publisher); + } + + + public AMQMessage getUnderlyingMessage() + { + return Message.this; + } + + + public ContentHeaderBody getContentHeaderBody() + { + try + { + return Message.this.getContentHeaderBody(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + } + + private IncomingMessage _incoming; + + private static MessageStore _messageStore = new SkeletonMessageStore(); + + private static StoreContext _storeContext = new StoreContext(); + + + private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>() + ); + + Message(String id, String... headers) throws AMQException + { + this(id, getHeaders(headers)); + } + + Message(String id, FieldTable headers) throws AMQException + { + this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + } + + public IncomingMessage getIncomingMessage() + { + return _incoming; + } + + private Message(long messageId, + MessagePublishInfo publish, + ContentHeaderBody header, + List<ContentBody> bodies) throws AMQException + { + super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish); + + + + _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); + _incoming.setContentHeaderBody(header); + + + } + + private static AMQMessageHandle createMessageHandle(final long messageId, + final MessagePublishInfo publish, + final ContentHeaderBody header) + { + + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + _messageStore, + true); + + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header); + } + catch (AMQException e) + { + + } + return amqMessageHandle; + } + + private Message(AMQMessage msg) throws AMQException + { + super(msg); + } + + + + void route(Exchange exchange) throws AMQException + { + exchange.route(_incoming); + } + + + public int hashCode() + { + return getKey().hashCode(); + } + + public boolean equals(Object o) + { + return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); + } + + private boolean equals(HeadersExchangeTest.Message m) + { + return getKey().equals(m.getKey()); + } + + public String toString() + { + return getKey().toString(); + } + + private Object getKey() + { + try + { + return getMessagePublishInfo().getRoutingKey(); + } + catch (AMQException e) + { + _log.error("Error getting routing key: " + e, e); + return null; + } + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java new file mode 100644 index 0000000000..fd11ddeae2 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.NullApplicationRegistry; +import org.apache.qpid.framing.BasicPublishBody; + +public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase +{ + protected void setUp() throws Exception + { + super.setUp(); + ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); + } + + protected void tearDown() + { + ApplicationRegistry.remove(1); + } + + public void testSimple() throws AMQException + { + TestQueue q1 = bindDefault("F0000"); + TestQueue q2 = bindDefault("F0000=Aardvark"); + TestQueue q3 = bindDefault("F0001"); + TestQueue q4 = bindDefault("F0001=Bear"); + TestQueue q5 = bindDefault("F0000", "F0001"); + TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); + TestQueue q7 = bindDefault("F0000", "F0001=Bear"); + TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); + + routeAndTest(new Message("Message1", "F0000"), q1); + routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2); + routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); + routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); + routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), + q1, q2, q3, q4, q5, q6, q7, q8); + routeAndTest(new Message("Message6", "F0002")); + + Message m7 = new Message("Message7", "XXXXX"); + + MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); + pb7.setMandatory(true); + routeAndTest(m7,true); + + Message m8 = new Message("Message8", "F0000"); + MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); + pb8.setMandatory(true); + routeAndTest(m8,false,q1); + + + } + + public void testAny() throws AMQException + { + TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); + TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); + TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); + TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); + TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); + + routeAndTest(new Message("Message1", "F0000"), q1, q3); + routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4); + routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); + routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(new Message("Message6", "F0002")); + } + + public void testMandatory() throws AMQException + { + bindDefault("F0000"); + Message m1 = new Message("Message1", "XXXXX"); + Message m2 = new Message("Message2", "F0000"); + MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); + pb1.setMandatory(true); + MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); + pb2.setMandatory(true); + routeAndTest(m1,true); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(HeadersExchangeTest.class); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java new file mode 100644 index 0000000000..0762a7a561 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.plugins; + +import java.util.Map; + +import org.apache.qpid.server.exchange.ExchangeType; + +import junit.framework.TestCase; + +public class PluginTest extends TestCase +{ + + private static final String TEST_EXCHANGE_CLASS = "org.apache.qpid.extras.exchanges.example.TestExchangeType"; + private static final String PLUGIN_DIRECTORY = System.getProperty("example.plugin.target"); + + public void testLoadExchanges() throws Exception + { + PluginManager manager = new PluginManager(PLUGIN_DIRECTORY); + Map<String, ExchangeType<?>> exchanges = manager.getExchanges(); + assertNotNull("No exchanges found in "+PLUGIN_DIRECTORY, exchanges); + assertEquals("Wrong number of exchanges found in "+PLUGIN_DIRECTORY, + 2, exchanges.size()); + assertNotNull("Wrong exchange found in "+PLUGIN_DIRECTORY, + exchanges.get(TEST_EXCHANGE_CLASS)); + } + + public void testNoExchanges() throws Exception + { + PluginManager manager = new PluginManager("/path/to/nowhere"); + Map<String, ExchangeType<?>> exchanges = manager.getExchanges(); + assertNull("Exchanges found", exchanges); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java new file mode 100644 index 0000000000..8e7038eec3 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; + +import javax.management.JMException; + +/** + * Test class to test MBean operations for AMQMinaProtocolSession. + */ +public class AMQProtocolSessionMBeanTest extends TestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class); + + private MessageStore _messageStore = new SkeletonMessageStore(); + private AMQMinaProtocolSession _protocolSession; + private AMQChannel _channel; + private AMQProtocolSessionMBean _mbean; + + public void testChannels() throws Exception + { + // check the channel count is correct + int channelCount = _mbean.channels().size(); + assertTrue(channelCount == 1); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()), + false, + new AMQShortString("test"), + true, + _protocolSession.getVirtualHost(), null); + AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); + channel.setDefaultQueue(queue); + _protocolSession.addChannel(channel); + channelCount = _mbean.channels().size(); + assertTrue(channelCount == 2); + + // general properties test + _mbean.setMaximumNumberOfChannels(1000L); + assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L); + + // check APIs + AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore); + channel3.setLocalTransactional(); + _protocolSession.addChannel(channel3); + _mbean.rollbackTransactions(2); + _mbean.rollbackTransactions(3); + _mbean.commitTransactions(2); + _mbean.commitTransactions(3); + + // This should throw exception, because the channel does't exist + try + { + _mbean.commitTransactions(4); + fail(); + } + catch (JMException ex) + { + log.debug("expected exception is thrown :" + ex.getMessage()); + } + + // check if closing of session works + _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore)); + _mbean.closeConnection(); + try + { + channelCount = _mbean.channels().size(); + assertTrue(channelCount == 0); + // session is now closed so adding another channel should throw an exception + _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore)); + fail(); + } + catch (AMQException ex) + { + log.debug("expected exception is thrown :" + ex.getMessage()); + } + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _protocolSession = + new AMQMinaProtocolSession(new MockIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true), + null); + _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test")); + _channel = new AMQChannel(_protocolSession, 1, _messageStore); + _protocolSession.addChannel(_channel); + _mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject(); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java new file mode 100644 index 0000000000..307dcf66fe --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; + +/** Test class to test MBean operations for AMQMinaProtocolSession. */ +public class MaxChannelsTest extends TestCase +{ + private IApplicationRegistry _appRegistry; + private AMQMinaProtocolSession _session; + + public void testChannels() throws Exception + { + _session = new AMQMinaProtocolSession(new MockIoSession(), _appRegistry + .getVirtualHostRegistry(), new AMQCodecFactory(true), null); + _session.setVirtualHost(_appRegistry.getVirtualHostRegistry().getVirtualHost("test")); + + // check the channel count is correct + int channelCount = _session.getChannels().size(); + assertEquals("Initial channel count wrong", 0, channelCount); + + long maxChannels = 10L; + _session.setMaximumNumberOfChannels(maxChannels); + assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels()); + + + try + { + for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) + { + _session.addChannel(new AMQChannel(_session, (int) currentChannel, null)); + } + } + catch (AMQException e) + { + assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED); + } + assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size())); + } + + @Override + public void setUp() + { + _appRegistry = ApplicationRegistry.getInstance(1); + } + + @Override + public void tearDown() + { + try { + _session.closeSession(); + } catch (AMQException e) { + // Yikes + fail(e.getMessage()); + } + ApplicationRegistry.remove(1); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java new file mode 100644 index 0000000000..cf6366b513 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java @@ -0,0 +1,297 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.mina.common.*; +import org.apache.mina.common.support.DefaultCloseFuture; +import org.apache.mina.common.support.DefaultWriteFuture; + +import java.net.SocketAddress; +import java.net.InetSocketAddress; +import java.util.Set; + +public class MockIoSession implements IoSession +{ + private AMQProtocolSession _protocolSession; + + /** + * Stores the last response written + */ + private Object _lastWrittenObject; + + private boolean _closing; + + public MockIoSession() + { + } + + public Object getLastWrittenObject() + { + return _lastWrittenObject; + } + + public IoService getService() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoServiceConfig getServiceConfig() + { + return null; + } + + public IoHandler getHandler() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoSessionConfig getConfig() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoFilterChain getFilterChain() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public WriteFuture write(Object message) + { + WriteFuture wf = new DefaultWriteFuture(null); + _lastWrittenObject = message; + return wf; + } + + public CloseFuture close() + { + _closing = true; + CloseFuture cf = new DefaultCloseFuture(null); + cf.setClosed(); + return cf; + } + + public Object getAttachment() + { + return _protocolSession; + } + + public Object setAttachment(Object attachment) + { + Object current = _protocolSession; + _protocolSession = (AMQProtocolSession) attachment; + return current; + } + + public Object getAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object setAttribute(String key, Object value) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object setAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object removeAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean containsAttribute(String key) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public Set getAttributeKeys() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public TransportType getTransportType() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isConnected() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isClosing() + { + return _closing; + } + + public CloseFuture getCloseFuture() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getRemoteAddress() + { + return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getLocalAddress() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getServiceAddress() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getIdleTime(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public int getWriteTimeout() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getWriteTimeoutInMillis() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setWriteTimeout(int writeTimeout) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public TrafficMask getTrafficMask() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void suspendRead() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void suspendWrite() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resumeRead() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resumeWrite() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getReadBytes() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getWrittenBytes() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getReadMessages() + { + return 0L; + } + + public long getWrittenMessages() + { + return 0L; + } + + public long getWrittenWriteRequests() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getScheduledWriteRequests() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastIoTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastReadTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastWriteTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isIdle(IdleStatus status) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getIdleCount(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java new file mode 100644 index 0000000000..9c2932c5e2 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -0,0 +1,420 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.flow.LimitlessCreditManager; +import org.apache.qpid.server.flow.Pre0_10CreditManager; +import org.apache.qpid.server.ack.UnacknowledgedMessageMap; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.util.NullApplicationRegistry; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.Set; +import java.util.Collections; + +/** + * Tests that acknowledgements are handled correctly. + */ +public class AckTest extends TestCase +{ + private static final Logger _log = Logger.getLogger(AckTest.class); + + private Subscription _subscription; + + private MockProtocolSession _protocolSession; + + private TestMemoryMessageStore _messageStore; + + private StoreContext _storeContext = new StoreContext(); + + private AMQChannel _channel; + + private AMQQueue _queue; + + private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag"); + + protected void setUp() throws Exception + { + super.setUp(); + ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); + + _messageStore = new TestMemoryMessageStore(); + _protocolSession = new MockProtocolSession(_messageStore); + _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); + + _protocolSession.addChannel(_channel); + + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), + null); + } + + protected void tearDown() + { + ApplicationRegistry.remove(1); + } + + private void publishMessages(int count) throws AMQException + { + publishMessages(count, false); + } + + private void publishMessages(int count, boolean persistent) throws AMQException + { + TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, + new LinkedList<RequiredDeliveryException>() + ); + _queue.registerSubscription(_subscription,false); + MessageHandleFactory factory = new MessageHandleFactory(); + for (int i = 1; i <= count; i++) + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Establish some way to determine the version for the test. + MessagePublishInfo publishBody = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return new AMQShortString("someExchange"); + } + + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString("rk"); + } + }; + IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); + //IncomingMessage msg2 = null; + if (persistent) + { + BasicContentHeaderProperties b = new BasicContentHeaderProperties(); + //This is DeliveryMode.PERSISTENT + b.setDeliveryMode((byte) 2); + ContentHeaderBody cb = new ContentHeaderBody(); + cb.properties = b; + msg.setContentHeaderBody(cb); + } + else + { + msg.setContentHeaderBody(new ContentHeaderBody()); + } + // we increment the reference here since we are not delivering the messaging to any queues, which is where + // the reference is normally incremented. The test is easier to construct if we have direct access to the + // subscription + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + msg.enqueue(qs); + msg.routingComplete(_messageStore, factory); + if(msg.allContentReceived()) + { + msg.deliverToQueues(); + } + // we manually send the message to the subscription + //_subscription.send(new QueueEntry(_queue,msg), _queue); + } + } + + /** + * Tests that the acknowledgements are correctly associated with a channel and + * order is preserved when acks are enabled + */ + public void testAckChannelAssociationTest() throws AMQException + { + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount, true); + + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == msgCount); + assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); + + Set<Long> deliveryTagSet = map.getDeliveryTags(); + int i = 1; + for (long deliveryTag : deliveryTagSet) + { + assertTrue(deliveryTag == i); + i++; + QueueEntry unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getQueue() == _queue); + } + + assertTrue(map.size() == msgCount); + assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); + } + + /** + * Tests that in no-ack mode no messages are retained + */ + public void testNoAckMode() throws AMQException + { + // false arg means no acks expected + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount); + + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + + } + + /** + * Tests that in no-ack mode no messages are retained + */ + public void testPersistentNoAckMode() throws AMQException + { + // false arg means no acks expected + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount, true); + + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + + } + + /** + * Tests that a single acknowledgement is handled correctly (i.e multiple flag not + * set case) + */ + public void testSingleAckReceivedTest() throws AMQException + { + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount); + + _channel.acknowledgeMessage(5, false); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == msgCount - 1); + + Set<Long> deliveryTagSet = map.getDeliveryTags(); + int i = 1; + for (long deliveryTag : deliveryTagSet) + { + assertTrue(deliveryTag == i); + QueueEntry unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getQueue() == _queue); + // 5 is the delivery tag of the message that *should* be removed + if (++i == 5) + { + ++i; + } + } + } + + /** + * Tests that a single acknowledgement is handled correctly (i.e multiple flag not + * set case) + */ + public void testMultiAckReceivedTest() throws AMQException + { + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount); + + _channel.acknowledgeMessage(5, true); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 5); + + Set<Long> deliveryTagSet = map.getDeliveryTags(); + int i = 1; + for (long deliveryTag : deliveryTagSet) + { + assertTrue(deliveryTag == i + 5); + QueueEntry unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getQueue() == _queue); + ++i; + } + } + + /** + * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. + */ + public void testMultiAckAllReceivedTest() throws AMQException + { + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount); + + _channel.acknowledgeMessage(0, true); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + + Set<Long> deliveryTagSet = map.getDeliveryTags(); + int i = 1; + for (long deliveryTag : deliveryTagSet) + { + assertTrue(deliveryTag == i + 5); + QueueEntry unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getQueue() == _queue); + ++i; + } + } + + /** + * A regression fixing QPID-1136 showed this up + * + * @throws Exception + */ + public void testMessageDequeueRestoresCreditTest() throws Exception + { + // Send 10 messages + Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); + + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, + DEFAULT_CONSUMER_TAG, true, null, false, creditManager); + final int msgCount = 1; + publishMessages(msgCount); + + _queue.deliverAsync(_subscription); + + _channel.acknowledgeMessage(1, false); + + // Check credit available + assertTrue("No credit available", creditManager.hasCredit()); + + } + + +/* + public void testPrefetchHighLow() throws AMQException + { + int lowMark = 5; + int highMark = 10; + + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + _channel.setPrefetchLowMarkCount(lowMark); + _channel.setPrefetchHighMarkCount(highMark); + + assertTrue(_channel.getPrefetchLowMarkCount() == lowMark); + assertTrue(_channel.getPrefetchHighMarkCount() == highMark); + + publishMessages(highMark); + + // at this point we should have sent out only highMark messages + // which have not bee received so will be queued up in the channel + // which should be suspended + assertTrue(_subscription.isSuspended()); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == highMark); + + //acknowledge messages so we are just above lowMark + _channel.acknowledgeMessage(lowMark - 1, true); + + //we should still be suspended + assertTrue(_subscription.isSuspended()); + assertTrue(map.size() == lowMark + 1); + + //acknowledge one more message + _channel.acknowledgeMessage(lowMark, true); + + //and suspension should be lifted + assertTrue(!_subscription.isSuspended()); + + //pubilsh more msgs so we are just below the limit + publishMessages(lowMark - 1); + + //we should not be suspended + assertTrue(!_subscription.isSuspended()); + + //acknowledge all messages + _channel.acknowledgeMessage(0, true); + try + { + Thread.sleep(3000); + } + catch (InterruptedException e) + { + _log.error("Error: " + e, e); + } + //map will be empty + assertTrue(map.size() == 0); + } + +*/ +/* + public void testPrefetch() throws AMQException + { + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + _channel.setMessageCredit(5); + + assertTrue(_channel.getPrefetchCount() == 5); + + final int msgCount = 5; + publishMessages(msgCount); + + // at this point we should have sent out only 5 messages with a further 5 queued + // up in the channel which should now be suspended + assertTrue(_subscription.isSuspended()); + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 5); + _channel.acknowledgeMessage(5, true); + assertTrue(!_subscription.isSuspended()); + try + { + Thread.sleep(3000); + } + catch (InterruptedException e) + { + _log.error("Error: " + e, e); + } + assertTrue(map.size() == 0); + } + +*/ + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(AckTest.class); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java new file mode 100644 index 0000000000..99c88fac3e --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.transport.Sender; + +import javax.security.sasl.SaslServer; +import java.util.HashMap; +import java.util.Map; +import java.security.Principal; + +/** + * A protocol session that can be used for testing purposes. + */ +public class MockProtocolSession implements AMQProtocolSession +{ + private MessageStore _messageStore; + + private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + + public MockProtocolSession(MessageStore messageStore) + { + _messageStore = messageStore; + } + + public void dataBlockReceived(AMQDataBlock message) throws Exception + { + } + + public void writeFrame(AMQDataBlock frame) + { + } + + public AMQShortString getContextKey() + { + return null; + } + + public void setContextKey(AMQShortString contextKey) + { + } + + public AMQChannel getChannel(int channelId) + { + AMQChannel channel = _channelMap.get(channelId); + if (channel == null) + { + throw new IllegalArgumentException("Invalid channel id: " + channelId); + } + else + { + return channel; + } + } + + public void addChannel(AMQChannel channel) + { + if (channel == null) + { + throw new IllegalArgumentException("Channel must not be null"); + } + else + { + _channelMap.put(channel.getChannelId(), channel); + } + } + + public void closeChannel(int channelId) throws AMQException + { + } + + public void closeChannelOk(int channelId) + { + + } + + public boolean channelAwaitingClosure(int channelId) + { + return false; + } + + public void removeChannel(int channelId) + { + _channelMap.remove(channelId); + } + + public void initHeartbeats(int delay) + { + } + + public void closeSession() throws AMQException + { + } + + public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException + { + } + + public Object getKey() + { + return null; + } + + public String getLocalFQDN() + { + return null; + } + + public SaslServer getSaslServer() + { + return null; + } + + public void setSaslServer(SaslServer saslServer) + { + } + + public FieldTable getClientProperties() + { + return null; + } + + public void setClientProperties(FieldTable clientProperties) + { + } + + public Object getClientIdentifier() + { + return null; + } + + public VirtualHost getVirtualHost() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setVirtualHost(VirtualHost virtualHost) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void addSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public ProtocolOutputConverter getProtocolOutputConverter() + { + return ProtocolOutputConverterRegistry.getConverter(this); + } + + public void setAuthorizedID(Principal authorizedID) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Principal getAuthorizedID() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public MethodRegistry getMethodRegistry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void methodFrameReceived(int channelId, AMQMethodBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void contentHeaderReceived(int channelId, ContentHeaderBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void contentBodyReceived(int channelId, ContentBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public MethodDispatcher getMethodDispatcher() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public ProtocolSessionIdentifier getSessionIdentifier() + { + return null; + } + + public byte getProtocolMajorVersion() + { + return getProtocolVersion().getMajorVersion(); + } + + public byte getProtocolMinorVersion() + { + return getProtocolVersion().getMinorVersion(); + } + + + public ProtocolVersion getProtocolVersion() + { + return ProtocolVersion.getLatestSupportedVersion(); //To change body of implemented methods use File | Settings | File Templates. + } + + + public VersionSpecificRegistry getRegistry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setSender(Sender<java.nio.ByteBuffer> sender) + { + // FIXME AS TODO + + } + + public void init() + { + // TODO Auto-generated method stub + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index f76c652793..f45d887dec 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -35,6 +35,7 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase public void test() throws AMQException { + assertEquals("References exist before start!", 0, ReferenceCountingExecutorService.getInstance().getReferenceCount()); VirtualHost test = ApplicationRegistry.getInstance(1).getVirtualHostRegistry().getVirtualHost("test"); try @@ -43,8 +44,8 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase new AMQShortString("owner"), false, test, null); - assertTrue("Creation did not start Pool.", !ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); - + assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); + queue.stop(); assertEquals("References still exist", 0, ReferenceCountingExecutorService.getInstance().getReferenceCount()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java new file mode 100644 index 0000000000..f08a15a8a7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -0,0 +1,155 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A message store that does nothing. Designed to be used in tests that do not want to use any message store + * functionality. + */ +public class SkeletonMessageStore implements MessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + + public void configure(String base, Configuration config) throws Exception + { + } + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void close() throws Exception + { + } + + public void removeMessage(StoreContext s, Long messageId) + { + } + + public void createExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void createQueue(AMQQueue queue) throws AMQException + { + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + { + } + + public void beginTran(StoreContext s) throws AMQException + { + } + + public boolean inTran(StoreContext sc) + { + return false; + } + + public void commitTran(StoreContext storeContext) throws AMQException + { + } + + public void abortTran(StoreContext storeContext) throws AMQException + { + } + + public List<AMQQueue> createQueues() throws AMQException + { + return null; + } + + public Long getNewMessageId() + { + return _messageId.getAndIncrement(); + } + + public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + + } + + public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + + } + + public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException + { + return null; + } + + public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException + { + return null; + } + + public boolean isPersistent() + { + return false; + } + + public void removeQueue(final AMQQueue queue) throws AMQException + { + + } + + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + + } + + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java new file mode 100644 index 0000000000..4e48435962 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.ContentChunk; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.List; + +/** + * Adds some extra methods to the memory message store for testing purposes. + */ +public class TestMemoryMessageStore extends MemoryMessageStore +{ + public TestMemoryMessageStore() + { + _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); + } + + public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() + { + return _metaDataMap; + } + + public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() + { + return _contentBodyMap; + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java new file mode 100644 index 0000000000..2346660d25 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.AMQMessageHandle; + +/** + * Tests that reference counting works correctly with AMQMessage and the message store + */ +public class TestReferenceCounting extends TestCase +{ + private TestMemoryMessageStore _store; + + private StoreContext _storeContext = new StoreContext(); + + + protected void setUp() throws Exception + { + super.setUp(); + _store = new TestMemoryMessageStore(); + } + + /** + * Check that when the reference count is decremented the message removes itself from the store + */ + public void testMessageGetsRemoved() throws AMQException + { + ContentHeaderBody chb = createPersistentContentHeader(); + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + + final long messageId = _store.getNewMessageId(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext,info); + + message = message.takeReference(); + + // we call routing complete to set up the handle + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + assertEquals(1, _store.getMessageMetaDataMap().size()); + message.decrementReference(_storeContext); + assertEquals(1, _store.getMessageMetaDataMap().size()); + } + + private ContentHeaderBody createPersistentContentHeader() + { + ContentHeaderBody chb = new ContentHeaderBody(); + BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); + bchp.setDeliveryMode((byte)2); + chb.properties = bchp; + return chb; + } + + public void testMessageRemains() throws AMQException + { + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + final Long messageId = _store.getNewMessageId(); + final ContentHeaderBody chb = createPersistentContentHeader(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext, + info); + + + message = message.takeReference(); + // we call routing complete to set up the handle + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + + assertEquals(1, _store.getMessageMetaDataMap().size()); + message = message.takeReference(); + message.decrementReference(_storeContext); + assertEquals(1, _store.getMessageMetaDataMap().size()); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TestReferenceCounting.class); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java new file mode 100644 index 0000000000..84d3d313d1 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -0,0 +1,306 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; + +import java.util.LinkedList; +import java.util.NoSuchElementException; + +public class TxnBufferTest extends TestCase +{ + private final LinkedList<MockOp> ops = new LinkedList<MockOp>(); + + public void testCommit() throws AMQException + { + MockStore store = new MockStore(); + + TxnBuffer buffer = new TxnBuffer(); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + //check relative ordering + MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit(); + buffer.enlist(op); + buffer.enlist(op); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + + buffer.commit(null); + + validateOps(); + store.validate(); + } + + public void testRollback() throws AMQException + { + MockStore store = new MockStore(); + + TxnBuffer buffer = new TxnBuffer(); + buffer.enlist(new MockOp().expectRollback()); + buffer.enlist(new MockOp().expectRollback()); + buffer.enlist(new MockOp().expectRollback()); + + buffer.rollback(null); + + validateOps(); + store.validate(); + } + + public void testCommitWithFailureDuringPrepare() throws AMQException + { + MockStore store = new MockStore(); + store.beginTran(null); + + TxnBuffer buffer = new TxnBuffer(); + buffer.enlist(new StoreMessageOperation(store)); + buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); + buffer.enlist(new TxnTester(store)); + buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); + buffer.enlist(new FailedPrepare()); + buffer.enlist(new MockOp()); + + try + { + buffer.commit(null); + } + catch (NoSuchElementException e) + { + + } + + validateOps(); + store.validate(); + } + + public void testCommitWithPersistance() throws AMQException + { + MockStore store = new MockStore(); + store.beginTran(null); + store.expectCommit(); + + TxnBuffer buffer = new TxnBuffer(); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + buffer.enlist(new StoreMessageOperation(store)); + buffer.enlist(new TxnTester(store)); + + buffer.commit(null); + validateOps(); + store.validate(); + } + + private void validateOps() + { + for (MockOp op : ops) + { + op.validate(); + } + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TxnBufferTest.class); + } + + class MockOp implements TxnOp + { + final Object PREPARE = "PREPARE"; + final Object COMMIT = "COMMIT"; + final Object UNDO_PREPARE = "UNDO_PREPARE"; + final Object ROLLBACK = "ROLLBACK"; + + private final LinkedList expected = new LinkedList(); + + MockOp() + { + ops.add(this); + } + + public void prepare(StoreContext context) + { + assertEquals(expected.removeLast(), PREPARE); + } + + public void commit(StoreContext context) + { + assertEquals(expected.removeLast(), COMMIT); + } + + public void undoPrepare() + { + assertEquals(expected.removeLast(), UNDO_PREPARE); + } + + public void rollback(StoreContext context) + { + assertEquals(expected.removeLast(), ROLLBACK); + } + + private MockOp expect(Object optype) + { + expected.addFirst(optype); + return this; + } + + MockOp expectPrepare() + { + return expect(PREPARE); + } + + MockOp expectCommit() + { + return expect(COMMIT); + } + + MockOp expectUndoPrepare() + { + return expect(UNDO_PREPARE); + } + + MockOp expectRollback() + { + return expect(ROLLBACK); + } + + void validate() + { + assertEquals("Expected ops were not all invoked", new LinkedList(), expected); + } + + void clear() + { + expected.clear(); + } + } + + class MockStore extends TestMemoryMessageStore + { + final Object BEGIN = "BEGIN"; + final Object ABORT = "ABORT"; + final Object COMMIT = "COMMIT"; + + private final LinkedList expected = new LinkedList(); + private boolean inTran; + + public void beginTran(StoreContext context) throws AMQException + { + inTran = true; + } + + public void commitTran(StoreContext context) throws AMQException + { + assertEquals(expected.removeLast(), COMMIT); + inTran = false; + } + + public void abortTran(StoreContext context) throws AMQException + { + assertEquals(expected.removeLast(), ABORT); + inTran = false; + } + + public boolean inTran(StoreContext context) + { + return inTran; + } + + private MockStore expect(Object optype) + { + expected.addFirst(optype); + return this; + } + + MockStore expectBegin() + { + return expect(BEGIN); + } + + MockStore expectCommit() + { + return expect(COMMIT); + } + + MockStore expectAbort() + { + return expect(ABORT); + } + + void clear() + { + expected.clear(); + } + + void validate() + { + assertEquals("Expected ops were not all invoked", new LinkedList(), expected); + } + } + + class NullOp implements TxnOp + { + public void prepare(StoreContext context) throws AMQException + { + } + public void commit(StoreContext context) + { + } + public void undoPrepare() + { + } + public void rollback(StoreContext context) + { + } + } + + class FailedPrepare extends NullOp + { + public void prepare() throws AMQException + { + throw new AMQException(null, "Fail!", null); + } + } + + class TxnTester extends NullOp + { + private final MessageStore store; + + private final StoreContext context = new StoreContext(); + + TxnTester(MessageStore store) + { + this.store = store; + } + + public void prepare() throws AMQException + { + assertTrue("Expected prepare to be performed under txn", store.inTran(context)); + } + + public void commit() + { + assertTrue("Expected commit not to be performed under txn", !store.inTran(context)); + } + } + +} |
