summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:23:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-17 14:23:19 +0000
commit28dbfe8d101dd14a95b1d75e799107bdaa6e18d0 (patch)
tree279390c83b70fb7a41a4d42ee5cda92991140337 /qpid/java/client
parent152b079dacea71ccd5efe7ef0458836d8aea8d2f (diff)
downloadqpid-python-28dbfe8d101dd14a95b1d75e799107bdaa6e18d0.tar.gz
QPID-6125 : [Java Broker] AMQP 0-8/9/9-1 protocol handler refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632583 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java44
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java162
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java108
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java115
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java61
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java16
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java11
10 files changed, 121 insertions, 422 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 77225a948d..d86a2739f2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -62,8 +62,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -316,21 +314,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class);
- }
- else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
- {
- BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
- getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
- }
- else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
- {
- BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else
{
- throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
+ BasicRecoverSyncBody body = getMethodRegistry().createBasicRecoverSyncBody(false);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
}
}
@@ -1148,33 +1137,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if (isBound(null, AMQShortString.valueOf(queue), null))
{
- MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
- AMQMethodBody body;
- if (methodRegistry instanceof MethodRegistry_0_9)
+
+ if(ProtocolVersion.v8_0.equals(getProtocolVersion()))
{
- String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+ throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
+ }
- MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry;
- body = methodRegistry_0_9.createQueueUnbindBody(getTicket(),
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+
+ String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ AMQMethodBody body = methodRegistry.createQueueUnbindBody(getTicket(),
AMQShortString.valueOf(queue),
AMQShortString.valueOf(exchange),
AMQShortString.valueOf(bindingKey),
null);
- }
- else if (methodRegistry instanceof MethodRegistry_0_91)
- {
- MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry;
- body = methodRegistry_0_91.createQueueUnbindBody(getTicket(),
- AMQShortString.valueOf(queue),
- AMQShortString.valueOf(exchange),
- AMQShortString.valueOf(binding.getBindingKey()),
- null);
- }
- else
- {
- throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
- }
getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class);
return null;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 4dcd5a2e44..69d02566bf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -216,7 +216,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
AMQFrame contentHeaderFrame =
ContentHeaderBody.createAMQFrame(getChannelId(),
- classIfForBasic, 0, contentHeaderProperties, size);
+ contentHeaderProperties, size);
if(contentHeaderFrame.getSize() > getSession().getAMQConnection().getMaximumFrameSize())
{
throw new JMSException("Unable to send message as the headers are too large ("
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index d76fdf25e6..e6eb2d814f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -125,6 +125,28 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
+ @Override
+ public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId)
+ throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ @Override
+ public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody,
+ final int channelId)
+ throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public boolean dispatchChannelAlert(final ChannelAlertBody channelAlertBody, final int channelId)
+ throws AMQException
+ {
+ return false;
+ }
+
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
_basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
@@ -244,16 +266,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
- public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
@@ -324,16 +336,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
@@ -349,36 +351,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
@@ -399,30 +371,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
{
@@ -439,14 +387,17 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
{
- throw new AMQMethodNotImplementedException(body);
+ return false;
}
- public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody basicRecoverSyncBody, final int channelId)
+ throws AMQException
{
- throw new AMQMethodNotImplementedException(body);
+ return false;
}
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
@@ -465,46 +416,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
- public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
{
return false;
@@ -515,21 +426,6 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
- public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
{
return false;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
index f4fc3a4715..4232f59292 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
@@ -23,10 +23,14 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
-
-public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
+import org.apache.qpid.framing.ChannelAlertBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+
+public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher
{
public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session)
{
@@ -38,106 +42,18 @@ public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl i
return false;
}
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
+ throws AMQException
{
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java
index 5f33561a8f..573ab52cc3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java
@@ -23,10 +23,14 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
-
-public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_91
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
+import org.apache.qpid.framing.ChannelAlertBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+
+public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher
{
public ClientMethodDispatcherImpl_0_91(AMQProtocolSession session)
{
@@ -38,119 +42,26 @@ public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl
return false;
}
- public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchChannelAlert(final ChannelAlertBody body, final int channelId)
+ throws AMQException
{
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- throw new AMQMethodNotImplementedException(body);
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
return false;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
index 28ad6037d4..7d421622e7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
@@ -22,72 +22,49 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.TestContentBody;
-import org.apache.qpid.framing.TestContentOkBody;
-import org.apache.qpid.framing.TestIntegerBody;
-import org.apache.qpid.framing.TestIntegerOkBody;
-import org.apache.qpid.framing.TestStringBody;
-import org.apache.qpid.framing.TestStringOkBody;
-import org.apache.qpid.framing.TestTableBody;
-import org.apache.qpid.framing.TestTableOkBody;
-import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
-public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
+public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher
{
public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session)
{
super(session);
}
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
{
return false;
}
- public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId)
{
return false;
}
- public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody,
+ final int channelId)
{
return false;
}
- public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchQueueUnbind(final QueueUnbindBody body, final int channelId) throws AMQException
{
- return false;
- }
-
- public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
- {
- return false;
+ throw new AMQMethodNotImplementedException(body);
}
- public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, final int channelId)
+ throws AMQException
{
- return false;
+ throw new AMQMethodNotImplementedException(body);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 5c9d8f9b91..bb98c0abbd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -23,8 +23,8 @@ package org.apache.qpid.client.protocol;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -48,6 +48,7 @@ import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ClientDecoder;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -193,7 +194,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection = con;
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
- _decoder = new AMQDecoder(false, _protocolSession);
+ _decoder = new ClientDecoder(_protocolSession.getMethodProcessor());
_failoverHandler = new FailoverHandler(this);
}
@@ -459,9 +460,10 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_readBytes += msg.remaining();
_lastReadTime = System.currentTimeMillis();
+ final List<AMQDataBlock> dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
+ _decoder.decodeBuffer(msg);
// Decode buffer
int size = dataBlocks.size();
@@ -511,6 +513,10 @@ public class AMQProtocolHandler implements ProtocolEngine
propagateExceptionToFrameListeners(e);
exception(e);
}
+ finally
+ {
+ dataBlocks.clear();
+ }
}
@@ -753,8 +759,12 @@ public class AMQProtocolHandler implements ProtocolEngine
// Connection is already closed then don't do a syncWrite
try
{
- final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
+ final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(
+ AMQConstant.REPLY_SUCCESS.getCode(),
+ // replyCode
+ new AMQShortString("JMS client is closing the connection."),
+ 0,
+ 0);
final AMQFrame frame = body.generateFrame(0);
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 2c69aa1b51..2fbb13079e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -44,6 +44,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
@@ -88,8 +89,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private ProtocolVersion _protocolVersion;
- private MethodRegistry _methodRegistry =
- MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private final MethodRegistry _methodRegistry =
+ new MethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+
+ private final FrameCreatingMethodProcessor _methodProcessor =
+ new FrameCreatingMethodProcessor(ProtocolVersion.getLatestSupportedVersion());
private MethodDispatcher _methodDispatcher;
@@ -416,7 +420,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_logger.debug("Setting ProtocolVersion to :" + pv);
}
_protocolVersion = pv;
- _methodRegistry = MethodRegistry.getMethodRegistry(pv);
+ _methodRegistry.setProtocolVersion(pv);
+ _methodProcessor.setProtocolVersion(pv);
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
}
@@ -549,4 +554,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler.setMaxFrameSize(frameMax);
}
+
+ public FrameCreatingMethodProcessor getMethodProcessor()
+ {
+ return _methodProcessor;
+ }
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
index c56cf9a72b..2a8ab22b81 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
@@ -23,7 +23,7 @@ package org.apache.qpid.client;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.transport.TestNetworkConnection;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.amqp_0_91.QueueDeclareOkBodyImpl;
+import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.url.AMQBindingURL;
@@ -50,7 +50,7 @@ public class AMQSession_0_8Test extends QpidTestCase
{
try
{
- _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBodyImpl(testQueueName, 0, 0));
+ _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBody(testQueueName, 0, 0));
}
catch (AMQException e)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index 70fcfcedb8..61e5247ead 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.protocol;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,12 +35,10 @@ import org.apache.qpid.client.transport.TestNetworkConnection;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception.
*
@@ -75,7 +76,7 @@ public class AMQProtocolHandlerTest extends TestCase
//Create a new ProtocolHandler with a fake connection.
_handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'"));
_handler.setNetworkConnection(new TestNetworkConnection());
- AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
+ AMQBody body = new BasicRecoverSyncOkBody(ProtocolVersion.v8_0);
_blockFrame = new AMQFrame(0, body);
_handleCountDown = new CountDownLatch(1);