summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:23:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:23:19 +0000
commit28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (patch)
tree279390c83b70fb7a41a4d42ee5cda92991140337 /qpid/java/systests/src
parent152b079dacea71ccd5efe7ef0458836d8aea8d2f (diff)
downloadqpid-python-28dbfe8d101dd14a95b1d75e799107bdaa6e18d0.tar.gz
QPID-6125 : [Java Broker] AMQP 0-8/9/9-1 protocol handler refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632583 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java24
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java55
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java12
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java52
4 files changed, 43 insertions, 100 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
index edffa7c0c0..20a6804517 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
@@ -20,14 +20,8 @@
*/
package org.apache.qpid.server.logging;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import java.io.IOException;
+import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -35,8 +29,16 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.io.IOException;
-import java.util.List;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
/**
* Exchange
@@ -191,7 +193,7 @@ public class ExchangeLoggingTest extends AbstractTestLogging
}
else
{
- MethodRegistry_8_0 registry = new MethodRegistry_8_0();
+ MethodRegistry registry = new MethodRegistry(ProtocolVersion.v8_0);
ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new AMQShortString(_name), false, true);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index dfc507d88a..32de06186a 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -41,8 +41,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.connection.SessionPrincipal;
@@ -597,9 +596,9 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
headers.setString("Test", "MST");
properties.setHeaders(headers);
- MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
+ MessagePublishInfo messageInfo = new MessagePublishInfo(new AMQShortString(exchange.getName()), false, false, new AMQShortString(routingKey));
- ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l);
+ ContentHeaderBody headerBody = new ContentHeaderBody(properties,0l);
MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
@@ -824,52 +823,4 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getQueueDepthMessages());
}
-
- private class TestMessagePublishInfo implements MessagePublishInfo
- {
-
- ExchangeImpl<?> _exchange;
- boolean _immediate;
- boolean _mandatory;
- String _routingKey;
-
- TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey)
- {
- _exchange = exchange;
- _immediate = immediate;
- _mandatory = mandatory;
- _routingKey = routingKey;
- }
-
- @Override
- public AMQShortString getExchange()
- {
- return new AMQShortString(_exchange.getName());
- }
-
- @Override
- public void setExchange(AMQShortString exchange)
- {
- //no-op
- }
-
- @Override
- public boolean isImmediate()
- {
- return _immediate;
- }
-
- @Override
- public boolean isMandatory()
- {
- return _mandatory;
- }
-
- @Override
- public AMQShortString getRoutingKey()
- {
- return new AMQShortString(_routingKey);
- }
- }
-
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
index b43fe35a09..3fe45143d5 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
@@ -86,8 +86,16 @@ public class JavaServerCloseRaceConditionTest extends QpidBrokerTestCase
// Set no wait true so that we block the connection
// Also set a different exchange class string so the attempt to declare
// the exchange causes an exchange.
- ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null,
- true, false, false, false, true, null);
+ ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(),
+ new AMQShortString(
+ EXCHANGE_NAME),
+ null,
+ true,
+ false,
+ false,
+ false,
+ true,
+ null);
AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId());
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
index 322b971487..f76203887c 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
@@ -26,8 +26,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -41,22 +41,19 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
-import org.apache.qpid.codec.MarkableDataInput;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ClientDecoder;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BodyFactory;
-import org.apache.qpid.framing.ByteArrayDataInput;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.amqp_0_91.ConnectionStartOkBodyImpl;
-import org.apache.qpid.framing.amqp_0_91.ConnectionTuneOkBodyImpl;
-import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
@@ -114,11 +111,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
{
@Override
- public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
if(!socket.isClosed())
{
- AMQFrame lastFrame = frames.get(frames.size() - 1);
+ AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
}
}
@@ -163,11 +160,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
{
@Override
- public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
if(!socket.isClosed())
{
- AMQFrame lastFrame = frames.get(frames.size() - 1);
+ AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
}
}
@@ -177,7 +174,7 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
private static interface ResultEvaluator
{
- void evaluate(Socket socket, List<AMQFrame> frames);
+ void evaluate(Socket socket, List<AMQDataBlock> frames);
}
private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
@@ -220,12 +217,12 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
response[i++] = b;
}
- ConnectionStartOkBody startOK = new ConnectionStartOkBodyImpl(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US"));
+ ConnectionStartOkBody startOK = new ConnectionStartOkBody(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US"));
DataOutputStream dos = new DataOutputStream(os);
new AMQFrame(0, startOK).writePayload(dos);
dos.flush();
- ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBodyImpl(256, frameSize, 0);
+ ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBody(256, frameSize, 0);
new AMQFrame(0, tuneOk).writePayload(dos);
dos.flush();
socket.setSoTimeout(5000);
@@ -238,26 +235,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase
}
byte[] serverData = baos.toByteArray();
- ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
- AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
- final MethodRegistry_0_91 methodRegistry_0_91 = new MethodRegistry_0_91();
- BodyFactory methodBodyFactory = new BodyFactory()
- {
- @Override
- public AMQBody createBody(final MarkableDataInput in, final long bodySize)
- throws AMQFrameDecodingException, IOException
- {
- return methodRegistry_0_91.convertToBody(in, bodySize);
- }
- };
-
- List<AMQFrame> frames = new ArrayList<>();
- while (datablockDecoder.decodable(badi))
- {
- frames.add(datablockDecoder.createAndPopulateFrame(methodBodyFactory, badi));
- }
+ final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
+ AMQDecoder decoder = new ClientDecoder(methodProcessor);
+ decoder.decodeBuffer(ByteBuffer.wrap(serverData));
- evaluator.evaluate(socket, frames);
+ evaluator.evaluate(socket, methodProcessor.getProcessedMethods());
}
private static class TestClientDelegate extends ClientDelegate