summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-06-19 09:01:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-06-19 09:01:59 +0000
commit913d1a55b290f9a8295d5e8396c696d3cee73bc0 (patch)
treea4dde827f8b825e6535197cc12df347bd8d064db /java/broker/src/test
parentf3fc904893b8c345b1aa358816d118fd0aad7d8b (diff)
downloadqpid-python-913d1a55b290f9a8295d5e8396c696d3cee73bc0.tar.gz
QPID-950 : Broker refactoring, copied / merged from branch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@669431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java176
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java110
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java83
5 files changed, 220 insertions, 162 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index 2898cb38a6..3e8b1d0998 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -22,9 +22,7 @@ package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
import junit.framework.Assert;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -33,6 +31,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -43,57 +42,51 @@ import java.util.LinkedList;
public class DestWildExchangeTest extends TestCase
{
- DestWildExchange _exchange;
+ TopicExchange _exchange;
VirtualHost _vhost;
MessageStore _store;
StoreContext _context;
+ TestMinaProtocolSession _protocolSession;
+
public void setUp() throws AMQException
{
- _exchange = new DestWildExchange();
+ _exchange = new TopicExchange();
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_store = new MemoryMessageStore();
_context = new StoreContext();
+ _protocolSession = new TestMinaProtocolSession();
}
public void testNoRoute() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
- AMQMessage message = new AMQMessage(0L, info, null);
+ IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
- try
- {
- _exchange.route(message);
- fail("Message has no route and shouldn't be routed");
- }
- catch (NoRouteException nre)
- {
- //normal
- }
+ _exchange.route(message);
Assert.assertEquals(0, queue.getMessageCount());
}
public void testDirectMatch() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("ab"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("ab"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.b"), queue, null);
- AMQMessage message = createMessage("a.b");
+ IncomingMessage message = createMessage("a.b");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -102,7 +95,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -112,8 +105,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has no route and should fail to be routed");
}
catch (AMQException nre)
@@ -126,16 +118,15 @@ public class DestWildExchangeTest extends TestCase
public void testStarMatch() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a*"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*"), queue, null);
- AMQMessage message = createMessage("a.b");
+ IncomingMessage message = createMessage("a.b");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -144,7 +135,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -154,8 +145,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -164,7 +154,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -174,8 +164,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has no route and should fail to be routed");
}
catch (AMQException nre)
@@ -187,16 +176,15 @@ public class DestWildExchangeTest extends TestCase
public void testHashMatch() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.#"), queue, null);
- AMQMessage message = createMessage("a.b.c");
+ IncomingMessage message = createMessage("a.b.c");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -205,7 +193,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -215,8 +203,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -225,7 +212,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -235,8 +222,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -245,7 +231,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -254,8 +240,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -264,7 +249,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -274,8 +259,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has no route and should fail to be routed");
}
catch (AMQException nre)
@@ -288,16 +272,15 @@ public class DestWildExchangeTest extends TestCase
public void testMidHash() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
- AMQMessage message = createMessage("a.c.d.b");
+ IncomingMessage message = createMessage("a.c.d.b");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -306,7 +289,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -315,8 +298,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -325,7 +307,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -334,16 +316,15 @@ public class DestWildExchangeTest extends TestCase
public void testMatchafterHash() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null);
- AMQMessage message = createMessage("a.c.b.b");
+ IncomingMessage message = createMessage("a.c.b.b");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has route and should not be routed");
}
catch (AMQException nre)
@@ -357,8 +338,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -367,7 +347,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -376,8 +356,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has route and should not be routed");
}
catch (AMQException nre)
@@ -390,8 +369,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -401,7 +379,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -411,16 +389,15 @@ public class DestWildExchangeTest extends TestCase
public void testHashAfterHash() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null);
- AMQMessage message = createMessage("a.c.b.b.c");
+ IncomingMessage message = createMessage("a.c.b.b.c");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has route and should not be routed");
}
catch (AMQException nre)
@@ -434,8 +411,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -444,7 +420,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -453,16 +429,15 @@ public class DestWildExchangeTest extends TestCase
public void testHashHash() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a#"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null);
- AMQMessage message = createMessage("a.c.b.b.c");
+ IncomingMessage message = createMessage("a.c.b.b.c");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has route and should not be routed");
}
catch (AMQException nre)
@@ -475,8 +450,7 @@ public class DestWildExchangeTest extends TestCase
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
}
catch (AMQException nre)
{
@@ -485,7 +459,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -494,16 +468,15 @@ public class DestWildExchangeTest extends TestCase
public void testSubMatchFails() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null);
- AMQMessage message = createMessage("a.b.c");
+ IncomingMessage message = createMessage("a.b.c");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has route and should not be routed");
}
catch (AMQException nre)
@@ -514,18 +487,25 @@ public class DestWildExchangeTest extends TestCase
}
+ private void routeMessage(final IncomingMessage message)
+ throws AMQException
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, new MessageHandleFactory());
+ message.deliverToQueues();
+ }
+
public void testMoreRouting() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.b"), queue, null);
- AMQMessage message = createMessage("a.b.c");
+ IncomingMessage message = createMessage("a.b.c");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has route and should not be routed");
}
catch (AMQException nre)
@@ -538,16 +518,15 @@ public class DestWildExchangeTest extends TestCase
public void testMoreQueue() throws AMQException
{
- AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.b"), queue, null);
- AMQMessage message = createMessage("a");
+ IncomingMessage message = createMessage("a");
try
{
- _exchange.route(message);
- message.routingComplete(_store, _context, new MessageHandleFactory());
+ routeMessage(message);
fail("Message has route and should not be routed");
}
catch (AMQException nre)
@@ -558,7 +537,7 @@ public class DestWildExchangeTest extends TestCase
}
- private AMQMessage createMessage(String s) throws AMQException
+ private IncomingMessage createMessage(String s) throws AMQException
{
MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
@@ -566,8 +545,9 @@ public class DestWildExchangeTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
- AMQMessage message = new AMQMessage(0L, info, trancontext);
- message.setContentHeaderBody(new ContentHeaderBody());
+ IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession);
+ message.setContentHeaderBody( new ContentHeaderBody());
+
return message;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
index 18d8592817..2a2bc72950 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -21,8 +21,9 @@
package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.management.ManagedObject;
@@ -30,7 +31,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.util.ArrayList;
@@ -50,7 +50,7 @@ public class ExchangeMBeanTest extends TestCase
public void testDirectExchangeMBean() throws Exception
{
- DestNameExchange exchange = new DestNameExchange();
+ DirectExchange exchange = new DirectExchange();
exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -77,7 +77,7 @@ public class ExchangeMBeanTest extends TestCase
public void testTopicExchangeMBean() throws Exception
{
- DestWildExchange exchange = new DestWildExchange();
+ TopicExchange exchange = new TopicExchange();
exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -132,7 +132,8 @@ public class ExchangeMBeanTest extends TestCase
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
_queueRegistry = _virtualHost.getQueueRegistry();
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost,
+ null);
_queueRegistry.registerQueue(_queue);
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
index 0c0d8f471e..113944cf7e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
@@ -30,9 +30,11 @@ public class TestMinaProtocolSession extends AMQMinaProtocolSession
{
public TestMinaProtocolSession() throws AMQException
{
+
super(new TestIoSession(),
ApplicationRegistry.getInstance().getVirtualHostRegistry(),
new AMQCodecFactory(true));
+
}
public ProtocolOutputConverter getProtocolOutputConverter()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 65db2a6425..ca614e053a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -32,18 +32,23 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
import java.util.LinkedList;
+import java.util.Collections;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
-{
+{
private final static long MAX_MESSAGE_COUNT = 50;
private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
@@ -51,13 +56,14 @@ public class AMQQueueAlertTest extends TestCase
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private VirtualHost _virtualHost;
- private AMQMinaProtocolSession protocolSession = null;
+ private AMQMinaProtocolSession _protocolSession;
private MessageStore _messageStore = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
null,
new LinkedList<RequiredDeliveryException>()
);
+ private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
/**
* Tests if the alert gets thrown when message count increases the threshold limit
@@ -66,8 +72,9 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testMessageCountAlert() throws Exception
{
- _queue = new AMQQueue(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
- false, _virtualHost);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost,
+ null);
_queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
@@ -89,8 +96,9 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testMessageSizeAlert() throws Exception
{
- _queue = new AMQQueue(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
- false, _virtualHost);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost,
+ null);
_queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
@@ -114,8 +122,9 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testQueueDepthAlertNoSubscriber() throws Exception
{
- _queue = new AMQQueue(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
- false, _virtualHost);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost,
+ null);
_queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
@@ -142,8 +151,9 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testMessageAgeAlert() throws Exception
{
- _queue = new AMQQueue(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
- false, _virtualHost);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost,
+ null);
_queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
@@ -167,18 +177,23 @@ public class AMQQueueAlertTest extends TestCase
This test sends some messages to the queue with subscribers needing message to be acknowledged.
The messages will not be acknowledged and will be required twice. Why we are checking this is because
the bug reported said that the queueDepth keeps increasing when messages are requeued.
+ // TODO - queue depth now includes unacknowledged messages so does not go down when messages are delivered
+
The QueueDepth should decrease when messages are delivered from the queue (QPID-408)
*/
public void testQueueDepthAlertWithSubscribers() throws Exception
{
- protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
- protocolSession.addChannel(channel);
+ _protocolSession = new TestMinaProtocolSession();
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
// Create queue
_queue = getNewQueue();
- _queue.registerProtocolSession(protocolSession, channel.getChannelId(),
- new AMQShortString("consumer_tag"), true, null, false, false);
+ Subscription subscription =
+ SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), _protocolSession, new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager());
+
+ _queue.registerSubscription(
+ subscription, false);
_queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(9999l); // Set a high value, because this is not being tested
@@ -191,13 +206,13 @@ public class AMQQueueAlertTest extends TestCase
// Check queueDepth. There should be no messages on the queue and as the subscriber is listening
// so there should be no Queue_Deoth alert raised
- assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
+ assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
Notification lastNotification = _queueMBean.getLastNotification();
- assertNull(lastNotification);
+// assertNull(lastNotification);
// Kill the subscriber and check for the queue depth values.
// Messages are unacknowledged, so those should get requeued. All messages should be on the Queue
- _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), new AMQShortString("consumer_tag"));
+ _queue.unregisterSubscription(subscription);
channel.requeue();
assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
@@ -209,29 +224,32 @@ public class AMQQueueAlertTest extends TestCase
// Connect a consumer again and check QueueDepth values. The queue should get emptied.
// Messages will get delivered but still are unacknowledged.
- _queue.registerProtocolSession(protocolSession, channel.getChannelId(),
- new AMQShortString("consumer_tag"), true, null, false, false);
- _queue.deliverAsync();
- while (_queue.getMessageCount() != 0)
+ Subscription subscription2 =
+ SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), _protocolSession, new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager());
+
+ _queue.registerSubscription(
+ subscription2, false);
+
+ while (_queue.getUndeliveredMessageCount()!= 0)
{
Thread.sleep(100);
}
- assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
+// assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
// Kill the subscriber again. Now those messages should get requeued again. Check if the queue depth
// value is correct.
- _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), new AMQShortString("consumer_tag"));
+ _queue.unregisterSubscription(subscription2);
channel.requeue();
assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
- protocolSession.closeSession();
+ _protocolSession.closeSession();
// Check the clear queue
_queueMBean.clearQueue();
assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
}
- protected AMQMessage message(final boolean immediate, long size) throws AMQException
+ protected IncomingMessage message(final boolean immediate, long size) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -264,9 +282,9 @@ public class AMQQueueAlertTest extends TestCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = size; // in bytes
- AMQMessage message = new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext);
+ IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
message.setContentHeaderBody(contentHeaderBody);
- message.setPublisher(protocolSession);
+
return message;
}
@@ -276,30 +294,52 @@ public class AMQQueueAlertTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _protocolSession = new TestMinaProtocolSession();
+
}
- private void sendMessages(long messageCount, long size) throws AMQException
+ private void sendMessages(long messageCount, final long size) throws AMQException
{
- AMQMessage[] messages = new AMQMessage[(int) messageCount];
+ IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
- messages[i].enqueue(_queue);
- messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+ messages[i].enqueue(Collections.singleton(_queue));
+ messages[i].routingComplete(_messageStore, new MessageHandleFactory());
+
}
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false);
+ messages[i].addContentBodyFrame(new ContentChunk(){
+
+ ByteBuffer _data = ByteBuffer.allocate((int)size);
+
+ public int getSize()
+ {
+ return (int) size;
+ }
+
+ public ByteBuffer getData()
+ {
+ return _data;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+ });
+ messages[i].deliverToQueues();
}
}
private AMQQueue getNewQueue() throws AMQException
{
- return new AMQQueue(new AMQShortString("testQueue" + Math.random()),
+ return AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue" + Math.random()),
false,
new AMQShortString("AMQueueAlertTest"),
false,
- _virtualHost);
+ _virtualHost, null);
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 9b874d63e8..bf0a8a6d90 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -27,8 +27,12 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactory;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -44,6 +48,7 @@ import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.LinkedList;
+import java.util.Collections;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -58,6 +63,7 @@ public class AMQQueueMBeanTest extends TestCase
private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
private AMQProtocolSession _protocolSession;
+ private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
public void testMessageCountTransient() throws Exception
{
@@ -73,7 +79,7 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
_queueMBean.clearQueue();
- assertTrue(_queueMBean.getMessageCount() == 0);
+ assertEquals(0,(int)_queueMBean.getMessageCount());
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
//Ensure that the data has been removed from the Store
@@ -116,8 +122,8 @@ public class AMQQueueMBeanTest extends TestCase
public void testConsumerCount() throws AMQException
{
- SubscriptionManager mgr = _queue.getSubscribers();
- assertFalse(mgr.hasActiveSubscribers());
+
+ assertTrue(_queue.getActiveConsumerCount() == 0);
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
@@ -125,18 +131,21 @@ public class AMQQueueMBeanTest extends TestCase
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
- _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
- assertTrue(_queueMBean.getActiveConsumerCount() == 1);
+ Subscription subscription =
+ SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager());
+
+ _queue.registerSubscription(subscription, false);
+ assertEquals(1,(int)_queueMBean.getActiveConsumerCount());
+
- SubscriptionSet _subscribers = (SubscriptionSet) mgr;
- SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
+ SubscriptionFactory subscriptionFactory = SUBSCRIPTION_FACTORY;
Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
protocolSession,
new AMQShortString("S1"),
false,
null,
true,
- _queue);
+ channel.getCreditManager());
Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
protocolSession,
@@ -144,14 +153,14 @@ public class AMQQueueMBeanTest extends TestCase
false,
null,
true,
- _queue);
- _subscribers.addSubscriber(s1);
- _subscribers.addSubscriber(s2);
+ channel.getCreditManager());
+ _queue.registerSubscription(s1,false);
+ _queue.registerSubscription(s2,false);
assertTrue(_queueMBean.getActiveConsumerCount() == 3);
assertTrue(_queueMBean.getConsumerCount() == 3);
s1.close();
- assertTrue(_queueMBean.getActiveConsumerCount() == 2);
+ assertEquals(2, (int) _queueMBean.getActiveConsumerCount());
assertTrue(_queueMBean.getConsumerCount() == 3);
}
@@ -204,13 +213,34 @@ public class AMQQueueMBeanTest extends TestCase
}
- AMQMessage msg = message(false, false);
+ IncomingMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
- msg.enqueue(_queue);
- msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
+ msg.enqueue(Collections.singleton(_queue));
+ msg.routingComplete(_messageStore, new MessageHandleFactory());
+
+ msg.addContentBodyFrame(new ContentChunk()
+ {
+ ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
+
+ public int getSize()
+ {
+ return (int) MESSAGE_SIZE;
+ }
+
+ public ByteBuffer getData()
+ {
+ return _data;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+ });
+ msg.deliverToQueues();
+// _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
{
@@ -223,7 +253,7 @@ public class AMQQueueMBeanTest extends TestCase
}
}
- private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
+ private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -258,7 +288,10 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
+ IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ msg.setContentHeaderBody(contentHeaderBody);
+ return msg;
+
}
@Override
@@ -274,7 +307,8 @@ public class AMQQueueMBeanTest extends TestCase
new LinkedList<RequiredDeliveryException>()
);
- _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost,
+ null);
_queueMBean = new AMQQueueMBean(_queue);
_protocolSession = new TestMinaProtocolSession();
@@ -284,19 +318,20 @@ public class AMQQueueMBeanTest extends TestCase
{
for (int i = 0; i < messageCount; i++)
{
- AMQMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(_queue);
+ IncomingMessage currentMessage = message(false, persistent);
+ currentMessage.enqueue(Collections.singleton(_queue));
// route header
- currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+ currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
// Add the body so we have somthing to test later
- currentMessage.addContentBodyFrame(_storeContext,
- _protocolSession.getMethodRegistry()
+ currentMessage.addContentBodyFrame(
+ _protocolSession.getMethodRegistry()
.getProtocolVersionMethodConverter()
.convertToContentChunk(
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
+ currentMessage.deliverToQueues();
}