diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:23:19 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 14:23:19 +0000 |
| commit | 28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (patch) | |
| tree | 279390c83b70fb7a41a4d42ee5cda92991140337 /qpid/java/client | |
| parent | 152b079dacea71ccd5efe7ef0458836d8aea8d2f (diff) | |
| download | qpid-python-28dbfe8d101dd14a95b1d75e799107bdaa6e18d0.tar.gz | |
QPID-6125 : [Java Broker] AMQP 0-8/9/9-1 protocol handler refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632583 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
10 files changed, 121 insertions, 422 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 77225a948d..d86a2739f2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -62,8 +62,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -316,21 +314,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_9)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else { - throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion()); + BasicRecoverSyncBody body = getMethodRegistry().createBasicRecoverSyncBody(false); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } } } @@ -1148,33 +1137,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (isBound(null, AMQShortString.valueOf(queue), null)) { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - AMQMethodBody body; - if (methodRegistry instanceof MethodRegistry_0_9) + + if(ProtocolVersion.v8_0.equals(getProtocolVersion())) { - String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } - MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; - body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + AMQMethodBody body = methodRegistry.createQueueUnbindBody(getTicket(), AMQShortString.valueOf(queue), AMQShortString.valueOf(exchange), AMQShortString.valueOf(bindingKey), null); - } - else if (methodRegistry instanceof MethodRegistry_0_91) - { - MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; - body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), - AMQShortString.valueOf(queue), - AMQShortString.valueOf(exchange), - AMQShortString.valueOf(binding.getBindingKey()), - null); - } - else - { - throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); - } getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); return null; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 4dcd5a2e44..69d02566bf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -216,7 +216,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(getChannelId(), - classIfForBasic, 0, contentHeaderProperties, size); + contentHeaderProperties, size); if(contentHeaderFrame.getSize() > getSession().getAMQConnection().getMaximumFrameSize()) { throw new JMSException("Unable to send message as the headers are too large (" diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index d76fdf25e6..e6eb2d814f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -125,6 +125,28 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId) + throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) + throws AMQException + { + return false; + } + + @Override + public boolean dispatchChannelAlert(final ChannelAlertBody channelAlertBody, final int channelId) + throws AMQException + { + return false; + } + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { _basicCancelOkMethodHandler.methodReceived(_session, body, channelId); @@ -244,16 +266,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } - public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -324,16 +336,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher throw new AMQMethodNotImplementedException(body); } - public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -349,36 +351,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher throw new AMQMethodNotImplementedException(body); } - public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -399,30 +371,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher throw new AMQMethodNotImplementedException(body); } - public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException { @@ -439,14 +387,17 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher throw new AMQMethodNotImplementedException(body); } - public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException { - throw new AMQMethodNotImplementedException(body); + return false; } - public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody basicRecoverSyncBody, final int channelId) + throws AMQException { - throw new AMQMethodNotImplementedException(body); + return false; } public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException @@ -465,46 +416,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } - public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException { return false; @@ -515,21 +426,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } - public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException { return false; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java index f4fc3a4715..4232f59292 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java @@ -23,10 +23,14 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQMethodNotImplementedException; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; - -public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.ChannelAlertBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; + +public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher { public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session) { @@ -38,106 +42,18 @@ public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl i return false; } - public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + @Override + public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId) + throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java index 5f33561a8f..573ab52cc3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java @@ -23,10 +23,14 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQMethodNotImplementedException; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91; - -public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_91 +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.ChannelAlertBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; + +public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher { public ClientMethodDispatcherImpl_0_91(AMQProtocolSession session) { @@ -38,119 +42,26 @@ public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl return false; } - public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + @Override + public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId) + throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); } - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException { return false; } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index 28ad6037d4..7d421622e7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -22,72 +22,49 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.framing.BasicRecoverSyncBody; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; import org.apache.qpid.framing.ChannelAlertBody; -import org.apache.qpid.framing.TestContentBody; -import org.apache.qpid.framing.TestContentOkBody; -import org.apache.qpid.framing.TestIntegerBody; -import org.apache.qpid.framing.TestIntegerOkBody; -import org.apache.qpid.framing.TestStringBody; -import org.apache.qpid.framing.TestStringOkBody; -import org.apache.qpid.framing.TestTableBody; -import org.apache.qpid.framing.TestTableOkBody; -import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; -public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 +public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher { public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session) { super(session); } - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException { return false; } - public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId) { return false; } - public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) { return false; } - public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody body, final int channelId) throws AMQException { - return false; - } - - public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException - { - return false; + throw new AMQMethodNotImplementedException(body); } - public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException + @Override + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, final int channelId) + throws AMQException { - return false; + throw new AMQMethodNotImplementedException(body); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 5c9d8f9b91..bb98c0abbd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.client.protocol; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -48,6 +48,7 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ClientDecoder; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -193,7 +194,7 @@ public class AMQProtocolHandler implements ProtocolEngine _connection = con; _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); - _decoder = new AMQDecoder(false, _protocolSession); + _decoder = new ClientDecoder(_protocolSession.getMethodProcessor()); _failoverHandler = new FailoverHandler(this); } @@ -459,9 +460,10 @@ public class AMQProtocolHandler implements ProtocolEngine { _readBytes += msg.remaining(); _lastReadTime = System.currentTimeMillis(); + final List<AMQDataBlock> dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods(); try { - final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); // Decode buffer int size = dataBlocks.size(); @@ -511,6 +513,10 @@ public class AMQProtocolHandler implements ProtocolEngine propagateExceptionToFrameListeners(e); exception(e); } + finally + { + dataBlocks.clear(); + } } @@ -753,8 +759,12 @@ public class AMQProtocolHandler implements ProtocolEngine // Connection is already closed then don't do a syncWrite try { - final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."), 0, 0); + final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody( + AMQConstant.REPLY_SUCCESS.getCode(), + // replyCode + new AMQShortString("JMS client is closing the connection."), + 0, + 0); final AMQFrame frame = body.generateFrame(0); syncWrite(frame, ConnectionCloseOkBody.class, timeout); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 2c69aa1b51..2fbb13079e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -44,6 +44,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; @@ -88,8 +89,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private ProtocolVersion _protocolVersion; - private MethodRegistry _methodRegistry = - MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + private final MethodRegistry _methodRegistry = + new MethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + + private final FrameCreatingMethodProcessor _methodProcessor = + new FrameCreatingMethodProcessor(ProtocolVersion.getLatestSupportedVersion()); private MethodDispatcher _methodDispatcher; @@ -416,7 +420,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _logger.debug("Setting ProtocolVersion to :" + pv); } _protocolVersion = pv; - _methodRegistry = MethodRegistry.getMethodRegistry(pv); + _methodRegistry.setProtocolVersion(pv); + _methodProcessor.setProtocolVersion(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); } @@ -549,4 +554,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler.setMaxFrameSize(frameMax); } + + public FrameCreatingMethodProcessor getMethodProcessor() + { + return _methodProcessor; + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java index c56cf9a72b..2a8ab22b81 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java @@ -23,7 +23,7 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; import org.apache.qpid.client.transport.TestNetworkConnection; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.amqp_0_91.QueueDeclareOkBodyImpl; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.url.AMQBindingURL; @@ -50,7 +50,7 @@ public class AMQSession_0_8Test extends QpidTestCase { try { - _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBodyImpl(testQueueName, 0, 0)); + _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBody(testQueueName, 0, 0)); } catch (AMQException e) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 70fcfcedb8..61e5247ead 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import junit.framework.TestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +35,10 @@ import org.apache.qpid.client.transport.TestNetworkConnection; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception. * @@ -75,7 +76,7 @@ public class AMQProtocolHandlerTest extends TestCase //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'")); _handler.setNetworkConnection(new TestNetworkConnection()); - AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); + AMQBody body = new BasicRecoverSyncOkBody(ProtocolVersion.v8_0); _blockFrame = new AMQFrame(0, body); _handleCountDown = new CountDownLatch(1); |
