summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-16 10:49:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-16 10:49:37 +0000
commit42b37635a7b0252c7cdaae11b431d8d3c67cbae9 (patch)
tree263350daf0522ba805f736b86185a96ad95f7824 /qpid/java/broker/src/test
parent403007b67551a262b62ada16429f4f305c2d93fd (diff)
downloadqpid-python-42b37635a7b0252c7cdaae11b431d8d3c67cbae9.tar.gz
QPID-4659 : [Java Broker] move amqp 0-8 implementation into a plugin
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java205
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java141
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java72
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java436
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java181
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java99
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java253
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java269
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java83
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockAMQMessage.java40
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java52
-rwxr-xr-xqpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java115
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java149
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java162
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java96
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java1
18 files changed, 0 insertions, 2356 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
index d413c4d4c9..701ccaab47 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
import org.apache.qpid.server.util.BrokerTestHelper;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
deleted file mode 100644
index 0d4b4ba9f8..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.protocol;
-
-import static org.mockito.Mockito.when;
-
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-import java.util.Set;
-
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.TestNetworkConnection;
-
-public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
-{
- private VirtualHost _virtualHost;
- private Broker _broker;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _broker = BrokerTestHelper.createBrokerMock();
- VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry();
- when(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)).thenReturn("default");
-
- // AMQP 1-0 connection needs default vhost to be present
- _virtualHost = BrokerTestHelper.createVirtualHost("default", virtualHostRegistry);
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- try
- {
- _virtualHost.close();
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- private static final byte[] AMQP_0_8_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 1,
- (byte) 1,
- (byte) 8,
- (byte) 0
- };
-
- private static final byte[] AMQP_0_9_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 1,
- (byte) 1,
- (byte) 0,
- (byte) 9
- };
-
- private static final byte[] AMQP_0_9_1_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 0,
- (byte) 0,
- (byte) 9,
- (byte) 1
- };
-
-
- private static final byte[] AMQP_0_10_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 1,
- (byte) 1,
- (byte) 0,
- (byte) 10
- };
-
-
- private static final byte[] AMQP_1_0_0_HEADER =
- new byte[] {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- };
-
-
- private byte[] getAmqpHeader(final AmqpProtocolVersion version)
- {
- switch(version)
- {
- case v0_8:
- return AMQP_0_8_HEADER;
- case v0_9:
- return AMQP_0_9_HEADER;
- case v0_9_1:
- return AMQP_0_9_1_HEADER;
- case v0_10:
- return AMQP_0_10_HEADER;
- case v1_0_0:
- return AMQP_1_0_0_HEADER;
- default:
- fail("unknown AMQP version, appropriate header must be added for new protocol version");
- return null;
- }
- }
-
- /**
- * Test to verify that connections established using a MultiVersionProtocolEngine are assigned
- * IDs from a common sequence, independent of the protocol version under use.
- */
- public void testDifferentProtocolVersionsShareCommonIDNumberingSequence()
- {
- Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
-
- MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, null, null,
- org.apache.qpid.server.model.Transport.TCP);
-
- //create a dummy to retrieve the 'current' ID number
- long previousId = factory.newProtocolEngine().getConnectionId();
-
- //create a protocol engine and send the AMQP header for all supported AMQP verisons,
- //ensuring the ID assigned increases as expected
- for(AmqpProtocolVersion version : versions)
- {
- long expectedID = previousId + 1;
- byte[] header = getAmqpHeader(version);
- assertNotNull("protocol header should not be null", header);
-
- ServerProtocolEngine engine = factory.newProtocolEngine();
- TestNetworkConnection conn = new TestNetworkConnection();
- engine.setNetworkConnection(conn, conn.getSender());
- assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId());
-
- //actually feed in the AMQP header for this protocol version, and ensure the ID remains consistent
- engine.received(ByteBuffer.wrap(header));
- assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId());
-
- previousId = expectedID;
- engine.closed();
- }
- }
-
- /**
- * Test to verify that when requesting a ProtocolEngineFactory to produce engines having a default reply to unsupported
- * version initiations, there is enforcement that the default reply is itself a supported protocol version.
- */
- public void testUnsupportedDefaultReplyCausesIllegalArgumentException()
- {
- Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
- versions.remove(AmqpProtocolVersion.v0_9);
-
- try
- {
- new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, AmqpProtocolVersion.v0_9, null,
- org.apache.qpid.server.model.Transport.TCP);
- fail("should not have been allowed to create the factory");
- }
- catch(IllegalArgumentException iae)
- {
- //expected
- }
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
deleted file mode 100644
index b358c7c5c5..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AMQChannelTest extends QpidTestCase
-{
- private VirtualHost _virtualHost;
- private AMQProtocolSession _protocolSession;
- private Map<Integer,String> _replies;
- private Broker _broker;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _virtualHost = BrokerTestHelper.createVirtualHost(getTestName());
- _broker = BrokerTestHelper.createBrokerMock();
- _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker)
- {
- @Override
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- MessageContentSource msgContent,
- int channelId,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
- _replies.put(replyCode, replyText.asString());
- }
- };
- _replies = new HashMap<Integer, String>();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- _virtualHost.close();
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- public void testCompareTo() throws Exception
- {
- AMQChannel channel1 = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
-
- // create a channel with the same channelId but on a different session
- AMQChannel channel2 = new AMQChannel(new InternalTestProtocolSession(_virtualHost, _broker), 1, _virtualHost.getMessageStore());
- assertFalse("Unexpected compare result", channel1.compareTo(channel2) == 0);
- assertEquals("Unexpected compare result", 0, channel1.compareTo(channel1));
- }
-
- public void testPublishContentHeaderWhenMessageAuthorizationFails() throws Exception
- {
- setTestSystemProperty(BrokerProperties.PROPERTY_MSG_AUTH, "true");
- AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
- channel.setLocalTransactional();
-
- MessagePublishInfo info = mock(MessagePublishInfo.class);
- Exchange e = mock(Exchange.class);
- ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
- BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
-
- when(contentHeaderBody.getProperties()).thenReturn(properties);
- when(info.getExchange()).thenReturn(new AMQShortString("test"));
- when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName() + "_incorrect"));
-
- channel.setPublishFrame(info, e);
- channel.publishContentHeader(contentHeaderBody);
- channel.commit();
-
- assertEquals("Unexpected number of replies", 1, _replies.size());
- assertEquals("Message authorization passed", "Access Refused", _replies.get(403));
- }
-
- public void testPublishContentHeaderWhenMessageAuthorizationPasses() throws Exception
- {
- setTestSystemProperty(BrokerProperties.PROPERTY_MSG_AUTH, "true");
- AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
- channel.setLocalTransactional();
-
- MessagePublishInfo info = mock(MessagePublishInfo.class);
- Exchange e = mock(Exchange.class);
- ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
- BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
-
- when(contentHeaderBody.getProperties()).thenReturn(properties);
- when(info.getExchange()).thenReturn(new AMQShortString("test"));
- when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName()));
-
- channel.setPublishFrame(info, e);
- channel.publishContentHeader(contentHeaderBody);
- channel.commit();
-
- assertEquals("Unexpected number of replies", 0, _replies.size());
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
deleted file mode 100644
index f5e58cfd02..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.qpid.server.protocol.v0_8;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class AMQProtocolEngineTest extends QpidTestCase
-{
- private Broker _broker;
- private Port _port;
- private NetworkConnection _network;
- private Transport _transport;
-
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _broker = BrokerTestHelper.createBrokerMock();
- when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true);
-
- _port = mock(Port.class);
- _network = mock(NetworkConnection.class);
- _transport = Transport.TCP;
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- BrokerTestHelper.tearDown();
- }
- }
-
- public void testSetClientPropertiesForNoRouteProvidedAsString()
- {
- AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport);
- assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute());
-
- Map<String, Object> clientProperties = new HashMap<String, Object>();
- clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE.toString());
- engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties));
-
- assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute());
- }
-
- public void testSetClientPropertiesForNoRouteProvidedAsBoolean()
- {
- AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport);
- assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute());
-
- Map<String, Object> clientProperties = new HashMap<String, Object>();
- clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE);
- engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties));
-
- assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute());
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
deleted file mode 100644
index 4ab64ca100..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-import java.util.ArrayList;
-import java.util.Set;
-
-/**
- * Tests that acknowledgements are handled correctly.
- */
-public class AckTest extends QpidTestCase
-{
- private Subscription _subscription;
-
- private AMQProtocolSession _protocolSession;
-
- private TestableMemoryMessageStore _messageStore;
-
- private AMQChannel _channel;
-
- private AMQQueue _queue;
-
- private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag");
- private VirtualHost _virtualHost;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _channel = BrokerTestHelper_0_8.createChannel(5);
- _protocolSession = _channel.getProtocolSession();
- _virtualHost = _protocolSession.getVirtualHost();
- _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost);
- _messageStore = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
-
- private void publishMessages(int count) throws AMQException
- {
- publishMessages(count, false);
- }
-
- private void publishMessages(int count, boolean persistent) throws AMQException
- {
- _queue.registerSubscription(_subscription,false);
- for (int i = 1; i <= count; i++)
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- MessagePublishInfo publishBody = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return new AMQShortString("someExchange");
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return new AMQShortString("rk");
- }
- };
- final IncomingMessage msg = new IncomingMessage(publishBody);
- //IncomingMessage msg2 = null;
- BasicContentHeaderProperties b = new BasicContentHeaderProperties();
- ContentHeaderBody cb = new ContentHeaderBody();
- cb.setProperties(b);
-
- if (persistent)
- {
- //This is DeliveryMode.PERSISTENT
- b.setDeliveryMode((byte) 2);
- }
-
- msg.setContentHeaderBody(cb);
-
- // we increment the reference here since we are not delivering the messaging to any queues, which is where
- // the reference is normally incremented. The test is easier to construct if we have direct access to the
- // subscription
- ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
- qs.add(_queue);
- msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis());
- final StoredMessage storedMessage = _messageStore.addMessage(mmd);
- msg.setStoredMessage(storedMessage);
- final AMQMessage message = new AMQMessage(storedMessage);
- if(msg.allContentReceived())
- {
- ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, message, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
-
- _queue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
-
- }
- // we manually send the message to the subscription
- //_subscription.send(new QueueEntry(_queue,msg), _queue);
- }
- try
- {
- Thread.sleep(2000L);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
-
- }
-
- /**
- * Tests that the acknowledgements are correctly associated with a channel and
- * order is preserved when acks are enabled
- */
- public void testAckChannelAssociationTest() throws AMQException
- {
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
- final int msgCount = 10;
- publishMessages(msgCount, true);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertEquals("Unextpected size for unacknowledge message map",msgCount,map.size());
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i);
- i++;
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
- }
-
- }
-
- /**
- * Tests that in no-ack mode no messages are retained
- */
- public void testNoAckMode() throws AMQException
- {
- // false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
- final int msgCount = 10;
- publishMessages(msgCount);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageCount() == 0);
-
-
- }
-
- /**
- * Tests that in no-ack mode no messages are retained
- */
- public void testPersistentNoAckMode() throws AMQException
- {
- // false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
- final int msgCount = 10;
- publishMessages(msgCount, true);
-
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageCount() == 0);
-
-
- }
-
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
- public void testSingleAckReceivedTest() throws AMQException
- {
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(5, false);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertEquals("Map not expected size",msgCount - 1,map.size());
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
- // 5 is the delivery tag of the message that *should* be removed
- if (++i == 5)
- {
- ++i;
- }
- }
- }
-
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
- public void testMultiAckReceivedTest() throws AMQException
- {
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
- final int msgCount = 10;
- publishMessages(msgCount);
-
-
-
- _channel.acknowledgeMessage(5, true);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 5);
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
- ++i;
- }
- }
-
- /**
- * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
- */
- public void testMultiAckAllReceivedTest() throws AMQException
- {
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
- final int msgCount = 10;
- publishMessages(msgCount);
-
- _channel.acknowledgeMessage(0, true);
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 0);
-
- Set<Long> deliveryTagSet = map.getDeliveryTags();
- int i = 1;
- for (long deliveryTag : deliveryTagSet)
- {
- assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
- ++i;
- }
- }
-
- /**
- * A regression fixing QPID-1136 showed this up
- *
- * @throws Exception
- */
- public void testMessageDequeueRestoresCreditTest() throws Exception
- {
- // Send 10 messages
- Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
-
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
- DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
- final int msgCount = 1;
- publishMessages(msgCount);
-
- _queue.deliverAsync(_subscription);
-
- _channel.acknowledgeMessage(1, false);
-
- // Check credit available
- assertTrue("No credit available", creditManager.hasCredit());
-
- }
-
-
-/*
- public void testPrefetchHighLow() throws AMQException
- {
- int lowMark = 5;
- int highMark = 10;
-
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
- _channel.setPrefetchLowMarkCount(lowMark);
- _channel.setPrefetchHighMarkCount(highMark);
-
- assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
- assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
-
- publishMessages(highMark);
-
- // at this point we should have sent out only highMark messages
- // which have not bee received so will be queued up in the channel
- // which should be suspended
- assertTrue(_subscription.isSuspended());
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == highMark);
-
- //acknowledge messages so we are just above lowMark
- _channel.acknowledgeMessage(lowMark - 1, true);
-
- //we should still be suspended
- assertTrue(_subscription.isSuspended());
- assertTrue(map.size() == lowMark + 1);
-
- //acknowledge one more message
- _channel.acknowledgeMessage(lowMark, true);
-
- //and suspension should be lifted
- assertTrue(!_subscription.isSuspended());
-
- //pubilsh more msgs so we are just below the limit
- publishMessages(lowMark - 1);
-
- //we should not be suspended
- assertTrue(!_subscription.isSuspended());
-
- //acknowledge all messages
- _channel.acknowledgeMessage(0, true);
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- _log.error("Error: " + e, e);
- }
- //map will be empty
- assertTrue(map.size() == 0);
- }
-
-*/
-/*
- public void testPrefetch() throws AMQException
- {
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
- _channel.setMessageCredit(5);
-
- assertTrue(_channel.getPrefetchCount() == 5);
-
- final int msgCount = 5;
- publishMessages(msgCount);
-
- // at this point we should have sent out only 5 messages with a further 5 queued
- // up in the channel which should now be suspended
- assertTrue(_subscription.isSuspended());
- UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == 5);
- _channel.acknowledgeMessage(5, true);
- assertTrue(!_subscription.isSuspended());
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- _log.error("Error: " + e, e);
- }
- assertTrue(map.size() == 0);
- }
-
-*/
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(AckTest.class);
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
deleted file mode 100644
index a9eb0ebfe7..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-import java.util.List;
-
-public class AcknowledgeTest extends QpidTestCase
-{
- private AMQChannel _channel;
- private SimpleAMQQueue _queue;
- private MessageStore _messageStore;
- private String _queueName;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _channel = BrokerTestHelper_0_8.createChannel();
- VirtualHost virtualHost = _channel.getVirtualHost();
- _queueName = getTestName();
- _queue = BrokerTestHelper.createQueue(_queueName, virtualHost);
- _messageStore = virtualHost.getMessageStore();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.getVirtualHost().close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- private AMQChannel getChannel()
- {
- return _channel;
- }
-
- private InternalTestProtocolSession getSession()
- {
- return (InternalTestProtocolSession)_channel.getProtocolSession();
- }
-
- private SimpleAMQQueue getQueue()
- {
- return _queue;
- }
-
- public void testTransactionalSingleAck() throws AMQException
- {
- getChannel().setLocalTransactional();
- runMessageAck(1, 1, 1, false, 0);
- }
-
- public void testTransactionalMultiAck() throws AMQException
- {
- getChannel().setLocalTransactional();
- runMessageAck(10, 1, 5, true, 5);
- }
-
- public void testTransactionalAckAll() throws AMQException
- {
- getChannel().setLocalTransactional();
- runMessageAck(10, 1, 0, true, 0);
- }
-
- public void testNonTransactionalSingleAck() throws AMQException
- {
- runMessageAck(1, 1, 1, false, 0);
- }
-
- public void testNonTransactionalMultiAck() throws AMQException
- {
- runMessageAck(10, 1, 5, true, 5);
- }
-
- public void testNonTransactionalAckAll() throws AMQException
- {
- runMessageAck(10, 1, 0, true, 0);
- }
-
- protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException
- {
- //Check store is empty
- checkStoreContents(0);
-
- //Send required messsages to the queue
- BrokerTestHelper_0_8.publishMessages(getChannel(),
- sendMessageCount,
- _queueName,
- ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString());
-
- if (getChannel().isTransactional())
- {
- getChannel().commit();
- }
-
- //Ensure they are stored
- checkStoreContents(sendMessageCount);
-
- //Check that there are no unacked messages
- assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
-
- //Subscribe to the queue
- AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
-
- getQueue().deliverAsync();
-
- //Wait for the messages to be delivered
- getSession().awaitDelivery(sendMessageCount);
-
- //Check that they are all waiting to be acknoledged
- assertEquals("Channel should have unacked msgs", sendMessageCount, getChannel().getUnacknowledgedMessageMap().size());
-
- List<InternalTestProtocolSession.DeliveryPair> messages = getSession().getDelivers(getChannel().getChannelId(), subscriber, sendMessageCount);
-
- //Double check we received the right number of messages
- assertEquals(sendMessageCount, messages.size());
-
- //Check that the first message has the expected deliveryTag
- assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag());
-
- //Send required Acknowledgement
- getChannel().acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple);
-
- if (getChannel().isTransactional())
- {
- getChannel().commit();
- }
-
- // Check Remaining Acknowledgements
- assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, getChannel().getUnacknowledgedMessageMap().size());
-
- //Check store contents are also correct.
- checkStoreContents(remainingUnackedMessages);
- }
-
- private void checkStoreContents(int messageCount)
- {
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
deleted file mode 100644
index 0919607bd7..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class BrokerTestHelper_0_8 extends BrokerTestHelper
-{
-
- public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException
- {
- AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore());
- session.addChannel(channel);
- return channel;
- }
-
- public static AMQChannel createChannel(int channelId) throws Exception
- {
- InternalTestProtocolSession session = createProtocolSession();
- return createChannel(channelId, session);
- }
-
- public static AMQChannel createChannel() throws Exception
- {
- return createChannel(1);
- }
-
- public static InternalTestProtocolSession createProtocolSession() throws Exception
- {
- return createProtocolSession("test");
- }
-
- public static InternalTestProtocolSession createProtocolSession(String hostName) throws Exception
- {
- VirtualHost virtualHost = createVirtualHost(hostName);
- return new InternalTestProtocolSession(virtualHost, createBrokerMock());
- }
-
- public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException
- {
- AMQShortString rouningKey = new AMQShortString(queueName);
- AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName);
- MessagePublishInfo info = mock(MessagePublishInfo.class);
- when(info.getExchange()).thenReturn(exchangeNameAsShortString);
- when(info.getRoutingKey()).thenReturn(rouningKey);
-
- Exchange exchange = channel.getVirtualHost().getExchange(exchangeName);
- for (int count = 0; count < numberOfMessages; count++)
- {
- channel.setPublishFrame(info, exchange);
-
- // Set the body size
- ContentHeaderBody _headerBody = new ContentHeaderBody();
- _headerBody.setBodySize(0);
-
- // Set Minimum properties
- BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
-
- properties.setExpiration(0L);
- properties.setTimestamp(System.currentTimeMillis());
-
- // Make Message Persistent
- properties.setDeliveryMode((byte) 2);
-
- _headerBody.setProperties(properties);
-
- channel.publishContentHeader(_headerBody);
- }
- channel.sync();
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
deleted file mode 100644
index 7f36b4a081..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-/**
- * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
- *
- * In AMQChannel _unackedMap.clear() was done after the visit. This meant that the clear was not in the same
- * synchronized block as as the preparation to resend.
- *
- * This clearing/prep for resend was done as a result of the rollback call. HOWEVER, the delivery thread was still
- * in the process of sending messages to the client. It is therefore possible that a message could block on the
- * _unackedMap lock waiting for the visit to compelete so that it can add the new message to the unackedMap....
- * which is then cleared by the resend/rollback thread.
- *
- * This problem was encountered by the testSend2ThenRollback test.
- *
- * To try and increase the chance of the race condition occuring this test will send multiple messages so that the
- * delivery thread will be in progress while the rollback method is called. Hopefully this will cause the
- * deliveryTag to be lost
- */
-public class ExtractResendAndRequeueTest extends TestCase
-{
-
- private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
- private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue(getName());
- private MessageStore _messageStore = new TestMemoryMessageStore();
- private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
-
- @Override
- public void setUp() throws AMQException
- {
- _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
-
- long id = 0;
- SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
-
- // Add initial messages to QueueEntryList
- for (int count = 0; count < INITIAL_MSG_COUNT; count++)
- {
- AMQMessage msg = new MockAMQMessage(id);
-
- list.add(msg);
-
- //Increment ID;
- id++;
- }
-
- // Iterate through the QueueEntryList and add entries to unacknowledgeMessageMap and referecenList
- QueueEntryIterator queueEntries = list.iterator();
- while(queueEntries.advance())
- {
- QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
- // Store the entry for future inspection
- _referenceList.add(entry);
- }
-
- assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
- }
-
- /**
- * Helper method to create a new subscription and aquire the given messages.
- *
- * @param messageList The messages to aquire
- *
- * @return Subscription that performed the aquire
- */
- private Subscription createSubscriptionAndAquireMessages(LinkedList<QueueEntry> messageList)
- {
- Subscription subscription = new MockSubscription();
-
- // Aquire messages in subscription
- for (QueueEntry entry : messageList)
- {
- entry.acquire(subscription);
- }
-
- return subscription;
- }
-
- /**
- * This is the normal consumer rollback method.
- *
- * An active consumer that has aquired messages expects those messasges to be reset when rollback is requested.
- *
- * This test validates that the msgToResend map includes all the messages and none are left behind.
- *
- * @throws AMQException the visit interface throws this
- */
- public void testResend() throws AMQException
- {
- //We don't need the subscription object here.
- createSubscriptionAndAquireMessages(_referenceList);
-
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnabletoResend doesn't matter here.
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
-
- assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
- }
-
- /**
- * This is the normal consumer close method.
- *
- * When a consumer that has aquired messages expects closes the messages that it has aquired should be removed from
- * the unacknowledgeMap and placed in msgToRequeue
- *
- * This test validates that the msgToRequeue map includes all the messages and none are left behind.
- *
- * @throws AMQException the visit interface throws this
- */
- public void testRequeueDueToSubscriptionClosure() throws AMQException
- {
- Subscription subscription = createSubscriptionAndAquireMessages(_referenceList);
-
- // Close subscription
- subscription.close();
-
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnabletoResend doesn't matter here.
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
- * requeueIfUnabletoResend(set to true) then all messages should be sent to the msgToRequeue map.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnabletoResend = true so all messages should go to msgToRequeue
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
- * requeueIfUnabletoResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testDrop() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
-
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
- * delivered has been deleted then it is not possible to requeue. Currently we simply discar the message but in the
- * future we may wish to dead letter the message.
- *
- * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
- *
- * @throws AMQException the visit interface throws this
- */
- public void testDiscard() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- _queue.delete();
-
- // requeueIfUnabletoResend : value doesn't matter here as queue has been deleted
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
deleted file mode 100644
index e8fda2bc65..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionImpl;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.TestNetworkConnection;
-
-import javax.security.auth.Subject;
-
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
-{
- // ChannelID(LIST) -> LinkedList<Pair>
- private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
- private AtomicInteger _deliveryCount = new AtomicInteger(0);
- private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
-
- public InternalTestProtocolSession(VirtualHost virtualHost, Broker broker) throws AMQException
- {
- super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
-
- _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
-
- setTestAuthorizedSubject();
- setVirtualHost(virtualHost);
- }
-
- private void setTestAuthorizedSubject()
- {
- Principal principal = new AuthenticatedPrincipal(new UsernamePrincipal("InternalTestProtocolSession"));
- Subject authorizedSubject = new Subject(
- true,
- Collections.singleton(principal),
- Collections.emptySet(),
- Collections.emptySet());
-
- setAuthorizedSubject(authorizedSubject);
- }
-
- public ProtocolOutputConverter getProtocolOutputConverter()
- {
- return this;
- }
-
- public byte getProtocolMajorVersion()
- {
- return (byte) 8;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- MessageContentSource msgContent,
- int channelId,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public byte getProtocolMinorVersion()
- {
- return (byte) 0;
- }
-
- // ***
-
- public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count)
- {
- synchronized (_channelDelivers)
- {
- List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
-
- if (all == null)
- {
- return new ArrayList<DeliveryPair>(0);
- }
-
- List<DeliveryPair> msgs = all.subList(0, count);
-
- List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
-
- //Remove the msgs from the receivedList.
- msgs.clear();
-
- return response;
- }
- }
-
- // *** ProtocolOutputConverter Implementation
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- }
-
- public ClientDeliveryMethod createDeliveryMethod(int channelId)
- {
- return new InternalWriteDeliverMethod(channelId);
- }
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
- {
- _deliveryCount.incrementAndGet();
-
- synchronized (_channelDelivers)
- {
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
-
- if (consumers == null)
- {
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
- _channelDelivers.put(channelId, consumers);
- }
-
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
-
- if (consumerDelivers == null)
- {
- consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(consumerTag, consumerDelivers);
- }
-
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
- }
- }
-
- public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- }
-
- public void awaitDelivery(int msgs)
- {
- while (msgs > _deliveryCount.get())
- {
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }
-
- public class DeliveryPair
- {
- private long _deliveryTag;
- private AMQMessage _message;
-
- public DeliveryPair(long deliveryTag, AMQMessage message)
- {
- _deliveryTag = deliveryTag;
- _message = message;
- }
-
- public AMQMessage getMessage()
- {
- return _message;
- }
-
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
- }
-
- public void closeProtocolSession()
- {
- // Override as we don't have a real IOSession to close.
- // The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
- // Then the AMQMinaProtocolSession can join on the returning future without a NPE.
- }
-
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
- {
- super.closeSession(session, cause, message);
-
- //Simulate the Client responding with a CloseOK
- // should really update the StateManger but we don't have access here
- // changeState(AMQState.CONNECTION_CLOSED);
- ((AMQChannel)session).getProtocolSession().closeSession();
-
- }
-
- private class InternalWriteDeliverMethod implements ClientDeliveryMethod
- {
- private int _channelId;
-
- public InternalWriteDeliverMethod(int channelId)
- {
- _channelId = channelId;
- }
-
-
- public void deliverToClient(Subscription sub, QueueEntry entry, long deliveryTag) throws AMQException
- {
- _deliveryCount.incrementAndGet();
-
- synchronized (_channelDelivers)
- {
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
-
- if (consumers == null)
- {
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
- _channelDelivers.put(_channelId, consumers);
- }
-
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
-
- if (consumerDelivers == null)
- {
- consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
- }
-
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
- }
- }
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
deleted file mode 100644
index a77475c05f..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-/** Test class to test MBean operations for AMQMinaProtocolSession. */
-public class MaxChannelsTest extends QpidTestCase
-{
- private AMQProtocolEngine _session;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _session = BrokerTestHelper_0_8.createProtocolSession();
- }
-
- public void testChannels() throws Exception
- {
- // check the channel count is correct
- int channelCount = _session.getChannels().size();
- assertEquals("Initial channel count wrong", 0, channelCount);
-
- long maxChannels = 10L;
- _session.setMaximumNumberOfChannels(maxChannels);
- assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels());
-
- for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
- {
- _session.addChannel(new AMQChannel(_session, (int) currentChannel, null));
- }
-
- try
- {
- _session.addChannel(new AMQChannel(_session, (int) maxChannels, null));
- fail("Cannot create more channels then maximum");
- }
- catch (AMQException e)
- {
- assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
- }
- assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- _session.getVirtualHost().close();
- _session.closeSession();
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockAMQMessage.java
deleted file mode 100644
index 1cc3607298..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockAMQMessage.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-public class MockAMQMessage extends AMQMessage
-{
- public MockAMQMessage(long messageId)
- {
- super(new MockStoredMessage(messageId));
- }
-
- public MockAMQMessage(long messageId, String headerName, Object headerValue)
- {
- super(new MockStoredMessage(messageId, headerName, headerValue));
- }
-
- @Override
- public long getSize()
- {
- return 0l;
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java
deleted file mode 100644
index ab29e58a6c..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockMessagePublishInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-public class MockMessagePublishInfo implements MessagePublishInfo
-{
- public AMQShortString getExchange()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isMandatory()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public AMQShortString getRoutingKey()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
deleted file mode 100755
index 15573a871f..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
-
-public class MockStoredMessage implements StoredMessage<MessageMetaData>
-{
- private long _messageId;
- private MessageMetaData _metaData;
- private final ByteBuffer _content;
-
- public MockStoredMessage(long messageId)
- {
- this(messageId, (String)null, null);
- }
-
- public MockStoredMessage(long messageId, String headerName, Object headerValue)
- {
- this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue);
- }
-
- public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
- {
- this(messageId, info, chb, null, null);
- }
-
- public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb, String headerName, Object headerValue)
- {
- _messageId = messageId;
- if (headerName != null)
- {
- FieldTable headers = new FieldTable();
- headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue));
- ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers);
- }
- _metaData = new MessageMetaData(info, chb, 0);
- _content = ByteBuffer.allocate(_metaData.getContentSize());
- }
-
- public MessageMetaData getMetaData()
- {
- return _metaData;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- src = src.duplicate();
- ByteBuffer dst = _content.duplicate();
- dst.position(offsetInMessage);
- dst.put(src);
- }
-
- public int getContent(int offset, ByteBuffer dst)
- {
- ByteBuffer src = _content.duplicate();
- src.position(offset);
- src = src.slice();
- if(dst.remaining() < src.limit())
- {
- src.limit(dst.remaining());
- }
- dst.put(src);
- return src.limit();
- }
-
-
-
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
- buf.position(0);
- return buf;
- }
-
- public StoreFuture flushToStore()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public void remove()
- {
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
deleted file mode 100644
index 21142e7ab6..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-import java.util.List;
-
-public class QueueBrowserUsesNoAckTest extends QpidTestCase
-{
- private AMQChannel _channel;
- private SimpleAMQQueue _queue;
- private MessageStore _messageStore;
- private String _queueName;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _channel = BrokerTestHelper_0_8.createChannel();
- VirtualHost virtualHost = _channel.getVirtualHost();
- _queueName = getTestName();
- _queue = BrokerTestHelper.createQueue(_queueName, virtualHost);
- _messageStore = virtualHost.getMessageStore();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.getVirtualHost().close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- private AMQChannel getChannel()
- {
- return _channel;
- }
-
- private InternalTestProtocolSession getSession()
- {
- return (InternalTestProtocolSession)_channel.getProtocolSession();
- }
-
- private SimpleAMQQueue getQueue()
- {
- return _queue;
- }
-
- public void testQueueBrowserUsesNoAck() throws AMQException
- {
- int sendMessageCount = 2;
- int prefetch = 1;
-
- //Check store is empty
- checkStoreContents(0);
-
- //Send required messsages to the queue
- BrokerTestHelper_0_8.publishMessages(getChannel(),
- sendMessageCount,
- _queueName,
- ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString());
-
- //Ensure they are stored
- checkStoreContents(sendMessageCount);
-
- //Check that there are no unacked messages
- assertEquals("Channel should have no unacked msgs ", 0,
- getChannel().getUnacknowledgedMessageMap().size());
-
- //Set the prefetch on the session to be less than the sent messages
- getChannel().setCredit(0, prefetch);
-
- //browse the queue
- AMQShortString browser = browse(getChannel(), getQueue());
-
- getQueue().deliverAsync();
-
- //Wait for messages to fill the prefetch
- getSession().awaitDelivery(prefetch);
-
- //Get those messages
- List<InternalTestProtocolSession.DeliveryPair> messages =
- getSession().getDelivers(getChannel().getChannelId(), browser,
- prefetch);
-
- //Ensure we recevied the prefetched messages
- assertEquals(prefetch, messages.size());
-
- //Check the process didn't suspend the subscription as this would
- // indicate we are using the prefetch credit. i.e. using acks not No-Ack
- assertTrue("The subscription has been suspended",
- !getChannel().getSubscription(browser).getState()
- .equals(Subscription.State.SUSPENDED));
- }
-
- private void checkStoreContents(int messageCount)
- {
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
- }
-
- private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException
- {
- FieldTable filters = new FieldTable();
- filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
-
- return channel.subscribeToQueue(null, queue, true, filters, false, true);
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
deleted file mode 100644
index 87fbcfa9b3..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-/**
- * Tests that reference counting works correctly with AMQMessage and the message store
- */
-public class ReferenceCountingTest extends QpidTestCase
-{
- private TestableMemoryMessageStore _store;
-
-
- protected void setUp() throws Exception
- {
- _store = new TestableMemoryMessageStore();
- }
-
- /**
- * Check that when the reference count is decremented the message removes itself from the store
- */
- public void testMessageGetsRemoved() throws AMQException
- {
- ContentHeaderBody chb = createPersistentContentHeader();
-
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
-
-
- MessageMetaData mmd = new MessageMetaData(info, chb, 0);
- StoredMessage storedMessage = _store.addMessage(mmd);
-
-
- AMQMessage message = new AMQMessage(storedMessage);
-
- MessageReference ref = message.newReference();
-
- assertEquals(1, _store.getMessageCount());
-
- ref.release();
-
- assertEquals(0, _store.getMessageCount());
- }
-
- private ContentHeaderBody createPersistentContentHeader()
- {
- ContentHeaderBody chb = new ContentHeaderBody();
- BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
- bchp.setDeliveryMode((byte)2);
- chb.setProperties(bchp);
- return chb;
- }
-
- public void testMessageRemains() throws AMQException
- {
-
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
- final ContentHeaderBody chb = createPersistentContentHeader();
-
- MessageMetaData mmd = new MessageMetaData(info, chb, 0);
- StoredMessage storedMessage = _store.addMessage(mmd);
-
- AMQMessage message = new AMQMessage(storedMessage);
-
-
- MessageReference ref = message.newReference();
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
- assertEquals(1, _store.getMessageCount());
- MessageReference ref2 = message.newReference();
- ref.release();
- assertEquals(1, _store.getMessageCount());
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ReferenceCountingTest.class);
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
deleted file mode 100644
index e98dd63450..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class SubscriptionFactoryImplTest extends QpidTestCase
-{
- private AMQChannel _channel;
- private AMQProtocolSession _session;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _channel = BrokerTestHelper_0_8.createChannel();
- _session = _channel.getProtocolSession();
- GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false));
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.getVirtualHost().close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- /**
- * Tests that while creating Subscriptions of various types, the
- * ID numbers assigned are allocated from a common sequence
- * (in increasing order).
- */
- public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception
- {
- //create a No-Ack subscription, get the first Subscription ID
- long previousId = 0;
- Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager());
- previousId = noAckSub.getSubscriptionID();
-
- //create an ack subscription, verify the next Subscription ID is used
- Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID());
- previousId = ackSub.getSubscriptionID();
-
- //create a browser subscription
- FieldTable filters = new FieldTable();
- filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID());
- previousId = browerSub.getSubscriptionID();
-
- //create an BasicGet NoAck subscription
- Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false,
- _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID());
- previousId = getNoAckSub.getSubscriptionID();
-
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index e0b2a90cb0..d348c3e67b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -22,7 +22,6 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.EntryState;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index 806606dd0f..7add2d4d43 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.message.ServerMessage;
import java.util.Arrays;