diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:23:19 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:23:19 +0000 |
| commit | 28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (patch) | |
| tree | 279390c83b70fb7a41a4d42ee5cda92991140337 /qpid/java/systests/src | |
| parent | 152b079dacea71ccd5efe7ef0458836d8aea8d2f (diff) | |
| download | qpid-python-28dbfe8d101dd14a95b1d75e799107bdaa6e18d0.tar.gz | |
QPID-6125 : [Java Broker] AMQP 0-8/9/9-1 protocol handler refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632583 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
4 files changed, 43 insertions, 100 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java index edffa7c0c0..20a6804517 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java @@ -20,14 +20,8 @@ */ package org.apache.qpid.server.logging; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; +import java.io.IOException; +import java.util.List; import javax.jms.Connection; import javax.jms.JMSException; @@ -35,8 +29,16 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import java.io.IOException; -import java.util.List; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; /** * Exchange @@ -191,7 +193,7 @@ public class ExchangeLoggingTest extends AbstractTestLogging } else { - MethodRegistry_8_0 registry = new MethodRegistry_8_0(); + MethodRegistry registry = new MethodRegistry(ProtocolVersion.v8_0); ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new AMQShortString(_name), false, true); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index dfc507d88a..32de06186a 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -41,8 +41,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; import org.apache.qpid.server.connection.SessionPrincipal; @@ -597,9 +596,9 @@ public class VirtualHostMessageStoreTest extends QpidTestCase headers.setString("Test", "MST"); properties.setHeaders(headers); - MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); + MessagePublishInfo messageInfo = new MessagePublishInfo(new AMQShortString(exchange.getName()), false, false, new AMQShortString(routingKey)); - ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l); + ContentHeaderBody headerBody = new ContentHeaderBody(properties,0l); MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); @@ -824,52 +823,4 @@ public class VirtualHostMessageStoreTest extends QpidTestCase assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getQueueDepthMessages()); } - - private class TestMessagePublishInfo implements MessagePublishInfo - { - - ExchangeImpl<?> _exchange; - boolean _immediate; - boolean _mandatory; - String _routingKey; - - TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - @Override - public AMQShortString getExchange() - { - return new AMQShortString(_exchange.getName()); - } - - @Override - public void setExchange(AMQShortString exchange) - { - //no-op - } - - @Override - public boolean isImmediate() - { - return _immediate; - } - - @Override - public boolean isMandatory() - { - return _mandatory; - } - - @Override - public AMQShortString getRoutingKey() - { - return new AMQShortString(_routingKey); - } - } - } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java index b43fe35a09..3fe45143d5 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java @@ -86,8 +86,16 @@ public class JavaServerCloseRaceConditionTest extends QpidBrokerTestCase // Set no wait true so that we block the connection // Also set a different exchange class string so the attempt to declare // the exchange causes an exchange. - ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null, - true, false, false, false, true, null); + ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), + new AMQShortString( + EXCHANGE_NAME), + null, + true, + false, + false, + false, + true, + null); AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId()); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java index 322b971487..f76203887c 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java @@ -26,8 +26,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,22 +41,19 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ClientDecoder; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BodyFactory; -import org.apache.qpid.framing.ByteArrayDataInput; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.amqp_0_91.ConnectionStartOkBodyImpl; -import org.apache.qpid.framing.amqp_0_91.ConnectionTuneOkBodyImpl; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; @@ -114,11 +111,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase { @Override - public void evaluate(final Socket socket, final List<AMQFrame> frames) + public void evaluate(final Socket socket, final List<AMQDataBlock> frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -163,11 +160,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase { @Override - public void evaluate(final Socket socket, final List<AMQFrame> frames) + public void evaluate(final Socket socket, final List<AMQDataBlock> frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -177,7 +174,7 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase private static interface ResultEvaluator { - void evaluate(Socket socket, List<AMQFrame> frames); + void evaluate(Socket socket, List<AMQDataBlock> frames); } private void doAMQP08test(int frameSize, ResultEvaluator evaluator) @@ -220,12 +217,12 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase response[i++] = b; } - ConnectionStartOkBody startOK = new ConnectionStartOkBodyImpl(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US")); + ConnectionStartOkBody startOK = new ConnectionStartOkBody(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US")); DataOutputStream dos = new DataOutputStream(os); new AMQFrame(0, startOK).writePayload(dos); dos.flush(); - ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBodyImpl(256, frameSize, 0); + ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBody(256, frameSize, 0); new AMQFrame(0, tuneOk).writePayload(dos); dos.flush(); socket.setSoTimeout(5000); @@ -238,26 +235,11 @@ public class MaxFrameSizeTest extends QpidBrokerTestCase } byte[] serverData = baos.toByteArray(); - ByteArrayDataInput badi = new ByteArrayDataInput(serverData); - AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder(); - final MethodRegistry_0_91 methodRegistry_0_91 = new MethodRegistry_0_91(); - BodyFactory methodBodyFactory = new BodyFactory() - { - @Override - public AMQBody createBody(final MarkableDataInput in, final long bodySize) - throws AMQFrameDecodingException, IOException - { - return methodRegistry_0_91.convertToBody(in, bodySize); - } - }; - - List<AMQFrame> frames = new ArrayList<>(); - while (datablockDecoder.decodable(badi)) - { - frames.add(datablockDecoder.createAndPopulateFrame(methodBodyFactory, badi)); - } + final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); + AMQDecoder decoder = new ClientDecoder(methodProcessor); + decoder.decodeBuffer(ByteBuffer.wrap(serverData)); - evaluator.evaluate(socket, frames); + evaluator.evaluate(socket, methodProcessor.getProcessedMethods()); } private static class TestClientDelegate extends ClientDelegate |
