diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 23:32:24 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 23:32:24 +0000 |
| commit | 58fb658d72dd0a0d750fdd3f97e607cf3d2e9134 (patch) | |
| tree | e3abe7865d6fcd2ecd55db22254b6f957deb69d3 /qpid/java/broker/src/test | |
| parent | 91782b4b801528ca5fb4a3f9e3c2879d1f02c3a1 (diff) | |
| download | qpid-python-58fb658d72dd0a0d750fdd3f97e607cf3d2e9134.tar.gz | |
QPID-4659 : [Java Broker] tidy up amqp 0-8 implementation, reduce unnecessary usage in tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503523 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
20 files changed, 254 insertions, 284 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index b2d7237737..f2a64381df 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -20,33 +20,31 @@ */ package org.apache.qpid.server.exchange; +import java.util.List; import junit.framework.Assert; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TopicExchangeTest extends QpidTestCase { private TopicExchange _exchange; private VirtualHost _vhost; - private MessageStore _store; @Override @@ -56,7 +54,6 @@ public class TopicExchangeTest extends QpidTestCase BrokerTestHelper.setUp(); _exchange = new TopicExchange(); _vhost = BrokerTestHelper.createVirtualHost(getName()); - _store = new TestMemoryMessageStore(); } @Override @@ -82,8 +79,7 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - routeMessage(message); + routeMessage("a.b", 0l); Assert.assertEquals(0, queue.getMessageCount()); } @@ -94,21 +90,16 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - - routeMessage(message); + routeMessage("a.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c",1l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -121,34 +112,26 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - - routeMessage(message); + routeMessage("a.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c"); - - int queueCount = routeMessage(message); + routeMessage("a.c",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a"); - - - queueCount = routeMessage(message); + int queueCount = routeMessage("a",2l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -160,57 +143,45 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + routeMessage("a.b.c",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.b"); - - queueCount = routeMessage(message); + routeMessage("a.b",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c"); - - queueCount = routeMessage(message); + routeMessage("a.c",2l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 2l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a"); - - queueCount = routeMessage(message); + routeMessage("a",3l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("b"); - - - queueCount = routeMessage(message); + int queueCount = routeMessage("b", 4l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -222,25 +193,20 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.d.b"); - - routeMessage(message); + routeMessage("a.c.d.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c.b"); - - routeMessage(message); + routeMessage("a.c.b",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -253,39 +219,31 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); - IncomingMessage message = createMessage("a.c.b.b"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.a.b.c"); - - routeMessage(message); + routeMessage("a.a.b.c",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.b.c.b"); - - queueCount = routeMessage(message); + queueCount = routeMessage("a.b.c.b",2l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.b.c.b.c"); - - routeMessage(message); + routeMessage("a.b.c.b.c",3l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -298,22 +256,16 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.b.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.a.b.c.d"); - - routeMessage(message); + routeMessage("a.a.b.c.d",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -325,21 +277,16 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.b.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.a.b.c.d"); - - routeMessage(message); + routeMessage("a.a.b.c.d",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -351,29 +298,30 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } - private int routeMessage(final IncomingMessage message) - throws AMQException + private int routeMessage(String routingKey, long messageNumber) throws AMQException { - MessageMetaData mmd = message.headersReceived(System.currentTimeMillis()); - message.setStoredMessage(_store.addMessage(mmd)); - - message.enqueue(_exchange.route(message)); - AMQMessage msg = new AMQMessage(message.getStoredMessage()); - for(BaseQueue q : message.getDestinationQueues()) + InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getRoutingKey()).thenReturn(routingKey); + when(inboundMessage.getRoutingKeyShortString()).thenReturn(new AMQShortString(routingKey)); + List<? extends BaseQueue> queues = _exchange.route(inboundMessage); + ServerMessage message = mock(ServerMessage.class); + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(ref); + when(message.getMessageNumber()).thenReturn(messageNumber); + for(BaseQueue q : queues) { - q.enqueue(msg); + q.enqueue(message); } - return message.getDestinationQueues().size(); + + return queues.size(); } public void testMoreRouting() throws AMQException @@ -382,9 +330,7 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -397,62 +343,11 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } - private IncomingMessage createMessage(String s) throws AMQException - { - MessagePublishInfo info = new PublishInfo(new AMQShortString(s)); - - IncomingMessage message = new IncomingMessage(info); - final ContentHeaderBody chb = new ContentHeaderBody(); - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - chb.setProperties(props); - message.setContentHeaderBody(chb); - - - return message; - } - - - class PublishInfo implements MessagePublishInfo - { - private AMQShortString _routingkey; - - PublishInfo(AMQShortString routingkey) - { - _routingkey = routingkey; - } - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return true; - } - - public AMQShortString getRoutingKey() - { - return _routingkey; - } - } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java index 055197c23e..41b42fac78 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.BrokerTestHelper; import java.util.List; @@ -45,7 +45,7 @@ public class AMQPChannelActorTest extends BaseConnectionActorTestCase private void setUpNow() throws Exception { super.setUp(); - AMQChannel channel = BrokerTestHelper.createChannel(1, getSession()); + AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); setAmqpActor(new AMQPChannelActor(channel, getRootLogger())); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java index 09dd48e4d3..1cb6474e41 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java @@ -20,31 +20,43 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; public class BaseConnectionActorTestCase extends BaseActorTestCase { - private AMQProtocolSession _session; + private AMQConnectionModel _session; + private VirtualHost _virtualHost; @Override public void setUp() throws Exception { super.setUp(); BrokerTestHelper.setUp(); - _session = BrokerTestHelper.createSession(); - + _session = BrokerTestHelper.createConnection(); + _virtualHost = BrokerTestHelper.createVirtualHost("test"); setAmqpActor(new AMQPConnectionActor(_session, getRootLogger())); } + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + @Override public void tearDown() throws Exception { try { + if(_virtualHost != null) + { + _virtualHost.close(); + } if (_session != null) { - _session.getVirtualHost().close(); + _session.close(AMQConstant.CONNECTION_FORCED, ""); } } finally @@ -54,7 +66,7 @@ public class BaseConnectionActorTestCase extends BaseActorTestCase } } - public AMQProtocolSession getSession() + public AMQConnectionModel getConnection() { return _session; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java index 0c64ce837e..d413c4d4c9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java @@ -23,9 +23,11 @@ package org.apache.qpid.server.logging.actors; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQException; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.NullRootMessageLogger; +import org.apache.qpid.server.util.BrokerTestHelper; /** * Test : CurrentActorTest @@ -71,7 +73,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase public void testLIFO() throws AMQException, ConfigurationException { assertTrue("Unexpected actor: " + CurrentActor.get(), CurrentActor.get() instanceof TestLogActor); - AMQPConnectionActor connectionActor = new AMQPConnectionActor(getSession(), + AMQPConnectionActor connectionActor = new AMQPConnectionActor(getConnection(), new NullRootMessageLogger()); /* @@ -98,7 +100,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase * */ - AMQChannel channel = new AMQChannel(getSession(), 1, getSession().getVirtualHost().getMessageStore()); + AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); AMQPChannelActor channelActor = new AMQPChannelActor(channel, new NullRootMessageLogger()); @@ -214,7 +216,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase { LogActor defaultActor = CurrentActor.get(); - AMQPConnectionActor actor = new AMQPConnectionActor(getSession(), + AMQPConnectionActor actor = new AMQPConnectionActor(getConnection(), new NullRootMessageLogger()); CurrentActor.set(actor); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java index 2dc44c58ce..55153b7389 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java @@ -31,7 +31,7 @@ public class QueueActorTest extends BaseConnectionActorTestCase public void setUp() throws Exception { super.setUp(); - setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getSession().getVirtualHost()), getRootLogger())); + setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getVirtualHost()), getRootLogger())); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java index 58fca488c4..92915e7092 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java @@ -44,7 +44,7 @@ public class SubscriptionActorTest extends BaseConnectionActorTestCase MockSubscription mockSubscription = new MockSubscription(); - mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getSession().getVirtualHost()), false); + mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getVirtualHost()), false); setAmqpActor(new SubscriptionActor(getRootLogger(), mockSubscription)); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java index 193e8a490d..f779295cd4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -113,7 +113,7 @@ public abstract class AbstractTestLogSubject extends QpidTestCase // This should return us MockProtocolSessionUser@null/test String connectionSlice = getSlice("con:" + connectionID, message); - assertNotNull("Unable to find connection 'con:" + connectionID + "'", + assertNotNull("Unable to find connection 'con:" + connectionID + "' in '"+message+"'", connectionSlice); // Exract the userName @@ -131,7 +131,7 @@ public abstract class AbstractTestLogSubject extends QpidTestCase // We will have three sections assertEquals("Unable to split IP from rest of Connection:" - + userNameParts[1], 3, ipParts.length); + + userNameParts[1] + " in '"+message+"'", 3, ipParts.length); // We need to skip the first '/' split will be empty so validate 1 as IP assertEquals("IP not as expected", ipString, ipParts[1]); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java index a6701301f9..a3d96c6d12 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Validate ChannelLogSubjects are logged as expected @@ -34,10 +37,10 @@ public class ChannelLogSubjectTest extends ConnectionLogSubjectTest { super.setUp(); - - AMQChannel channel = new AMQChannel(getSession(), _channelID, getSession().getVirtualHost().getMessageStore()); - - _subject = new ChannelLogSubject(channel); + AMQSessionModel session = mock(AMQSessionModel.class); + when(session.getConnectionModel()).thenReturn(getConnection()); + when(session.getChannelId()).thenReturn(_channelID); + _subject = new ChannelLogSubject(session); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java index 835bcb3e97..e9a9317102 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.protocol.AMQConnectionModel; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Validate ConnectionLogSubjects are logged as expected @@ -29,25 +31,24 @@ import org.apache.qpid.server.util.BrokerTestHelper; public class ConnectionLogSubjectTest extends AbstractTestLogSubject { - private InternalTestProtocolSession _session; + private static final long CONNECTION_ID = 456l; + private static final String USER = "InternalTestProtocolSession"; + private static final String IP_STRING = "127.0.0.1:1"; + private static final String VHOST = "test"; + + private AMQConnectionModel _connection; @Override public void setUp() throws Exception { super.setUp(); - _session = BrokerTestHelper.createSession("test"); - _subject = new ConnectionLogSubject(_session); - } - - @Override - public void tearDown() throws Exception - { - if (_session != null) - { - _session.getVirtualHost().close(); - } - super.tearDown(); + _connection = mock(AMQConnectionModel.class); + when(_connection.getConnectionId()).thenReturn(CONNECTION_ID); + when(_connection.getPrincipalAsString()).thenReturn(USER); + when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING); + when(_connection.getVirtualHostName()).thenReturn(VHOST); + _subject = new ConnectionLogSubject(_connection); } /** @@ -57,12 +58,12 @@ public class ConnectionLogSubjectTest extends AbstractTestLogSubject */ protected void validateLogStatement(String message) { - verifyConnection(_session.getSessionID(), "InternalTestProtocolSession", "127.0.0.1:1", "test", message); + verifyConnection(CONNECTION_ID, USER, IP_STRING, VHOST, message); } - public InternalTestProtocolSession getSession() + public AMQConnectionModel getConnection() { - return _session; + return _connection; } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java index a5e5529ed5..b358c7c5c5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java index b523979387..f5e58cfd02 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java @@ -1,4 +1,4 @@ -package org.apache.qpid.server.protocol; +package org.apache.qpid.server.protocol.v0_8; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -11,7 +11,6 @@ import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.NetworkConnection; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 60b6b215c6..4ab64ca100 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -28,9 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -67,7 +65,7 @@ public class AckTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(5); + _channel = BrokerTestHelper_0_8.createChannel(5); _protocolSession = _channel.getProtocolSession(); _virtualHost = _protocolSession.getVirtualHost(); _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index 0e4dbe87a3..a9eb0ebfe7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -18,14 +18,12 @@ * under the License. * */ -package org.apache.qpid.server.ack; +package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -47,7 +45,7 @@ public class AcknowledgeTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); + _channel = BrokerTestHelper_0_8.createChannel(); VirtualHost virtualHost = _channel.getVirtualHost(); _queueName = getTestName(); _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); @@ -125,7 +123,10 @@ public class AcknowledgeTest extends QpidTestCase checkStoreContents(0); //Send required messsages to the queue - BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); + BrokerTestHelper_0_8.publishMessages(getChannel(), + sendMessageCount, + _queueName, + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); if (getChannel().isTransactional()) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java new file mode 100644 index 0000000000..0919607bd7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BrokerTestHelper_0_8 extends BrokerTestHelper +{ + + public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + { + AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); + session.addChannel(channel); + return channel; + } + + public static AMQChannel createChannel(int channelId) throws Exception + { + InternalTestProtocolSession session = createProtocolSession(); + return createChannel(channelId, session); + } + + public static AMQChannel createChannel() throws Exception + { + return createChannel(1); + } + + public static InternalTestProtocolSession createProtocolSession() throws Exception + { + return createProtocolSession("test"); + } + + public static InternalTestProtocolSession createProtocolSession(String hostName) throws Exception + { + VirtualHost virtualHost = createVirtualHost(hostName); + return new InternalTestProtocolSession(virtualHost, createBrokerMock()); + } + + public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException + { + AMQShortString rouningKey = new AMQShortString(queueName); + AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); + MessagePublishInfo info = mock(MessagePublishInfo.class); + when(info.getExchange()).thenReturn(exchangeNameAsShortString); + when(info.getRoutingKey()).thenReturn(rouningKey); + + Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); + for (int count = 0; count < numberOfMessages; count++) + { + channel.setPublishFrame(info, exchange); + + // Set the body size + ContentHeaderBody _headerBody = new ContentHeaderBody(); + _headerBody.setBodySize(0); + + // Set Minimum properties + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setExpiration(0L); + properties.setTimestamp(System.currentTimeMillis()); + + // Make Message Persistent + properties.setDeliveryMode((byte) 2); + + _headerBody.setProperties(properties); + + channel.publishContentHeader(_headerBody); + } + channel.sync(); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java index 1cc872e177..a77475c05f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -37,7 +35,7 @@ public class MaxChannelsTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _session = BrokerTestHelper.createSession(); + _session = BrokerTestHelper_0_8.createProtocolSession(); } public void testChannels() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index d9c0b7055f..21142e7ab6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -18,19 +18,18 @@ * under the License. * */ -package org.apache.qpid.server.subscription; +package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -49,7 +48,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); + _channel = BrokerTestHelper_0_8.createChannel(); VirtualHost virtualHost = _channel.getVirtualHost(); _queueName = getTestName(); _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); @@ -97,7 +96,10 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase checkStoreContents(0); //Send required messsages to the queue - BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); + BrokerTestHelper_0_8.publishMessages(getChannel(), + sendMessageCount, + _queueName, + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); //Ensure they are stored checkStoreContents(sendMessageCount); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java index 731b1aadc4..e98dd63450 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java @@ -18,16 +18,14 @@ * under the License. * */ -package org.apache.qpid.server.subscription; +package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.logging.UnitTestMessageLogger; import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -41,7 +39,7 @@ public class SubscriptionFactoryImplTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); + _channel = BrokerTestHelper_0_8.createChannel(); _session = _channel.getProtocolSession(); GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java index e314361a89..c44fdebc03 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java @@ -18,17 +18,15 @@ * under the License. * */ -package org.apache.qpid.server.logging.subjects; +package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; +import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactory; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -48,7 +46,7 @@ public class SubscriptionLogSubjectTest extends AbstractTestLogSubject { super.setUp(); - InternalTestProtocolSession session = BrokerTestHelper.createSession(); + InternalTestProtocolSession session = BrokerTestHelper_0_8.createProtocolSession(); _testVhost = session.getVirtualHost(); _queue = new MockAMQQueue("SubscriptionLogSubjectTest"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 87f0c06c2b..07e72d3535 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.IncomingMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index bc4e67aefe..caf6acb4bb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -31,11 +31,8 @@ import java.util.UUID; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore; import org.apache.qpid.server.exchange.DefaultExchangeFactory; @@ -47,8 +44,6 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.security.SecurityManager; @@ -143,33 +138,35 @@ public class BrokerTestHelper return vhostConfig; } - public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + public static AMQSessionModel createSession(int channelId, AMQConnectionModel connection) throws AMQException { - AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); - session.addChannel(channel); - return channel; + AMQSessionModel session = mock(AMQSessionModel.class); + when(session.getConnectionModel()).thenReturn(connection); + when(session.getChannelId()).thenReturn(channelId); + return session; } - public static AMQChannel createChannel(int channelId) throws Exception + public static AMQSessionModel createSession(int channelId) throws Exception { - InternalTestProtocolSession session = createSession(); - return createChannel(channelId, session); + AMQConnectionModel session = createConnection(); + return createSession(channelId, session); } - public static AMQChannel createChannel() throws Exception + public static AMQSessionModel createSession() throws Exception { - return createChannel(1); + return createSession(1); } - public static InternalTestProtocolSession createSession() throws Exception + public static AMQConnectionModel createConnection() throws Exception { - return createSession("test"); + return createConnection("test"); } - public static InternalTestProtocolSession createSession(String hostName) throws Exception + public static AMQConnectionModel createConnection(String hostName) throws Exception { VirtualHost virtualHost = createVirtualHost(hostName); - return new InternalTestProtocolSession(virtualHost, createBrokerMock()); + AMQConnectionModel connection = mock(AMQConnectionModel.class); + return connection; } public static Exchange createExchange(String hostName) throws Exception @@ -182,39 +179,6 @@ public class BrokerTestHelper return factory.createExchange("amp.direct", "direct", false, false); } - public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException - { - AMQShortString rouningKey = new AMQShortString(queueName); - AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); - MessagePublishInfo info = mock(MessagePublishInfo.class); - when(info.getExchange()).thenReturn(exchangeNameAsShortString); - when(info.getRoutingKey()).thenReturn(rouningKey); - - Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); - for (int count = 0; count < numberOfMessages; count++) - { - channel.setPublishFrame(info, exchange); - - // Set the body size - ContentHeaderBody _headerBody = new ContentHeaderBody(); - _headerBody.setBodySize(0); - - // Set Minimum properties - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - - properties.setExpiration(0L); - properties.setTimestamp(System.currentTimeMillis()); - - // Make Message Persistent - properties.setDeliveryMode((byte) 2); - - _headerBody.setProperties(properties); - - channel.publishContentHeader(_headerBody); - } - channel.sync(); - } - public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException { SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null, |
