diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-16 10:49:37 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-16 10:49:37 +0000 |
| commit | 42b37635a7b0252c7cdaae11b431d8d3c67cbae9 (patch) | |
| tree | 263350daf0522ba805f736b86185a96ad95f7824 /qpid/java/broker/src/test | |
| parent | 403007b67551a262b62ada16429f4f305c2d93fd (diff) | |
| download | qpid-python-42b37635a7b0252c7cdaae11b431d8d3c67cbae9.tar.gz | |
QPID-4659 : [Java Broker] move amqp 0-8 implementation into a plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
18 files changed, 0 insertions, 2356 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java index d413c4d4c9..701ccaab47 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java @@ -24,7 +24,6 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQException; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.NullRootMessageLogger; import org.apache.qpid.server.util.BrokerTestHelper; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java deleted file mode 100644 index 0d4b4ba9f8..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ /dev/null @@ -1,205 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.protocol; - -import static org.mockito.Mockito.when; - -import java.nio.ByteBuffer; -import java.util.EnumSet; -import java.util.Set; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.TestNetworkConnection; - -public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase -{ - private VirtualHost _virtualHost; - private Broker _broker; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _broker = BrokerTestHelper.createBrokerMock(); - VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry(); - when(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)).thenReturn("default"); - - // AMQP 1-0 connection needs default vhost to be present - _virtualHost = BrokerTestHelper.createVirtualHost("default", virtualHostRegistry); - } - - @Override - protected void tearDown() throws Exception - { - try - { - _virtualHost.close(); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - private static final byte[] AMQP_0_8_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 1, - (byte) 1, - (byte) 8, - (byte) 0 - }; - - private static final byte[] AMQP_0_9_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 1, - (byte) 1, - (byte) 0, - (byte) 9 - }; - - private static final byte[] AMQP_0_9_1_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 0, - (byte) 0, - (byte) 9, - (byte) 1 - }; - - - private static final byte[] AMQP_0_10_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 1, - (byte) 1, - (byte) 0, - (byte) 10 - }; - - - private static final byte[] AMQP_1_0_0_HEADER = - new byte[] { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }; - - - private byte[] getAmqpHeader(final AmqpProtocolVersion version) - { - switch(version) - { - case v0_8: - return AMQP_0_8_HEADER; - case v0_9: - return AMQP_0_9_HEADER; - case v0_9_1: - return AMQP_0_9_1_HEADER; - case v0_10: - return AMQP_0_10_HEADER; - case v1_0_0: - return AMQP_1_0_0_HEADER; - default: - fail("unknown AMQP version, appropriate header must be added for new protocol version"); - return null; - } - } - - /** - * Test to verify that connections established using a MultiVersionProtocolEngine are assigned - * IDs from a common sequence, independent of the protocol version under use. - */ - public void testDifferentProtocolVersionsShareCommonIDNumberingSequence() - { - Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class); - - MultiVersionProtocolEngineFactory factory = - new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, null, null, - org.apache.qpid.server.model.Transport.TCP); - - //create a dummy to retrieve the 'current' ID number - long previousId = factory.newProtocolEngine().getConnectionId(); - - //create a protocol engine and send the AMQP header for all supported AMQP verisons, - //ensuring the ID assigned increases as expected - for(AmqpProtocolVersion version : versions) - { - long expectedID = previousId + 1; - byte[] header = getAmqpHeader(version); - assertNotNull("protocol header should not be null", header); - - ServerProtocolEngine engine = factory.newProtocolEngine(); - TestNetworkConnection conn = new TestNetworkConnection(); - engine.setNetworkConnection(conn, conn.getSender()); - assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId()); - - //actually feed in the AMQP header for this protocol version, and ensure the ID remains consistent - engine.received(ByteBuffer.wrap(header)); - assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId()); - - previousId = expectedID; - engine.closed(); - } - } - - /** - * Test to verify that when requesting a ProtocolEngineFactory to produce engines having a default reply to unsupported - * version initiations, there is enforcement that the default reply is itself a supported protocol version. - */ - public void testUnsupportedDefaultReplyCausesIllegalArgumentException() - { - Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class); - versions.remove(AmqpProtocolVersion.v0_9); - - try - { - new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, AmqpProtocolVersion.v0_9, null, - org.apache.qpid.server.model.Transport.TCP); - fail("should not have been allowed to create the factory"); - } - catch(IllegalArgumentException iae) - { - //expected - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java deleted file mode 100644 index b358c7c5c5..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AMQChannelTest extends QpidTestCase -{ - private VirtualHost _virtualHost; - private AMQProtocolSession _protocolSession; - private Map<Integer,String> _replies; - private Broker _broker; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _virtualHost = BrokerTestHelper.createVirtualHost(getTestName()); - _broker = BrokerTestHelper.createBrokerMock(); - _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker) - { - @Override - public void writeReturn(MessagePublishInfo messagePublishInfo, - ContentHeaderBody header, - MessageContentSource msgContent, - int channelId, - int replyCode, - AMQShortString replyText) throws AMQException - { - _replies.put(replyCode, replyText.asString()); - } - }; - _replies = new HashMap<Integer, String>(); - } - - @Override - public void tearDown() throws Exception - { - try - { - _virtualHost.close(); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testCompareTo() throws Exception - { - AMQChannel channel1 = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); - - // create a channel with the same channelId but on a different session - AMQChannel channel2 = new AMQChannel(new InternalTestProtocolSession(_virtualHost, _broker), 1, _virtualHost.getMessageStore()); - assertFalse("Unexpected compare result", channel1.compareTo(channel2) == 0); - assertEquals("Unexpected compare result", 0, channel1.compareTo(channel1)); - } - - public void testPublishContentHeaderWhenMessageAuthorizationFails() throws Exception - { - setTestSystemProperty(BrokerProperties.PROPERTY_MSG_AUTH, "true"); - AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); - channel.setLocalTransactional(); - - MessagePublishInfo info = mock(MessagePublishInfo.class); - Exchange e = mock(Exchange.class); - ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class); - BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class); - - when(contentHeaderBody.getProperties()).thenReturn(properties); - when(info.getExchange()).thenReturn(new AMQShortString("test")); - when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName() + "_incorrect")); - - channel.setPublishFrame(info, e); - channel.publishContentHeader(contentHeaderBody); - channel.commit(); - - assertEquals("Unexpected number of replies", 1, _replies.size()); - assertEquals("Message authorization passed", "Access Refused", _replies.get(403)); - } - - public void testPublishContentHeaderWhenMessageAuthorizationPasses() throws Exception - { - setTestSystemProperty(BrokerProperties.PROPERTY_MSG_AUTH, "true"); - AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); - channel.setLocalTransactional(); - - MessagePublishInfo info = mock(MessagePublishInfo.class); - Exchange e = mock(Exchange.class); - ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class); - BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class); - - when(contentHeaderBody.getProperties()).thenReturn(properties); - when(info.getExchange()).thenReturn(new AMQShortString("test")); - when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName())); - - channel.setPublishFrame(info, e); - channel.publishContentHeader(contentHeaderBody); - channel.commit(); - - assertEquals("Unexpected number of replies", 0, _replies.size()); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java deleted file mode 100644 index f5e58cfd02..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java +++ /dev/null @@ -1,72 +0,0 @@ -package org.apache.qpid.server.protocol.v0_8; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.NetworkConnection; - -public class AMQProtocolEngineTest extends QpidTestCase -{ - private Broker _broker; - private Port _port; - private NetworkConnection _network; - private Transport _transport; - - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _broker = BrokerTestHelper.createBrokerMock(); - when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true); - - _port = mock(Port.class); - _network = mock(NetworkConnection.class); - _transport = Transport.TCP; - } - - public void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - BrokerTestHelper.tearDown(); - } - } - - public void testSetClientPropertiesForNoRouteProvidedAsString() - { - AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); - assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); - - Map<String, Object> clientProperties = new HashMap<String, Object>(); - clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE.toString()); - engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); - - assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); - } - - public void testSetClientPropertiesForNoRouteProvidedAsBoolean() - { - AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); - assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); - - Map<String, Object> clientProperties = new HashMap<String, Object>(); - clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE); - engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); - - assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java deleted file mode 100644 index 4ab64ca100..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ /dev/null @@ -1,436 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.ArrayList; -import java.util.Set; - -/** - * Tests that acknowledgements are handled correctly. - */ -public class AckTest extends QpidTestCase -{ - private Subscription _subscription; - - private AMQProtocolSession _protocolSession; - - private TestableMemoryMessageStore _messageStore; - - private AMQChannel _channel; - - private AMQQueue _queue; - - private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag"); - private VirtualHost _virtualHost; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper_0_8.createChannel(5); - _protocolSession = _channel.getProtocolSession(); - _virtualHost = _protocolSession.getVirtualHost(); - _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); - _messageStore = (TestableMemoryMessageStore)_virtualHost.getMessageStore(); - } - - @Override - protected void tearDown() throws Exception - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - - private void publishMessages(int count) throws AMQException - { - publishMessages(count, false); - } - - private void publishMessages(int count, boolean persistent) throws AMQException - { - _queue.registerSubscription(_subscription,false); - for (int i = 1; i <= count; i++) - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - MessagePublishInfo publishBody = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return new AMQShortString("someExchange"); - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString("rk"); - } - }; - final IncomingMessage msg = new IncomingMessage(publishBody); - //IncomingMessage msg2 = null; - BasicContentHeaderProperties b = new BasicContentHeaderProperties(); - ContentHeaderBody cb = new ContentHeaderBody(); - cb.setProperties(b); - - if (persistent) - { - //This is DeliveryMode.PERSISTENT - b.setDeliveryMode((byte) 2); - } - - msg.setContentHeaderBody(cb); - - // we increment the reference here since we are not delivering the messaging to any queues, which is where - // the reference is normally incremented. The test is easier to construct if we have direct access to the - // subscription - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - qs.add(_queue); - msg.enqueue(qs); - MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis()); - final StoredMessage storedMessage = _messageStore.addMessage(mmd); - msg.setStoredMessage(storedMessage); - final AMQMessage message = new AMQMessage(storedMessage); - if(msg.allContentReceived()) - { - ServerTransaction txn = new AutoCommitTransaction(_messageStore); - txn.enqueue(_queue, message, new ServerTransaction.Action() { - public void postCommit() - { - try - { - - _queue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); - - } - // we manually send the message to the subscription - //_subscription.send(new QueueEntry(_queue,msg), _queue); - } - try - { - Thread.sleep(2000L); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - } - - } - - /** - * Tests that the acknowledgements are correctly associated with a channel and - * order is preserved when acks are enabled - */ - public void testAckChannelAssociationTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size()); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i); - i++; - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - } - - } - - /** - * Tests that in no-ack mode no messages are retained - */ - public void testNoAckMode() throws AMQException - { - // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageCount() == 0); - - - } - - /** - * Tests that in no-ack mode no messages are retained - */ - public void testPersistentNoAckMode() throws AMQException - { - // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount, true); - - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageCount() == 0); - - - } - - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ - public void testSingleAckReceivedTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(5, false); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertEquals("Map not expected size",msgCount - 1,map.size()); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - // 5 is the delivery tag of the message that *should* be removed - if (++i == 5) - { - ++i; - } - } - } - - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ - public void testMultiAckReceivedTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - - - - _channel.acknowledgeMessage(5, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 5); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - ++i; - } - } - - /** - * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. - */ - public void testMultiAckAllReceivedTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(0, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - ++i; - } - } - - /** - * A regression fixing QPID-1136 showed this up - * - * @throws Exception - */ - public void testMessageDequeueRestoresCreditTest() throws Exception - { - // Send 10 messages - Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); - - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, - DEFAULT_CONSUMER_TAG, true, null, false, creditManager); - final int msgCount = 1; - publishMessages(msgCount); - - _queue.deliverAsync(_subscription); - - _channel.acknowledgeMessage(1, false); - - // Check credit available - assertTrue("No credit available", creditManager.hasCredit()); - - } - - -/* - public void testPrefetchHighLow() throws AMQException - { - int lowMark = 5; - int highMark = 10; - - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - _channel.setPrefetchLowMarkCount(lowMark); - _channel.setPrefetchHighMarkCount(highMark); - - assertTrue(_channel.getPrefetchLowMarkCount() == lowMark); - assertTrue(_channel.getPrefetchHighMarkCount() == highMark); - - publishMessages(highMark); - - // at this point we should have sent out only highMark messages - // which have not bee received so will be queued up in the channel - // which should be suspended - assertTrue(_subscription.isSuspended()); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == highMark); - - //acknowledge messages so we are just above lowMark - _channel.acknowledgeMessage(lowMark - 1, true); - - //we should still be suspended - assertTrue(_subscription.isSuspended()); - assertTrue(map.size() == lowMark + 1); - - //acknowledge one more message - _channel.acknowledgeMessage(lowMark, true); - - //and suspension should be lifted - assertTrue(!_subscription.isSuspended()); - - //pubilsh more msgs so we are just below the limit - publishMessages(lowMark - 1); - - //we should not be suspended - assertTrue(!_subscription.isSuspended()); - - //acknowledge all messages - _channel.acknowledgeMessage(0, true); - try - { - Thread.sleep(3000); - } - catch (InterruptedException e) - { - _log.error("Error: " + e, e); - } - //map will be empty - assertTrue(map.size() == 0); - } - -*/ -/* - public void testPrefetch() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - _channel.setMessageCredit(5); - - assertTrue(_channel.getPrefetchCount() == 5); - - final int msgCount = 5; - publishMessages(msgCount); - - // at this point we should have sent out only 5 messages with a further 5 queued - // up in the channel which should now be suspended - assertTrue(_subscription.isSuspended()); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 5); - _channel.acknowledgeMessage(5, true); - assertTrue(!_subscription.isSuspended()); - try - { - Thread.sleep(3000); - } - catch (InterruptedException e) - { - _log.error("Error: " + e, e); - } - assertTrue(map.size() == 0); - } - -*/ - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(AckTest.class); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java deleted file mode 100644 index a9eb0ebfe7..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - - -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.List; - -public class AcknowledgeTest extends QpidTestCase -{ - private AMQChannel _channel; - private SimpleAMQQueue _queue; - private MessageStore _messageStore; - private String _queueName; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper_0_8.createChannel(); - VirtualHost virtualHost = _channel.getVirtualHost(); - _queueName = getTestName(); - _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); - _messageStore = virtualHost.getMessageStore(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - private AMQChannel getChannel() - { - return _channel; - } - - private InternalTestProtocolSession getSession() - { - return (InternalTestProtocolSession)_channel.getProtocolSession(); - } - - private SimpleAMQQueue getQueue() - { - return _queue; - } - - public void testTransactionalSingleAck() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(1, 1, 1, false, 0); - } - - public void testTransactionalMultiAck() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(10, 1, 5, true, 5); - } - - public void testTransactionalAckAll() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(10, 1, 0, true, 0); - } - - public void testNonTransactionalSingleAck() throws AMQException - { - runMessageAck(1, 1, 1, false, 0); - } - - public void testNonTransactionalMultiAck() throws AMQException - { - runMessageAck(10, 1, 5, true, 5); - } - - public void testNonTransactionalAckAll() throws AMQException - { - runMessageAck(10, 1, 0, true, 0); - } - - protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException - { - //Check store is empty - checkStoreContents(0); - - //Send required messsages to the queue - BrokerTestHelper_0_8.publishMessages(getChannel(), - sendMessageCount, - _queueName, - ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - - if (getChannel().isTransactional()) - { - getChannel().commit(); - } - - //Ensure they are stored - checkStoreContents(sendMessageCount); - - //Check that there are no unacked messages - assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); - - //Subscribe to the queue - AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true); - - getQueue().deliverAsync(); - - //Wait for the messages to be delivered - getSession().awaitDelivery(sendMessageCount); - - //Check that they are all waiting to be acknoledged - assertEquals("Channel should have unacked msgs", sendMessageCount, getChannel().getUnacknowledgedMessageMap().size()); - - List<InternalTestProtocolSession.DeliveryPair> messages = getSession().getDelivers(getChannel().getChannelId(), subscriber, sendMessageCount); - - //Double check we received the right number of messages - assertEquals(sendMessageCount, messages.size()); - - //Check that the first message has the expected deliveryTag - assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag()); - - //Send required Acknowledgement - getChannel().acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple); - - if (getChannel().isTransactional()) - { - getChannel().commit(); - } - - // Check Remaining Acknowledgements - assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, getChannel().getUnacknowledgedMessageMap().size()); - - //Check store contents are also correct. - checkStoreContents(remainingUnackedMessages); - } - - private void checkStoreContents(int messageCount) - { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java deleted file mode 100644 index 0919607bd7..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class BrokerTestHelper_0_8 extends BrokerTestHelper -{ - - public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException - { - AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); - session.addChannel(channel); - return channel; - } - - public static AMQChannel createChannel(int channelId) throws Exception - { - InternalTestProtocolSession session = createProtocolSession(); - return createChannel(channelId, session); - } - - public static AMQChannel createChannel() throws Exception - { - return createChannel(1); - } - - public static InternalTestProtocolSession createProtocolSession() throws Exception - { - return createProtocolSession("test"); - } - - public static InternalTestProtocolSession createProtocolSession(String hostName) throws Exception - { - VirtualHost virtualHost = createVirtualHost(hostName); - return new InternalTestProtocolSession(virtualHost, createBrokerMock()); - } - - public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException - { - AMQShortString rouningKey = new AMQShortString(queueName); - AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); - MessagePublishInfo info = mock(MessagePublishInfo.class); - when(info.getExchange()).thenReturn(exchangeNameAsShortString); - when(info.getRoutingKey()).thenReturn(rouningKey); - - Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); - for (int count = 0; count < numberOfMessages; count++) - { - channel.setPublishFrame(info, exchange); - - // Set the body size - ContentHeaderBody _headerBody = new ContentHeaderBody(); - _headerBody.setBodySize(0); - - // Set Minimum properties - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - - properties.setExpiration(0L); - properties.setTimestamp(System.currentTimeMillis()); - - // Make Message Persistent - properties.setDeliveryMode((byte) 2); - - _headerBody.setProperties(properties); - - channel.publishContentHeader(_headerBody); - } - channel.sync(); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java deleted file mode 100644 index 7f36b4a081..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import junit.framework.TestCase; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryIterator; -import org.apache.qpid.server.queue.SimpleQueueEntryList; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.subscription.MockSubscription; -import org.apache.qpid.server.subscription.Subscription; - -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; - -/** - * QPID-1385 : Race condition between added to unacked map and resending due to a rollback. - * - * In AMQChannel _unackedMap.clear() was done after the visit. This meant that the clear was not in the same - * synchronized block as as the preparation to resend. - * - * This clearing/prep for resend was done as a result of the rollback call. HOWEVER, the delivery thread was still - * in the process of sending messages to the client. It is therefore possible that a message could block on the - * _unackedMap lock waiting for the visit to compelete so that it can add the new message to the unackedMap.... - * which is then cleared by the resend/rollback thread. - * - * This problem was encountered by the testSend2ThenRollback test. - * - * To try and increase the chance of the race condition occuring this test will send multiple messages so that the - * delivery thread will be in progress while the rollback method is called. Hopefully this will cause the - * deliveryTag to be lost - */ -public class ExtractResendAndRequeueTest extends TestCase -{ - - private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; - private static final int INITIAL_MSG_COUNT = 10; - private AMQQueue _queue = new MockAMQQueue(getName()); - private MessageStore _messageStore = new TestMemoryMessageStore(); - private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); - - @Override - public void setUp() throws AMQException - { - _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100); - - long id = 0; - SimpleQueueEntryList list = new SimpleQueueEntryList(_queue); - - // Add initial messages to QueueEntryList - for (int count = 0; count < INITIAL_MSG_COUNT; count++) - { - AMQMessage msg = new MockAMQMessage(id); - - list.add(msg); - - //Increment ID; - id++; - } - - // Iterate through the QueueEntryList and add entries to unacknowledgeMessageMap and referecenList - QueueEntryIterator queueEntries = list.iterator(); - while(queueEntries.advance()) - { - QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry); - - // Store the entry for future inspection - _referenceList.add(entry); - } - - assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size()); - } - - /** - * Helper method to create a new subscription and aquire the given messages. - * - * @param messageList The messages to aquire - * - * @return Subscription that performed the aquire - */ - private Subscription createSubscriptionAndAquireMessages(LinkedList<QueueEntry> messageList) - { - Subscription subscription = new MockSubscription(); - - // Aquire messages in subscription - for (QueueEntry entry : messageList) - { - entry.acquire(subscription); - } - - return subscription; - } - - /** - * This is the normal consumer rollback method. - * - * An active consumer that has aquired messages expects those messasges to be reset when rollback is requested. - * - * This test validates that the msgToResend map includes all the messages and none are left behind. - * - * @throws AMQException the visit interface throws this - */ - public void testResend() throws AMQException - { - //We don't need the subscription object here. - createSubscriptionAndAquireMessages(_referenceList); - - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnabletoResend doesn't matter here. - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); - - assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size()); - assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - } - - /** - * This is the normal consumer close method. - * - * When a consumer that has aquired messages expects closes the messages that it has aquired should be removed from - * the unacknowledgeMap and placed in msgToRequeue - * - * This test validates that the msgToRequeue map includes all the messages and none are left behind. - * - * @throws AMQException the visit interface throws this - */ - public void testRequeueDueToSubscriptionClosure() throws AMQException - { - Subscription subscription = createSubscriptionAndAquireMessages(_referenceList); - - // Close subscription - subscription.close(); - - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnabletoResend doesn't matter here. - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - } - - /** - * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued - * requeueIfUnabletoResend(set to true) then all messages should be sent to the msgToRequeue map. - * - * @throws AMQException the visit interface throws this - */ - - public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnabletoResend = true so all messages should go to msgToRequeue - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - } - - /** - * If the subscription is null, due to message being retrieved via a GET, And we request that we don't - * requeueIfUnabletoResend(set to false) then all messages should be dropped as we do not have a dead letter queue. - * - * @throws AMQException the visit interface throws this - */ - - public void testDrop() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - - - for (QueueEntry entry : _referenceList) - { - assertTrue("Message was not discarded", entry.isDeleted()); - } - - } - - /** - * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was - * delivered has been deleted then it is not possible to requeue. Currently we simply discar the message but in the - * future we may wish to dead letter the message. - * - * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted - * - * @throws AMQException the visit interface throws this - */ - public void testDiscard() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - _queue.delete(); - - // requeueIfUnabletoResend : value doesn't matter here as queue has been deleted - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - - for (QueueEntry entry : _referenceList) - { - assertTrue("Message was not discarded", entry.isDeleted()); - } - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java deleted file mode 100644 index e8fda2bc65..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; -import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.v0_8.SubscriptionImpl; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.TestNetworkConnection; - -import javax.security.auth.Subject; - -import java.security.Principal; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter -{ - // ChannelID(LIST) -> LinkedList<Pair> - private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers; - private AtomicInteger _deliveryCount = new AtomicInteger(0); - private static final AtomicLong ID_GENERATOR = new AtomicLong(0); - - public InternalTestProtocolSession(VirtualHost virtualHost, Broker broker) throws AMQException - { - super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null); - - _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); - - setTestAuthorizedSubject(); - setVirtualHost(virtualHost); - } - - private void setTestAuthorizedSubject() - { - Principal principal = new AuthenticatedPrincipal(new UsernamePrincipal("InternalTestProtocolSession")); - Subject authorizedSubject = new Subject( - true, - Collections.singleton(principal), - Collections.emptySet(), - Collections.emptySet()); - - setAuthorizedSubject(authorizedSubject); - } - - public ProtocolOutputConverter getProtocolOutputConverter() - { - return this; - } - - public byte getProtocolMajorVersion() - { - return (byte) 8; - } - - public void writeReturn(MessagePublishInfo messagePublishInfo, - ContentHeaderBody header, - MessageContentSource msgContent, - int channelId, - int replyCode, - AMQShortString replyText) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public byte getProtocolMinorVersion() - { - return (byte) 0; - } - - // *** - - public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count) - { - synchronized (_channelDelivers) - { - List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag); - - if (all == null) - { - return new ArrayList<DeliveryPair>(0); - } - - List<DeliveryPair> msgs = all.subList(0, count); - - List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs); - - //Remove the msgs from the receivedList. - msgs.clear(); - - return response; - } - } - - // *** ProtocolOutputConverter Implementation - public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - } - - public ClientDeliveryMethod createDeliveryMethod(int channelId) - { - return new InternalWriteDeliverMethod(channelId); - } - - public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) - { - } - - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException - { - _deliveryCount.incrementAndGet(); - - synchronized (_channelDelivers) - { - Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); - - if (consumers == null) - { - consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); - _channelDelivers.put(channelId, consumers); - } - - LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag); - - if (consumerDelivers == null) - { - consumerDelivers = new LinkedList<DeliveryPair>(); - consumers.put(consumerTag, consumerDelivers); - } - - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); - } - } - - public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException - { - } - - public void awaitDelivery(int msgs) - { - while (msgs > _deliveryCount.get()) - { - try - { - Thread.sleep(100); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } - } - } - - public class DeliveryPair - { - private long _deliveryTag; - private AMQMessage _message; - - public DeliveryPair(long deliveryTag, AMQMessage message) - { - _deliveryTag = deliveryTag; - _message = message; - } - - public AMQMessage getMessage() - { - return _message; - } - - public long getDeliveryTag() - { - return _deliveryTag; - } - } - - public void closeProtocolSession() - { - // Override as we don't have a real IOSession to close. - // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); - // Then the AMQMinaProtocolSession can join on the returning future without a NPE. - } - - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException - { - super.closeSession(session, cause, message); - - //Simulate the Client responding with a CloseOK - // should really update the StateManger but we don't have access here - // changeState(AMQState.CONNECTION_CLOSED); - ((AMQChannel)session).getProtocolSession().closeSession(); - - } - - private class InternalWriteDeliverMethod implements ClientDeliveryMethod - { - private int _channelId; - - public InternalWriteDeliverMethod(int channelId) - { - _channelId = channelId; - } - - - public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException - { - _deliveryCount.incrementAndGet(); - - synchronized (_channelDelivers) - { - Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); - - if (consumers == null) - { - consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); - _channelDelivers.put(_channelId, consumers); - } - - LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag()); - - if (consumerDelivers == null) - { - consumerDelivers = new LinkedList<DeliveryPair>(); - consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers); - } - - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); - } - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java deleted file mode 100644 index a77475c05f..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -/** Test class to test MBean operations for AMQMinaProtocolSession. */ -public class MaxChannelsTest extends QpidTestCase -{ - private AMQProtocolEngine _session; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _session = BrokerTestHelper_0_8.createProtocolSession(); - } - - public void testChannels() throws Exception - { - // check the channel count is correct - int channelCount = _session.getChannels().size(); - assertEquals("Initial channel count wrong", 0, channelCount); - - long maxChannels = 10L; - _session.setMaximumNumberOfChannels(maxChannels); - assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels()); - - for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) - { - _session.addChannel(new AMQChannel(_session, (int) currentChannel, null)); - } - - try - { - _session.addChannel(new AMQChannel(_session, (int) maxChannels, null)); - fail("Cannot create more channels then maximum"); - } - catch (AMQException e) - { - assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED); - } - assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size())); - } - - @Override - public void tearDown() throws Exception - { - try - { - _session.getVirtualHost().close(); - _session.closeSession(); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockAMQMessage.java deleted file mode 100644 index 1cc3607298..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockAMQMessage.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -public class MockAMQMessage extends AMQMessage -{ - public MockAMQMessage(long messageId) - { - super(new MockStoredMessage(messageId)); - } - - public MockAMQMessage(long messageId, String headerName, Object headerValue) - { - super(new MockStoredMessage(messageId, headerName, headerValue)); - } - - @Override - public long getSize() - { - return 0l; - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java deleted file mode 100644 index ab29e58a6c..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; - -public class MockMessagePublishInfo implements MessagePublishInfo -{ - public AMQShortString getExchange() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isMandatory() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public AMQShortString getRoutingKey() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java deleted file mode 100755 index 15573a871f..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ /dev/null @@ -1,115 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; - -import java.nio.ByteBuffer; - -public class MockStoredMessage implements StoredMessage<MessageMetaData> -{ - private long _messageId; - private MessageMetaData _metaData; - private final ByteBuffer _content; - - public MockStoredMessage(long messageId) - { - this(messageId, (String)null, null); - } - - public MockStoredMessage(long messageId, String headerName, Object headerValue) - { - this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue); - } - - public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb) - { - this(messageId, info, chb, null, null); - } - - public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb, String headerName, Object headerValue) - { - _messageId = messageId; - if (headerName != null) - { - FieldTable headers = new FieldTable(); - headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue)); - ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers); - } - _metaData = new MessageMetaData(info, chb, 0); - _content = ByteBuffer.allocate(_metaData.getContentSize()); - } - - public MessageMetaData getMetaData() - { - return _metaData; - } - - public long getMessageNumber() - { - return _messageId; - } - - public void addContent(int offsetInMessage, ByteBuffer src) - { - src = src.duplicate(); - ByteBuffer dst = _content.duplicate(); - dst.position(offsetInMessage); - dst.put(src); - } - - public int getContent(int offset, ByteBuffer dst) - { - ByteBuffer src = _content.duplicate(); - src.position(offset); - src = src.slice(); - if(dst.remaining() < src.limit()) - { - src.limit(dst.remaining()); - } - dst.put(src); - return src.limit(); - } - - - - public ByteBuffer getContent(int offsetInMessage, int size) - { - ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); - buf.position(0); - return buf; - } - - public StoreFuture flushToStore() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - public void remove() - { - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java deleted file mode 100644 index 21142e7ab6..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.List; - -public class QueueBrowserUsesNoAckTest extends QpidTestCase -{ - private AMQChannel _channel; - private SimpleAMQQueue _queue; - private MessageStore _messageStore; - private String _queueName; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper_0_8.createChannel(); - VirtualHost virtualHost = _channel.getVirtualHost(); - _queueName = getTestName(); - _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); - _messageStore = virtualHost.getMessageStore(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - private AMQChannel getChannel() - { - return _channel; - } - - private InternalTestProtocolSession getSession() - { - return (InternalTestProtocolSession)_channel.getProtocolSession(); - } - - private SimpleAMQQueue getQueue() - { - return _queue; - } - - public void testQueueBrowserUsesNoAck() throws AMQException - { - int sendMessageCount = 2; - int prefetch = 1; - - //Check store is empty - checkStoreContents(0); - - //Send required messsages to the queue - BrokerTestHelper_0_8.publishMessages(getChannel(), - sendMessageCount, - _queueName, - ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - - //Ensure they are stored - checkStoreContents(sendMessageCount); - - //Check that there are no unacked messages - assertEquals("Channel should have no unacked msgs ", 0, - getChannel().getUnacknowledgedMessageMap().size()); - - //Set the prefetch on the session to be less than the sent messages - getChannel().setCredit(0, prefetch); - - //browse the queue - AMQShortString browser = browse(getChannel(), getQueue()); - - getQueue().deliverAsync(); - - //Wait for messages to fill the prefetch - getSession().awaitDelivery(prefetch); - - //Get those messages - List<InternalTestProtocolSession.DeliveryPair> messages = - getSession().getDelivers(getChannel().getChannelId(), browser, - prefetch); - - //Ensure we recevied the prefetched messages - assertEquals(prefetch, messages.size()); - - //Check the process didn't suspend the subscription as this would - // indicate we are using the prefetch credit. i.e. using acks not No-Ack - assertTrue("The subscription has been suspended", - !getChannel().getSubscription(browser).getState() - .equals(Subscription.State.SUSPENDED)); - } - - private void checkStoreContents(int messageCount) - { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); - } - - private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException - { - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - - return channel.subscribeToQueue(null, queue, true, filters, false, true); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java deleted file mode 100644 index 87fbcfa9b3..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.test.utils.QpidTestCase; - -/** - * Tests that reference counting works correctly with AMQMessage and the message store - */ -public class ReferenceCountingTest extends QpidTestCase -{ - private TestableMemoryMessageStore _store; - - - protected void setUp() throws Exception - { - _store = new TestableMemoryMessageStore(); - } - - /** - * Check that when the reference count is decremented the message removes itself from the store - */ - public void testMessageGetsRemoved() throws AMQException - { - ContentHeaderBody chb = createPersistentContentHeader(); - - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - - - MessageMetaData mmd = new MessageMetaData(info, chb, 0); - StoredMessage storedMessage = _store.addMessage(mmd); - - - AMQMessage message = new AMQMessage(storedMessage); - - MessageReference ref = message.newReference(); - - assertEquals(1, _store.getMessageCount()); - - ref.release(); - - assertEquals(0, _store.getMessageCount()); - } - - private ContentHeaderBody createPersistentContentHeader() - { - ContentHeaderBody chb = new ContentHeaderBody(); - BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); - bchp.setDeliveryMode((byte)2); - chb.setProperties(bchp); - return chb; - } - - public void testMessageRemains() throws AMQException - { - - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - final ContentHeaderBody chb = createPersistentContentHeader(); - - MessageMetaData mmd = new MessageMetaData(info, chb, 0); - StoredMessage storedMessage = _store.addMessage(mmd); - - AMQMessage message = new AMQMessage(storedMessage); - - - MessageReference ref = message.newReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - assertEquals(1, _store.getMessageCount()); - MessageReference ref2 = message.newReference(); - ref.release(); - assertEquals(1, _store.getMessageCount()); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ReferenceCountingTest.class); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java deleted file mode 100644 index e98dd63450..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -public class SubscriptionFactoryImplTest extends QpidTestCase -{ - private AMQChannel _channel; - private AMQProtocolSession _session; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper_0_8.createChannel(); - _session = _channel.getProtocolSession(); - GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - /** - * Tests that while creating Subscriptions of various types, the - * ID numbers assigned are allocated from a common sequence - * (in increasing order). - */ - public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception - { - //create a No-Ack subscription, get the first Subscription ID - long previousId = 0; - Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager()); - previousId = noAckSub.getSubscriptionID(); - - //create an ack subscription, verify the next Subscription ID is used - Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); - previousId = ackSub.getSubscriptionID(); - - //create a browser subscription - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID()); - previousId = browerSub.getSubscriptionID(); - - //create an BasicGet NoAck subscription - Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false, - _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); - previousId = getNoAckSub.getSubscriptionID(); - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index e0b2a90cb0..d348c3e67b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -22,7 +22,6 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.queue.QueueEntry.EntryState; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index 806606dd0f..7add2d4d43 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -23,7 +23,6 @@ import java.util.Collections; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.message.ServerMessage; import java.util.Arrays; |
