summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-04-23 23:53:55 +0000
committerAidan Skinner <aidan@apache.org>2008-04-23 23:53:55 +0000
commit85723f9a0c9bdffad07b56dbbc5ec39dc8a70ba7 (patch)
treefd21d4ed90b3f134d3cacf2e9f6682c0d8cd1eae /java/broker/src/test
parentd947dab3de62830cbfb41dc989ec7a5ff6af8f55 (diff)
downloadqpid-python-85723f9a0c9bdffad07b56dbbc5ec39dc8a70ba7.tar.gz
QPID-832 copy the M2.x broker
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651112 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java132
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java128
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/TestPropertyUtils.java50
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java610
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java138
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java199
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java295
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java52
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java305
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java304
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java73
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java88
12 files changed, 2374 insertions, 0 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java b/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
new file mode 100644
index 0000000000..d8b5f5f7e6
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+
+import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+public class RunBrokerWithCommand
+{
+ public static void main(String[] args)
+ {
+ //Start broker
+ try
+ {
+ String[] fudge = args.clone();
+
+ // Override the first value which is the command we are going to run later.
+ fudge[0] = "-v";
+ new Main(fudge).startup();
+ }
+ catch (Exception e)
+ {
+ System.err.println("Unable to start broker due to: " + e.getMessage());
+
+ e.printStackTrace();
+ exit(1);
+ }
+
+ Logger.getRootLogger().setLevel(Level.ERROR);
+
+ //run command
+ try
+ {
+ Process task = Runtime.getRuntime().exec(args[0]);
+ System.err.println("Started Proccess: " + args[0]);
+
+ InputStream inputStream = task.getInputStream();
+
+ InputStream errorStream = task.getErrorStream();
+
+ Thread out = new Thread(new Outputter("[OUT]", new BufferedReader(new InputStreamReader(inputStream))));
+ Thread err = new Thread(new Outputter("[ERR]", new BufferedReader(new InputStreamReader(errorStream))));
+
+ out.start();
+ err.start();
+
+ out.join();
+ err.join();
+
+ System.err.println("Waiting for process to exit: " + args[0]);
+ task.waitFor();
+ System.err.println("Done Proccess: " + args[0]);
+
+ }
+ catch (IOException e)
+ {
+ System.err.println("Proccess had problems: " + e.getMessage());
+ e.printStackTrace(System.err);
+ exit(1);
+ }
+ catch (InterruptedException e)
+ {
+ System.err.println("Proccess had problems: " + e.getMessage());
+ e.printStackTrace(System.err);
+
+ exit(1);
+ }
+
+
+ exit(0);
+ }
+
+ private static void exit(int i)
+ {
+ Logger.getRootLogger().setLevel(Level.INFO);
+ System.exit(i);
+ }
+
+ static class Outputter implements Runnable
+ {
+
+ BufferedReader reader;
+ String prefix;
+
+ Outputter(String s, BufferedReader r)
+ {
+ prefix = s;
+ reader = r;
+ }
+
+ public void run()
+ {
+ String line;
+ try
+ {
+ while ((line = reader.readLine()) != null)
+ {
+ System.out.println(prefix + line);
+ }
+ }
+ catch (IOException e)
+ {
+ System.out.println("Error occured reading; " + e.getMessage());
+ }
+ }
+
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java
new file mode 100644
index 0000000000..a0304a7b01
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java
@@ -0,0 +1,128 @@
+package org.apache.qpid.server;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class SelectorParserTest extends TestCase
+{
+ public void testSelectorWithHyphen()
+ {
+ testPass("Cost = 2 AND \"property-with-hyphen\" = 'wibble'");
+ }
+
+ public void testLike()
+ {
+ testFail("Cost LIKE 2");
+ testPass("Cost LIKE 'Hello'");
+ }
+
+ public void testStringQuoted()
+ {
+ testPass("string = 'Test'");
+ }
+
+ public void testProperty()
+ {
+ testPass("prop1 = prop2");
+ }
+
+ public void testPropertyNames()
+ {
+ testPass("$min= TRUE AND _max= FALSE AND Prop_2 = true AND prop$3 = false");
+ }
+
+ public void testProtected()
+ {
+ testFail("NULL = 0 ");
+ testFail("TRUE = 0 ");
+ testFail("FALSE = 0 ");
+ testFail("NOT = 0 ");
+ testFail("AND = 0 ");
+ testFail("OR = 0 ");
+ testFail("BETWEEN = 0 ");
+ testFail("LIKE = 0 ");
+ testFail("IN = 0 ");
+ testFail("IS = 0 ");
+ testFail("ESCAPE = 0 ");
+ }
+
+
+ public void testBoolean()
+ {
+ testPass("min= TRUE AND max= FALSE ");
+ testPass("min= true AND max= false");
+ }
+
+ public void testDouble()
+ {
+ testPass("positive=31E2 AND negative=-31.4E3");
+ testPass("min=" + Double.MIN_VALUE + " AND max=" + Double.MAX_VALUE);
+ }
+
+ public void testLong()
+ {
+ testPass("minLong=" + Long.MIN_VALUE + "L AND maxLong=" + Long.MAX_VALUE + "L");
+ }
+
+ public void testInt()
+ {
+ testPass("minInt=" + Integer.MIN_VALUE + " AND maxInt=" + Integer.MAX_VALUE);
+ }
+
+ public void testSigned()
+ {
+ testPass("negative=-42 AND positive=+42");
+ }
+
+ public void testOctal()
+ {
+ testPass("octal=042");
+ }
+
+
+ private void testPass(String selector)
+ {
+ try
+ {
+ new JMSSelectorFilter(selector);
+ }
+ catch (AMQException e)
+ {
+ fail("Selector '" + selector + "' was not parsed :" + e.getMessage());
+ }
+ }
+
+ private void testFail(String selector)
+ {
+ try
+ {
+ new JMSSelectorFilter(selector);
+ fail("Selector '" + selector + "' was parsed ");
+ }
+ catch (AMQException e)
+ {
+ //normal path
+ }
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/TestPropertyUtils.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/TestPropertyUtils.java
new file mode 100644
index 0000000000..3b83190e42
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/TestPropertyUtils.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+
+import junit.framework.TestCase;
+
+// TODO: This belongs in the "common" module.
+public class TestPropertyUtils extends TestCase
+{
+ public void testSimpleExpansion() throws PropertyException
+ {
+ System.setProperty("banana", "fruity");
+ String expandedProperty = PropertyUtils.replaceProperties("${banana}");
+ assertEquals(expandedProperty, "fruity");
+ }
+
+ public void testDualExpansion() throws PropertyException
+ {
+ System.setProperty("banana", "fruity");
+ System.setProperty("concrete", "horrible");
+ String expandedProperty = PropertyUtils.replaceProperties("${banana}xyz${concrete}");
+ assertEquals(expandedProperty, "fruityxyzhorrible");
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TestPropertyUtils.class);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
new file mode 100644
index 0000000000..2898cb38a6
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import java.util.LinkedList;
+
+public class DestWildExchangeTest extends TestCase
+{
+
+ DestWildExchange _exchange;
+
+ VirtualHost _vhost;
+ MessageStore _store;
+ StoreContext _context;
+
+
+ public void setUp() throws AMQException
+ {
+ _exchange = new DestWildExchange();
+ _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+ _store = new MemoryMessageStore();
+ _context = new StoreContext();
+ }
+
+
+ public void testNoRoute() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+
+
+ MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
+
+ AMQMessage message = new AMQMessage(0L, info, null);
+
+ try
+ {
+ _exchange.route(message);
+ fail("Message has no route and shouldn't be routed");
+ }
+ catch (NoRouteException nre)
+ {
+ //normal
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+ }
+
+ public void testDirectMatch() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("ab"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+
+
+ AMQMessage message = createMessage("a.b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("a.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has no route and should fail to be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+ }
+
+
+ public void testStarMatch() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a*"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.*"), queue, null);
+
+
+ AMQMessage message = createMessage("a.b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("a.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("a");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has no route and should fail to be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+ }
+
+ public void testHashMatch() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.#"), queue, null);
+
+
+ AMQMessage message = createMessage("a.b.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("a.b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("a.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ message = createMessage("a");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has no route and should fail to be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+ }
+
+
+ public void testMidHash() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
+
+
+ AMQMessage message = createMessage("a.c.d.b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has no route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ message = createMessage("a.c.b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has no route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ }
+
+ public void testMatchafterHash() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null);
+
+
+ AMQMessage message = createMessage("a.c.b.b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has route and should not be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("a.a.b.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has no route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ message = createMessage("a.b.c.b");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has route and should not be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ message = createMessage("a.b.c.b.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has no route and should be routed");
+
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ }
+
+
+ public void testHashAfterHash() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null);
+
+
+ AMQMessage message = createMessage("a.c.b.b.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has route and should not be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+
+ message = createMessage("a.a.b.c.d");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has no route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ }
+
+ public void testHashHash() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null);
+
+
+ AMQMessage message = createMessage("a.c.b.b.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has route and should not be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ message = createMessage("a.a.b.c.d");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ }
+ catch (AMQException nre)
+ {
+ fail("Message has no route and should be routed");
+ }
+
+ Assert.assertEquals(1, queue.getMessageCount());
+
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
+
+ queue.deleteMessageFromTop(_context);
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ }
+
+ public void testSubMatchFails() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null);
+
+
+ AMQMessage message = createMessage("a.b.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has route and should not be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ }
+
+ public void testMoreRouting() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+
+
+ AMQMessage message = createMessage("a.b.c");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has route and should not be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ }
+
+ public void testMoreQueue() throws AMQException
+ {
+ AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
+ _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
+
+
+ AMQMessage message = createMessage("a");
+
+ try
+ {
+ _exchange.route(message);
+ message.routingComplete(_store, _context, new MessageHandleFactory());
+ fail("Message has route and should not be routed");
+ }
+ catch (AMQException nre)
+ {
+ }
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ }
+
+ private AMQMessage createMessage(String s) throws AMQException
+ {
+ MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
+
+ TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
+ new LinkedList<RequiredDeliveryException>()
+ );
+
+ AMQMessage message = new AMQMessage(0L, info, trancontext);
+ message.setContentHeaderBody(new ContentHeaderBody());
+
+ return message;
+ }
+
+
+ class PublishInfo implements MessagePublishInfo
+ {
+ AMQShortString _routingkey;
+
+ PublishInfo(AMQShortString routingkey)
+ {
+ _routingkey = routingkey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingkey;
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
new file mode 100644
index 0000000000..18d8592817
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.util.ArrayList;
+
+/**
+ * Unit test class for testing different Exchange MBean operations
+ */
+public class ExchangeMBeanTest extends TestCase
+{
+ private AMQQueue _queue;
+ private QueueRegistry _queueRegistry;
+ private VirtualHost _virtualHost;
+
+ /**
+ * Test for direct exchange mbean
+ * @throws Exception
+ */
+
+ public void testDirectExchangeMBean() throws Exception
+ {
+ DestNameExchange exchange = new DestNameExchange();
+ exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
+ ManagedObject managedObj = exchange.getManagedObject();
+ ManagedExchange mbean = (ManagedExchange)managedObj;
+
+ mbean.createNewBinding(_queue.getName().toString(), "binding1");
+ mbean.createNewBinding(_queue.getName().toString(), "binding2");
+
+ TabularData data = mbean.bindings();
+ ArrayList<Object> list = new ArrayList<Object>(data.values());
+ assertTrue(list.size() == 2);
+
+ // test general exchange properties
+ assertEquals(mbean.getName(), "amq.direct");
+ assertEquals(mbean.getExchangeType(), "direct");
+ assertTrue(mbean.getTicketNo() == 0);
+ assertTrue(!mbean.isDurable());
+ assertTrue(mbean.isAutoDelete());
+ }
+
+ /**
+ * Test for "topic" exchange mbean
+ * @throws Exception
+ */
+
+ public void testTopicExchangeMBean() throws Exception
+ {
+ DestWildExchange exchange = new DestWildExchange();
+ exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
+ ManagedObject managedObj = exchange.getManagedObject();
+ ManagedExchange mbean = (ManagedExchange)managedObj;
+
+ mbean.createNewBinding(_queue.getName().toString(), "binding1");
+ mbean.createNewBinding(_queue.getName().toString(), "binding2");
+
+ TabularData data = mbean.bindings();
+ ArrayList<Object> list = new ArrayList<Object>(data.values());
+ assertTrue(list.size() == 2);
+
+ // test general exchange properties
+ assertEquals(mbean.getName(), "amq.topic");
+ assertEquals(mbean.getExchangeType(), "topic");
+ assertTrue(mbean.getTicketNo() == 0);
+ assertTrue(!mbean.isDurable());
+ assertTrue(mbean.isAutoDelete());
+ }
+
+ /**
+ * Test for "Headers" exchange mbean
+ * @throws Exception
+ */
+
+ public void testHeadersExchangeMBean() throws Exception
+ {
+ HeadersExchange exchange = new HeadersExchange();
+ exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ ManagedObject managedObj = exchange.getManagedObject();
+ ManagedExchange mbean = (ManagedExchange)managedObj;
+
+ mbean.createNewBinding(_queue.getName().toString(), "key1=binding1,key2=binding2");
+ mbean.createNewBinding(_queue.getName().toString(), "key3=binding3");
+
+ TabularData data = mbean.bindings();
+ ArrayList<Object> list = new ArrayList<Object>(data.values());
+ assertTrue(list.size() == 2);
+
+ // test general exchange properties
+ assertEquals(mbean.getName(), "amq.match");
+ assertEquals(mbean.getExchangeType(), "headers");
+ assertTrue(mbean.getTicketNo() == 0);
+ assertTrue(!mbean.isDurable());
+ assertTrue(mbean.isAutoDelete());
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("ExchangeMBeanTest"), false, _virtualHost);
+ _queueRegistry.registerQueue(_queue);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
new file mode 100644
index 0000000000..86ba96bf5d
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -0,0 +1,199 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import junit.framework.TestCase;
+import org.apache.qpid.framing.FieldTable;
+
+/**
+ */
+public class HeadersBindingTest extends TestCase
+{
+ private FieldTable bindHeaders = new FieldTable();
+ private FieldTable matchHeaders = new FieldTable();
+
+ public void testDefault_1()
+ {
+ bindHeaders.setString("A", "Value of A");
+
+ matchHeaders.setString("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testDefault_2()
+ {
+ bindHeaders.setString("A", "Value of A");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testDefault_3()
+ {
+ bindHeaders.setString("A", "Value of A");
+
+ matchHeaders.setString("A", "Altered value of A");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAll_1()
+ {
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+
+ matchHeaders.setString("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAll_2()
+ {
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAll_3()
+ {
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAll_4()
+ {
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
+ matchHeaders.setString("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAll_5()
+ {
+ bindHeaders.setString("X-match", "all");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Altered value of B");
+ matchHeaders.setString("C", "Value of C");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAny_1()
+ {
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+
+ matchHeaders.setString("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAny_2()
+ {
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAny_3()
+ {
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAny_4()
+ {
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Value of B");
+ matchHeaders.setString("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAny_5()
+ {
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Value of A");
+ matchHeaders.setString("B", "Altered value of B");
+ matchHeaders.setString("C", "Value of C");
+
+ assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public void testAny_6()
+ {
+ bindHeaders.setString("X-match", "any");
+ bindHeaders.setString("A", "Value of A");
+ bindHeaders.setString("B", "Value of B");
+
+ matchHeaders.setString("A", "Altered value of A");
+ matchHeaders.setString("B", "Altered value of B");
+ matchHeaders.setString("C", "Value of C");
+
+ assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(HeadersBindingTest.class);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
new file mode 100644
index 0000000000..ff4d3ed9fb
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
@@ -0,0 +1,295 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
+ * so if this class is being used and some methods are to be used, then please update those.
+ */
+public class TestIoSession implements IoSession
+{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+
+ public TestIoSession()
+ {
+ }
+
+ public IoService getService()
+ {
+ return null;
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return new TestIoConfig();
+ }
+
+ public IoHandler getHandler()
+ {
+ return null;
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null;
+ }
+
+ public WriteFuture write(Object message)
+ {
+ return null;
+ }
+
+ public CloseFuture close()
+ {
+ return null;
+ }
+
+ public Object getAttachment()
+ {
+ return getAttribute("");
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ return setAttribute("",attachment);
+ }
+
+ public Object getAttribute(String key)
+ {
+ return attributes.get(key);
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return attributes.put(key,value);
+ }
+
+ public Object setAttribute(String key)
+ {
+ return attributes.put(key, Boolean.TRUE);
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return attributes.remove(key);
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return attributes.containsKey(key);
+ }
+
+ public Set getAttributeKeys()
+ {
+ return attributes.keySet();
+ }
+
+ public TransportType getTransportType()
+ {
+ return null;
+ }
+
+ public boolean isConnected()
+ {
+ return false;
+ }
+
+ public boolean isClosing()
+ {
+ return false;
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress("127.0.0.1", 1234);
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null;
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null;
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0;
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0;
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0;
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0;
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null;
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+
+ }
+
+ public void suspendRead()
+ {
+
+ }
+
+ public void suspendWrite()
+ {
+
+ }
+
+ public void resumeRead()
+ {
+
+ }
+
+ public void resumeWrite()
+ {
+
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0;
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0;
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0;
+ }
+
+ public long getCreationTime()
+ {
+ return 0;
+ }
+
+ public long getLastIoTime()
+ {
+ return 0;
+ }
+
+ public long getLastReadTime()
+ {
+ return 0;
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0;
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false;
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0;
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0;
+ }
+
+ /**
+ * Test implementation of IoServiceConfig
+ */
+ private class TestIoConfig extends SocketAcceptorConfig
+ {
+ public ThreadModel getThreadModel()
+ {
+ return ReadWriteThreadModel.getInstance();
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
new file mode 100644
index 0000000000..0c0d8f471e
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+
+public class TestMinaProtocolSession extends AMQMinaProtocolSession
+{
+ public TestMinaProtocolSession() throws AMQException
+ {
+ super(new TestIoSession(),
+ ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+ new AMQCodecFactory(true));
+ }
+
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return ProtocolOutputConverterRegistry.getConverter(this);
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return (byte)8;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return (byte)0;
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
new file mode 100644
index 0000000000..65db2a6425
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -0,0 +1,305 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import javax.management.Notification;
+import java.util.LinkedList;
+
+/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
+public class AMQQueueAlertTest extends TestCase
+{
+ private final static long MAX_MESSAGE_COUNT = 50;
+ private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
+ private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
+ private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB
+ private AMQQueue _queue;
+ private AMQQueueMBean _queueMBean;
+ private VirtualHost _virtualHost;
+ private AMQMinaProtocolSession protocolSession = null;
+ private MessageStore _messageStore = new MemoryMessageStore();
+ private StoreContext _storeContext = new StoreContext();
+ private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>()
+ );
+
+ /**
+ * Tests if the alert gets thrown when message count increases the threshold limit
+ *
+ * @throws Exception
+ */
+ public void testMessageCountAlert() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+
+ sendMessages(MAX_MESSAGE_COUNT, 256l);
+ assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name()));
+ }
+
+ /**
+ * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent
+ *
+ * @throws Exception
+ */
+ public void testMessageSizeAlert() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
+
+ sendMessages(1, MAX_MESSAGE_SIZE * 2);
+ assertTrue(_queueMBean.getMessageCount() == 1);
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name()));
+ }
+
+ /**
+ * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
+ *
+ * Based on FT-402 subbmitted by client
+ *
+ * @throws Exception
+ */
+ public void testQueueDepthAlertNoSubscriber() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+ while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
+ {
+ sendMessages(1, MAX_MESSAGE_SIZE);
+ }
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+ }
+
+ /**
+ * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
+ * message age
+ *
+ * Alternative test to FT-401 provided by client
+ *
+ * @throws Exception
+ */
+ public void testMessageAgeAlert() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
+
+ sendMessages(1, MAX_MESSAGE_SIZE);
+
+ // Ensure message sits on queue long enough to age.
+ Thread.sleep(MAX_MESSAGE_AGE * 2);
+
+ sendMessages(1, MAX_MESSAGE_SIZE);
+ assertTrue(_queueMBean.getMessageCount() == 2);
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
+ }
+
+ /*
+ This test sends some messages to the queue with subscribers needing message to be acknowledged.
+ The messages will not be acknowledged and will be required twice. Why we are checking this is because
+ the bug reported said that the queueDepth keeps increasing when messages are requeued.
+ The QueueDepth should decrease when messages are delivered from the queue (QPID-408)
+ */
+ public void testQueueDepthAlertWithSubscribers() throws Exception
+ {
+ protocolSession = new TestMinaProtocolSession();
+ AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
+ protocolSession.addChannel(channel);
+
+ // Create queue
+ _queue = getNewQueue();
+ _queue.registerProtocolSession(protocolSession, channel.getChannelId(),
+ new AMQShortString("consumer_tag"), true, null, false, false);
+
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(9999l); // Set a high value, because this is not being tested
+ _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+ // Send messages(no of message to be little more than what can cause a Queue_Depth alert)
+ int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
+ long totalSize = (messageCount * MAX_MESSAGE_SIZE) >> 10;
+ sendMessages(messageCount, MAX_MESSAGE_SIZE);
+
+ // Check queueDepth. There should be no messages on the queue and as the subscriber is listening
+ // so there should be no Queue_Deoth alert raised
+ assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNull(lastNotification);
+
+ // Kill the subscriber and check for the queue depth values.
+ // Messages are unacknowledged, so those should get requeued. All messages should be on the Queue
+ _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), new AMQShortString("consumer_tag"));
+ channel.requeue();
+
+ assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
+
+ lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+
+ // Connect a consumer again and check QueueDepth values. The queue should get emptied.
+ // Messages will get delivered but still are unacknowledged.
+ _queue.registerProtocolSession(protocolSession, channel.getChannelId(),
+ new AMQShortString("consumer_tag"), true, null, false, false);
+ _queue.deliverAsync();
+ while (_queue.getMessageCount() != 0)
+ {
+ Thread.sleep(100);
+ }
+ assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
+
+ // Kill the subscriber again. Now those messages should get requeued again. Check if the queue depth
+ // value is correct.
+ _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), new AMQShortString("consumer_tag"));
+ channel.requeue();
+
+ assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
+ protocolSession.closeSession();
+
+ // Check the clear queue
+ _queueMBean.clearQueue();
+ assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
+ }
+
+ protected AMQMessage message(final boolean immediate, long size) throws AMQException
+ {
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = size; // in bytes
+ AMQMessage message = new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext);
+ message.setContentHeaderBody(contentHeaderBody);
+ message.setPublisher(protocolSession);
+ return message;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ }
+
+ private void sendMessages(long messageCount, long size) throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[(int) messageCount];
+ for (int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message(false, size);
+ messages[i].enqueue(_queue);
+ messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+ }
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false);
+ }
+ }
+
+ private AMQQueue getNewQueue() throws AMQException
+ {
+ return new AMQQueue(new AMQShortString("testQueue" + Math.random()),
+ false,
+ new AMQShortString("AMQueueAlertTest"),
+ false,
+ _virtualHost);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
new file mode 100644
index 0000000000..9b874d63e8
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.management.JMException;
+import java.util.LinkedList;
+
+/**
+ * Test class to test AMQQueueMBean attribtues and operations
+ */
+public class AMQQueueMBeanTest extends TestCase
+{
+ private static long MESSAGE_SIZE = 1000;
+ private AMQQueue _queue;
+ private AMQQueueMBean _queueMBean;
+ private MessageStore _messageStore;
+ private StoreContext _storeContext = new StoreContext();
+ private TransactionalContext _transactionalContext;
+ private VirtualHost _virtualHost;
+ private AMQProtocolSession _protocolSession;
+
+ public void testMessageCountTransient() throws Exception
+ {
+ int messageCount = 10;
+ sendMessages(messageCount, false);
+ assertTrue(_queueMBean.getMessageCount() == messageCount);
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+ _queueMBean.deleteMessageFromTop();
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ _queueMBean.clearQueue();
+ assertTrue(_queueMBean.getMessageCount() == 0);
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ public void testMessageCountPersistent() throws Exception
+ {
+ int messageCount = 10;
+ sendMessages(messageCount, true);
+ assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+ _queueMBean.deleteMessageFromTop();
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ _queueMBean.clearQueue();
+ assertTrue(_queueMBean.getMessageCount() == 0);
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
+ private void verifyBrokerState()
+ {
+
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+
+ // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
+ assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
+ assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+ assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
+ assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+ }
+
+ public void testConsumerCount() throws AMQException
+ {
+ SubscriptionManager mgr = _queue.getSubscribers();
+ assertFalse(mgr.hasActiveSubscribers());
+ assertTrue(_queueMBean.getActiveConsumerCount() == 0);
+
+
+ TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+ AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
+ protocolSession.addChannel(channel);
+
+ _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
+ assertTrue(_queueMBean.getActiveConsumerCount() == 1);
+
+ SubscriptionSet _subscribers = (SubscriptionSet) mgr;
+ SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
+ Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S1"),
+ false,
+ null,
+ true,
+ _queue);
+
+ Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S2"),
+ false,
+ null,
+ true,
+ _queue);
+ _subscribers.addSubscriber(s1);
+ _subscribers.addSubscriber(s2);
+ assertTrue(_queueMBean.getActiveConsumerCount() == 3);
+ assertTrue(_queueMBean.getConsumerCount() == 3);
+
+ s1.close();
+ assertTrue(_queueMBean.getActiveConsumerCount() == 2);
+ assertTrue(_queueMBean.getConsumerCount() == 3);
+ }
+
+ public void testGeneralProperties()
+ {
+ long maxQueueDepth = 1000; // in bytes
+ _queueMBean.setMaximumMessageCount(50000l);
+ _queueMBean.setMaximumMessageSize(2000l);
+ _queueMBean.setMaximumQueueDepth(maxQueueDepth);
+
+ assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
+ assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
+ assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10));
+
+ assertTrue(_queueMBean.getName().equals("testQueue"));
+ assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
+ assertFalse(_queueMBean.isAutoDelete());
+ assertFalse(_queueMBean.isDurable());
+ }
+
+ public void testExceptions() throws Exception
+ {
+ try
+ {
+ _queueMBean.viewMessages(0, 3);
+ fail();
+ }
+ catch (JMException ex)
+ {
+
+ }
+
+ try
+ {
+ _queueMBean.viewMessages(2, 1);
+ fail();
+ }
+ catch (JMException ex)
+ {
+
+ }
+
+ try
+ {
+ _queueMBean.viewMessages(-1, 1);
+ fail();
+ }
+ catch (JMException ex)
+ {
+
+ }
+
+ AMQMessage msg = message(false, false);
+ long id = msg.getMessageId();
+ _queue.clearQueue(_storeContext);
+
+ msg.enqueue(_queue);
+ msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+ _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
+ _queueMBean.viewMessageContent(id);
+ try
+ {
+ _queueMBean.viewMessageContent(id + 1);
+ fail();
+ }
+ catch (JMException ex)
+ {
+
+ }
+ }
+
+ private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
+ {
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+ return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>()
+ );
+
+ _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
+ _queueMBean = new AMQQueueMBean(_queue);
+
+ _protocolSession = new TestMinaProtocolSession();
+ }
+
+ private void sendMessages(int messageCount, boolean persistent) throws AMQException
+ {
+ for (int i = 0; i < messageCount; i++)
+ {
+ AMQMessage currentMessage = message(false, persistent);
+ currentMessage.enqueue(_queue);
+
+ // route header
+ currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+
+ // Add the body so we have somthing to test later
+ currentMessage.addContentBodyFrame(_storeContext,
+ _protocolSession.getMethodRegistry()
+ .getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+ MESSAGE_SIZE)));
+
+
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
new file mode 100644
index 0000000000..48d808142c
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+
+/**
+ * Adds some extra methods to the memory message store for testing purposes.
+ */
+public class TestableMemoryMessageStore extends MemoryMessageStore
+{
+
+ MemoryMessageStore _mms = null;
+
+ public TestableMemoryMessageStore(MemoryMessageStore mms)
+ {
+ _mms = mms;
+ }
+
+ public TestableMemoryMessageStore()
+ {
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+ }
+
+ public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ {
+ if (_mms != null)
+ {
+ return _mms._metaDataMap;
+ }
+ else
+ {
+ return _metaDataMap;
+ }
+ }
+
+ public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+ {
+ if (_mms != null)
+ {
+ return _mms._contentBodyMap;
+ }
+ else
+ {
+ return _contentBodyMap;
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java b/java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java
new file mode 100644
index 0000000000..c7db51016e
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class LoggingProxyTest extends TestCase
+{
+ static interface IFoo {
+ void foo();
+ void foo(int i, Collection c);
+ String bar();
+ String bar(String s, List l);
+ }
+
+ static class Foo implements IFoo {
+ public void foo()
+ {
+ }
+
+ public void foo(int i, Collection c)
+ {
+ }
+
+ public String bar()
+ {
+ return null;
+ }
+
+ public String bar(String s, List l)
+ {
+ return "ha";
+ }
+ }
+
+ public void testSimple() {
+ LoggingProxy proxy = new LoggingProxy(new Foo(), 20);
+ IFoo foo = (IFoo)proxy.getProxy(IFoo.class);
+ foo.foo();
+ assertEquals(2, proxy.getBufferSize());
+ assertTrue(proxy.getBuffer().get(0).toString().matches(".*: foo\\(\\) entered$"));
+ assertTrue(proxy.getBuffer().get(1).toString().matches(".*: foo\\(\\) returned$"));
+
+ foo.foo(3, Arrays.asList(0, 1, 2));
+ assertEquals(4, proxy.getBufferSize());
+ assertTrue(proxy.getBuffer().get(2).toString().matches(".*: foo\\(\\[3, \\[0, 1, 2\\]\\]\\) entered$"));
+ assertTrue(proxy.getBuffer().get(3).toString().matches(".*: foo\\(\\) returned$"));
+
+ foo.bar();
+ assertEquals(6, proxy.getBufferSize());
+ assertTrue(proxy.getBuffer().get(4).toString().matches(".*: bar\\(\\) entered$"));
+ assertTrue(proxy.getBuffer().get(5).toString().matches(".*: bar\\(\\) returned null$"));
+
+ foo.bar("hello", Arrays.asList(1, 2, 3));
+ assertEquals(8, proxy.getBufferSize());
+ assertTrue(proxy.getBuffer().get(6).toString().matches(".*: bar\\(\\[hello, \\[1, 2, 3\\]\\]\\) entered$"));
+ assertTrue(proxy.getBuffer().get(7).toString().matches(".*: bar\\(\\) returned ha$"));
+
+ proxy.dump();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(LoggingProxyTest.class);
+ }
+}