summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-11-24 21:14:14 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-11-24 21:14:14 +0000
commit66f97f32c78e0cf5914a441ae8277ee3aa659ce9 (patch)
treef825c3d20483f741c06b9584ddb4e17d6a2404a9 /java/client/src
parent86996d5680a07acd82ffda2829dcdd6d6585e606 (diff)
downloadqpid-python-66f97f32c78e0cf5914a441ae8277ee3aa659ce9.tar.gz
QPID-567 : Add mutliversion support to Qpid/Java.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@597918 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java256
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java528
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java155
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java65
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java55
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java98
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java134
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java99
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java44
33 files changed, 1304 insertions, 584 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 9abc94b3df..85a5fbf996 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -579,19 +579,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throws AMQException, FailoverException
{
+ ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null), // outOfBand
- ChannelOpenOkBody.class);
+ _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
if (transacted)
{
@@ -600,9 +598,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.debug("Issuing TxSelect for " + channelId);
}
+ TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
+
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
+ _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
index 1a2fe0d355..90d5271145 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
@@ -54,6 +54,6 @@ public class AMQHeadersExchange extends AMQDestination
//Not sure what the best approach is here, probably to treat this like a topic
//and allow server to generate names. As it is AMQ specific it doesn't need to
//fit the JMS API expectations so this is not as yet critical.
- return false;
+ return getAMQQueueName() == null;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 804c846572..15c113a05d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -40,34 +40,9 @@ import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -436,9 +411,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- final AMQFrame ackFrame =
- BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
- multiple);
+
+ BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
+
+ final AMQFrame ackFrame = body.generateFrame(_channelId);
if (_logger.isDebugEnabled())
{
@@ -448,6 +424,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
getProtocolHandler().writeFrame(ackFrame);
}
+ public MethodRegistry getMethodRegistry()
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ return methodRegistry;
+ }
+
/**
* Binds the named queue, with the specified routing key, to the named exchange.
*
@@ -470,16 +452,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame queueBind =
- QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- arguments, // arguments
- exchangeName, // exchange
- false, // nowait
- queueName, // queue
- routingKey, // routingKey
- getTicket()); // ticket
+ QueueBindBody body = getMethodRegistry().createQueueBindBody(getTicket(),queueName,exchangeName,routingKey,false,arguments);
+
+ AMQFrame queueBind = body.generateFrame(_channelId);
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
return null;
}
@@ -526,16 +503,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- getProtocolHandler().closeSession(this);
+ ChannelCloseBody body = getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("JMS client closing channel"), 0, 0);
- final AMQFrame frame =
- ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
+ final AMQFrame frame = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+ getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully.
@@ -656,8 +630,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Commits outstanding messages sent and outstanding acknowledgements.
final AMQProtocolHandler handler = getProtocolHandler();
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
- TxCommitOkBody.class);
+ TxCommitBody body = getMethodRegistry().createTxCommitBody();
+
+ handler.syncWrite(body.generateFrame(_channelId), TxCommitOkBody.class);
markClean();
}
@@ -990,18 +965,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame queueDeclare =
- QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- autoDelete, // autoDelete
- durable, // durable
- exclusive, // exclusive
- false, // nowait
- false, // passive
- name, // queue
- getTicket()); // ticket
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+
+ AMQFrame queueDeclare = body.generateFrame(_channelId);
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
return null;
}
@@ -1309,7 +1277,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
+ _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
_queue.add(message);
}
}
@@ -1371,16 +1339,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (isStrictAMQP())
{
// We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+
+ BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
_logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
else
{
-
- _connection.getProtocolHandler().syncWrite(
- BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
- , BasicRecoverOkBody.class);
+ // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
+ // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
+ if(getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+ }
+ else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
+ {
+ BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
+ _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ }
+ else
+ {
+ throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
+ }
}
if (!isSuspended)
@@ -1398,15 +1379,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ private ProtocolVersion getProtocolVersion()
+ {
+ return getProtocolHandler().getProtocolVersion();
+ }
+
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
if (_logger.isTraceEnabled())
{
- _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().getDeliveryTag());
}
- rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue);
}
public void rejectMessage(AbstractJMSMessage message, boolean requeue)
@@ -1429,11 +1415,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
}
- AMQFrame basicRejectBody =
- BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
- requeue);
+ BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
+ AMQFrame frame = body.generateFrame(_channelId);
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ _connection.getProtocolHandler().writeFrame(frame);
}
}
@@ -1469,8 +1454,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_dispatcher.rollback();
}
- _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
+ AMQFrame frame = body.generateFrame(getChannelId());
+ getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
markClean();
@@ -1728,16 +1714,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
AMQMethodEvent response =
- new FailoverRetrySupport<AMQMethodEvent, AMQException>(
- new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
- {
- public AMQMethodEvent execute() throws AMQException, FailoverException
- {
- AMQFrame boundFrame =
- ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
- getProtocolMinorVersion(), exchangeName, // exchange
- queueName, // queue
- routingKey); // routingKey
+ new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ ExchangeBoundBody body = getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName);
+ AMQFrame boundFrame = body.generateFrame(_channelId);
return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
@@ -1747,7 +1730,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Extract and return the response code from the query.
ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
- return (responseBody.replyCode == 0);
+ return (responseBody.getReplyCode() == 0);
}
catch (AMQException e)
{
@@ -2092,16 +2075,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame jmsConsume =
- BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- getTicket()); // ticket
+ BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
+ queueName,
+ tag,
+ consumer.isNoLocal(),
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+ consumer.isExclusive(),
+ nowait,
+ arguments);
+
+
+ AMQFrame jmsConsume = body.generateFrame(_channelId);
if (nowait)
{
@@ -2171,17 +2155,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare =
- ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- nowait, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
+ ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null);
+ AMQFrame exchangeDeclare = body.generateFrame(_channelId);
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
@@ -2224,16 +2199,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqd.setQueueName(protocolHandler.generateQueueName());
}
- AMQFrame queueDeclare =
- QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- false, // nowait
- false, // passive
- amqd.getAMQQueueName(), // queue
- getTicket()); // ticket
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+
+ AMQFrame queueDeclare = body.generateFrame(_channelId);
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
@@ -2260,13 +2228,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame queueDeleteFrame =
- QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- getTicket()); // ticket
+ QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
+ queueName,
+ false,
+ false,
+ true);
+ AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
@@ -2450,12 +2417,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag))
+ if ((consumerTag == null) || message.getDeliverBody().getConsumerTag().equals(consumerTag))
{
if (_logger.isDebugEnabled())
{
_logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
- + message.getDeliverBody().deliveryTag);
+ + message.getDeliverBody().getDeliveryTag());
}
messages.remove();
@@ -2503,12 +2470,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange,
- message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
+ _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().getExchange(),
+ message.getBounceBody().getRoutingKey(), message.getContentHeader(), message.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().getReplyCode());
+ AMQShortString reason = message.getBounceBody().getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
// @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
if (errorCode == AMQConstant.NO_CONSUMERS)
@@ -2558,9 +2525,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_suspended = suspend;
- AMQFrame channelFlowFrame =
- ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- !suspend);
+ ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
+
+ AMQFrame channelFlowFrame = body.generateFrame(_channelId);
_connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
}
@@ -2611,6 +2578,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _dirty;
}
+ public void setTicket(int ticket)
+ {
+ _ticket = ticket;
+ }
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -2741,7 +2713,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait(2000);
}
- if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
+ if (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get())
{
rejectMessage(message, true);
}
@@ -2798,7 +2770,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (message.getDeliverBody() != null)
{
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+ (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag());
if ((consumer == null) || consumer.isClosed())
{
@@ -2807,14 +2779,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumer == null)
{
_dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().deliveryTag + "] from queue "
- + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
+ + message.getDeliverBody().getDeliveryTag() + "] from queue "
+ + message.getDeliverBody().getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
else
{
- _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ") ["
- + message.getDeliverBody().deliveryTag + "] from queue consumer("
- + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ + message.getDeliverBody().getDeliveryTag() + "] from queue " + " consumer("
+ + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
}
}
// Don't reject if we're already closing
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 319e728edf..e9f68fdc38 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -96,8 +96,7 @@ public class AMQTopic extends AMQDestination implements Topic
public boolean isNameRequired()
{
- // Topics always rely on a server generated queue name.
- return false;
+ return !isDurable();
}
/**
@@ -112,4 +111,17 @@ public class AMQTopic extends AMQDestination implements Topic
public void setQueueName(String queueName)
{
}
+
+ public boolean equals(Object o)
+ {
+ return (o instanceof AMQTopic)
+ && ((AMQTopic)o).getExchangeName().equals(getExchangeName())
+ && ((AMQTopic)o).getRoutingKey().equals(getRoutingKey());
+
+ }
+
+ public int hashCode()
+ {
+ return getExchangeName().hashCode() + getRoutingKey().hashCode();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4f8a3e5557..ae31f5ebdd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -514,11 +514,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (sendClose)
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame =
- BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
- false); // nowait
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
+
+ final AMQFrame cancelFrame = body.generateFrame(_channelId);
try
{
@@ -603,15 +601,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (debug)
{
- _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
+ _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().getDeliveryTag());
}
try
{
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
+ messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(),
+ messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
if (debug)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index fb6e4aa9fd..7e96fb537c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -140,20 +140,20 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private void declareDestination(AMQDestination destination)
{
+ ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ false,
+ false,
+ false,
+ false,
+ true,
+ null);
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame declare =
- ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
+
+ AMQFrame declare = body.generateFrame(_channelId);
+
_protocolHandler.writeFrame(declare);
}
@@ -478,16 +478,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame =
- BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
+
+ BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+ destination.getExchangeName(),
+ destination.getRoutingKey(),
+ mandatory,
+ immediate);
+
+ AMQFrame publishFrame = body.generateFrame(_channelId);
message.prepareForSending();
ByteBuffer payload = message.getData();
@@ -525,13 +523,13 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_logger.debug("Sending content body frames to " + destination);
}
- // weight argument of zero indicates no child content headers, just bodies
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+
+ // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
+ int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
+
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
+ ContentHeaderBody.createAMQFrame(_channelId,
+ classIfForBasic, 0, contentHeaderProperties, size);
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
new file mode 100644
index 0000000000..a150d1446a
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
@@ -0,0 +1,36 @@
+package org.apache.qpid.client.handler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class);
+
+ private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler();
+
+ public static AccessRequestOkMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+ throws AMQException
+ {
+ _logger.debug("AccessRequestOk method received");
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ session.setTicket(method.getTicket(), channelId);
+
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
index 8f0ee05b3e..e3e08667d8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
@@ -25,12 +25,13 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicCancelOkMethodHandler implements StateAwareMethodListener
+public class BasicCancelOkMethodHandler implements StateAwareMethodListener<BasicCancelOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(BasicCancelOkMethodHandler.class);
@@ -44,16 +45,18 @@ public class BasicCancelOkMethodHandler implements StateAwareMethodListener
private BasicCancelOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId)
throws AMQException
{
- BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+
if (_logger.isInfoEnabled())
{
- _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag);
+ _logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag());
}
- protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ session.confirmConsumerCancelled(channelId, body.getConsumerTag());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index 51120da55c..49c8a83833 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -31,7 +31,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicDeliverMethodHandler implements StateAwareMethodListener
+public class BasicDeliverMethodHandler implements StateAwareMethodListener<BasicDeliverBody>
{
private static final Logger _logger = LoggerFactory.getLogger(BasicDeliverMethodHandler.class);
@@ -42,11 +42,12 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId)
throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
_logger.debug("New JmsDeliver method received");
- protocolSession.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(msg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
index 0f00c6a26e..428d366f07 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
@@ -31,7 +31,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicReturnMethodHandler implements StateAwareMethodListener
+public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicReturnBody>
{
private static final Logger _logger = LoggerFactory.getLogger(BasicReturnMethodHandler.class);
@@ -42,12 +42,15 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
throws AMQException
{
_logger.debug("New JmsBounce method received");
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicReturnBody) evt.getMethod());
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
- protocolSession.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(msg);
}
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 139a32370e..8c8814e9b7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -38,7 +38,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelCloseMethodHandler implements StateAwareMethodListener
+public class ChannelCloseMethodHandler implements StateAwareMethodListener<ChannelCloseBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandler.class);
@@ -49,22 +49,26 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
- protocolSession.writeFrame(frame);
+
+
+ ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody();
+ AMQFrame frame = body.generateFrame(channelId);
+ session.writeFrame(frame);
+
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
if (_logger.isDebugEnabled())
@@ -100,6 +104,6 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
}
// fixme why is this only done when the close is expected...
// should the above forced closes not also cause a close?
- protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
index e1fe2697e5..8d3277d4de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -29,7 +30,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
+public class ChannelCloseOkMethodHandler implements StateAwareMethodListener<ChannelCloseOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseOkMethodHandler.class);
@@ -40,11 +41,12 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody method, int channelId)
throws AMQException
{
- _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+ _logger.info("Received channel-close-ok for channel-id " + channelId);
+ final AMQProtocolSession session = stateManager.getProtocolSession();
// todo this should do the local closure
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
index ca3f46d08b..96de8af54b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
@@ -30,7 +30,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
+public class ChannelFlowOkMethodHandler implements StateAwareMethodListener<ChannelFlowOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowOkMethodHandler.class);
private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler();
@@ -43,10 +43,12 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
private ChannelFlowOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId)
+ throws AMQException
{
- ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
- _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+
+ _logger.debug("Received Channel.Flow-Ok message, active = " + body.getActive());
}
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
new file mode 100644
index 0000000000..de976b05bd
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -0,0 +1,528 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl implements MethodDispatcher
+{
+
+
+ private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
+ private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
+ private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
+ private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
+ private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+ private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
+ private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
+ private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
+ private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
+ private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
+ private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
+ private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
+
+
+
+ private static interface DispatcherFactory
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+ }
+
+ private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+ new HashMap<ProtocolVersion, DispatcherFactory>();
+
+ static
+ {
+ _dispatcherFactories.put(ProtocolVersion.v8_0,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ClientMethodDispatcherImpl_8_0(stateManager);
+ }
+ });
+
+ _dispatcherFactories.put(ProtocolVersion.v0_9,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ClientMethodDispatcherImpl_0_9(stateManager);
+ }
+ });
+
+ }
+
+
+ public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+ {
+ DispatcherFactory factory = _dispatcherFactories.get(version);
+ return factory.createMethodDispatcher(stateManager);
+ }
+
+
+
+
+ private AMQStateManager _stateManager;
+
+ public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
+
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+ {
+ _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+ {
+ _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+ {
+ _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+ {
+ _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+ {
+ _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+ {
+ _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+ {
+ _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+ {
+ _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+ {
+ _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+ {
+ _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+ {
+ _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+ {
+ _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+ {
+ _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
new file mode 100644
index 0000000000..ae6d5e8283
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
+{
+ public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
new file mode 100644
index 0000000000..c28720f370
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
@@ -0,0 +1,65 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+
+public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
+{
+ public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index df096d3c4e..4d805cf123 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -36,7 +36,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionCloseMethodHandler implements StateAwareMethodListener
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ConnectionCloseMethodHandler.class);
@@ -50,24 +50,26 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
private ConnectionCloseMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId)
+ throws AMQException
{
_logger.info("ConnectionClose frame received");
- ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+
// does it matter
// stateManager.changeState(AMQState.CONNECTION_CLOSING);
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
try
{
+
+ ConnectionCloseOkBody closeOkBody = session.getMethodRegistry().createConnectionCloseOkBody();
// TODO: check whether channel id of zero is appropriate
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(),
- method.getMinor()));
+ session.writeFrame(closeOkBody.generateFrame(0));
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
@@ -75,7 +77,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
{
_logger.info("Authentication Error:" + Thread.currentThread().getName());
- protocolSession.closeProtocolSession();
+ session.closeProtocolSession();
// todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
@@ -94,9 +96,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
{
// this actually closes the connection in the case where it is not an error.
- protocolSession.closeProtocolSession();
+ session.closeProtocolSession();
stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
index 2e0f273c32..fd7acac84f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
@@ -21,13 +21,14 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.protocol.AMQMethodEvent;
-public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
+public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody>
{
private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler();
@@ -40,9 +41,11 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId)
+ throws AMQException
{
stateManager.changeState(AMQState.CONNECTION_OPEN);
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
index 213c0eba6e..cac68c9467 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
@@ -30,7 +30,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
+public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ConnectionRedirectMethodHandler.class);
@@ -46,13 +46,13 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
private ConnectionRedirectMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId)
+ throws AMQException
{
_logger.info("ConnectionRedirect frame received");
- ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
- String host = method.host.toString();
+ String host = method.getHost().toString();
// the host is in the form hostname:port with the port being optional
int portIndex = host.indexOf(':');
@@ -68,6 +68,7 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
}
- protocolSession.failover(host, port);
+ session.failover(host, port);
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
index ab6acffeaf..ab6d4b1b92 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
@@ -32,7 +32,7 @@ import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-public class ConnectionSecureMethodHandler implements StateAwareMethodListener
+public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody>
{
private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
@@ -41,27 +41,26 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId)
+ throws AMQException
{
- SaslClient client = protocolSession.getSaslClient();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ SaslClient client = session.getSaslClient();
if (client == null)
{
throw new AMQException("No SASL client set up - cannot proceed with authentication");
}
- ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod();
+
try
{
// Evaluate server challenge
- byte[] response = client.evaluateChallenge(body.challenge);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
- body.getMajor(), body.getMinor(),
- response); // response
- protocolSession.writeFrame(responseFrame);
+ byte[] response = client.evaluateChallenge(body.getChallenge());
+
+ ConnectionSecureOkBody secureOkBody = session.getMethodRegistry().createConnectionSecureOkBody(response);
+
+ session.writeFrame(secureOkBody.generateFrame(channelId));
}
catch (SaslException e)
{
@@ -70,4 +69,6 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
}
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index f14e256172..4c755fb68c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -48,7 +48,7 @@ import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.StringTokenizer;
-public class ConnectionStartMethodHandler implements StateAwareMethodListener
+public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
{
private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -62,15 +62,16 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
private ConnectionStartMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId)
+ throws AMQException
{
_log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
+ "AMQMethodEvent evt): called");
- ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+
- ProtocolVersion pv = new ProtocolVersion((byte) body.versionMajor, (byte) body.versionMinor);
+ ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
// For the purposes of interop, we can make the client accept the broker's version string.
// If it does, it then internally records the version as being the latest one that it understands.
@@ -83,26 +84,26 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
if (pv.isSupported())
{
- protocolSession.setProtocolVersion(pv.getMajorVersion(), pv.getMinorVersion());
+ session.setProtocolVersion(pv);
try
{
// Used to hold the SASL mechanism to authenticate with.
String mechanism;
- if (body.mechanisms == null)
+ if (body.getMechanisms()== null)
{
throw new AMQException("mechanism not specified in ConnectionStart method frame");
}
else
{
- mechanism = chooseMechanism(body.mechanisms);
+ mechanism = chooseMechanism(body.getMechanisms());
_log.debug("mechanism = " + mechanism);
}
if (mechanism == null)
{
- throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+ throw new AMQException("No supported security mechanism found, passed: " + new String(body.getMechanisms()));
}
byte[] saslResponse;
@@ -110,7 +111,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
{
SaslClient sc =
Sasl.createSaslClient(new String[] { mechanism }, null, "AMQP", "localhost", null,
- createCallbackHandler(mechanism, protocolSession));
+ createCallbackHandler(mechanism, session));
if (sc == null)
{
throw new AMQException(
@@ -119,21 +120,21 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
+ " details of how to register non-standard SASL client providers.");
}
- protocolSession.setSaslClient(sc);
+ session.setSaslClient(sc);
saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
}
catch (SaslException e)
{
- protocolSession.setSaslClient(null);
+ session.setSaslClient(null);
throw new AMQException("Unable to create SASL client: " + e, e);
}
- if (body.locales == null)
+ if (body.getLocales() == null)
{
throw new AMQException("Locales is not defined in Connection Start method");
}
- final String locales = new String(body.locales, "utf8");
+ final String locales = new String(body.getLocales(), "utf8");
final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
String selectedLocale = null;
if (tokenizer.hasMoreTokens())
@@ -149,23 +150,20 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
- protocolSession.getClientID());
+ session.getClientID());
clientProperties.setString(new AMQShortString(ClientProperties.product.toString()),
QpidProperties.getProductName());
clientProperties.setString(new AMQShortString(ClientProperties.version.toString()),
QpidProperties.getReleaseVersion());
clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+
+ ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(),
- clientProperties, // clientProperties
- new AMQShortString(selectedLocale), // locale
- new AMQShortString(mechanism), // mechanism
- saslResponse)); // response
-
+ session.writeFrame(connectionStartOkBody.generateFrame(channelId));
+
}
catch (UnsupportedEncodingException e)
{
@@ -174,10 +172,10 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
else
{
- _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor
+ _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
+ "] which is not supported by this version of the client library");
- protocolSession.closeProtocolSession();
+ session.closeProtocolSession();
}
}
@@ -236,4 +234,5 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
throw new AMQException("Unable to create callback handler: " + e, e);
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index 68556991d7..fc0e40b745 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -26,17 +26,13 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionTuneMethodHandler implements StateAwareMethodListener
+public class ConnectionTuneMethodHandler implements StateAwareMethodListener<ConnectionTuneBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ConnectionTuneMethodHandler.class);
@@ -50,48 +46,41 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
protected ConnectionTuneMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId)
+ throws AMQException
{
_logger.debug("ConnectionTune frame received");
- ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ final MethodRegistry methodRegistry = session.getMethodRegistry();
- ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
+
+ ConnectionTuneParameters params = session.getConnectionTuneParameters();
if (params == null)
{
params = new ConnectionTuneParameters();
}
- params.setFrameMax(frame.frameMax);
- params.setChannelMax(frame.channelMax);
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
- protocolSession.setConnectionTuneParameters(params);
+ params.setFrameMax(frame.getFrameMax());
+ params.setChannelMax(frame.getChannelMax());
+ params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
+ session.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params, frame.getMajor(), frame.getMinor()));
- String host = protocolSession.getAMQConnection().getVirtualHost();
+ ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
+ params.getFrameMax(),
+ params.getHeartbeat());
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(tuneOkBody.generateFrame(channelId));
+
+ String host = session.getAMQConnection().getVirtualHost();
AMQShortString virtualHost = new AMQShortString("/" + host);
- protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true, frame.getMajor(),
- frame.getMinor()));
- }
+ ConnectionOpenBody openBody = methodRegistry.createConnectionOpenBody(virtualHost,null,true);
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities,
- boolean insist, byte major, byte minor)
- {
// Be aware of possible changes to parameter order as versions change.
- return ConnectionOpenBody.createAMQFrame(channel, major, minor, // AMQP version (major, minor)
- capabilities, // capabilities
- insist, // insist
- path); // virtualHost
+ session.writeFrame(openBody.generateFrame(channelId));
}
- protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
- {
- // Be aware of possible changes to parameter order as versions change.
- return ConnectionTuneOkBody.createAMQFrame(channel, major, minor, params.getChannelMax(), // channelMax
- params.getFrameMax(), // frameMax
- params.getHeartbeat()); // heartbeat
- }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
index 862a9be8d4..8de40beb10 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
/**
* @author Apache Software Foundation
*/
-public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
+public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener<ExchangeBoundOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ExchangeBoundOkMethodHandler.class);
private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
@@ -46,14 +46,14 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
private ExchangeBoundOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId)
+ throws AMQException
{
if (_logger.isDebugEnabled())
{
- ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
- _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: "
- + body.replyText);
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: "
+ + body.getReplyText());
}
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
index 65060d44d2..41225c0569 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
/**
* @author Apache Software Foundation
*/
-public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
+public class QueueDeleteOkMethodHandler implements StateAwareMethodListener<QueueDeleteOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(QueueDeleteOkMethodHandler.class);
private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
@@ -46,13 +46,14 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
private QueueDeleteOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
- {
+ public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId)
+ throws AMQException
+ {
if (_logger.isDebugEnabled())
{
- QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
- _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+ _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount());
}
}
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 19142067cb..56efec4fa2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -41,16 +41,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -618,16 +609,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame =
- ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
+ ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection."),0,0);
+
+
+ final AMQFrame frame = body.generateFrame(0);
try
{
@@ -730,4 +716,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
return _protocolSession.getProtocolMinorVersion();
}
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return getStateManager().getMethodRegistry();
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolSession.getProtocolVersion();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 5fe6ffe6c6..18c1e85eaa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,40 +20,29 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.commons.lang.StringUtils;
-
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
-// import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MainRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.UUID;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -100,12 +89,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
- private byte _protocolMinorVersion;
- private byte _protocolMajorVersion;
- private VersionSpecificRegistry _registry =
- MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private ProtocolVersion _protocolVersion;
+// private VersionSpecificRegistry _registry =
+// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+
- private final AMQConnection _connection;
+ private MethodRegistry _methodRegistry =
+ MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+
+
+ private MethodDispatcher _methodDispatcher;
+
+
+ private final AMQConnection _connection;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
@@ -125,6 +121,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
+ _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+ stateManager);
_connection = connection;
}
@@ -163,6 +162,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion,
+ stateManager);
}
public String getVirtualHost()
@@ -434,26 +435,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
session.confirmConsumerCancelled(consumerTag);
}
- public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
+ public void setProtocolVersion(final ProtocolVersion pv)
{
- _protocolMajorVersion = versionMajor;
- _protocolMinorVersion = versionMinor;
- _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ _protocolVersion = pv;
+ _methodRegistry = MethodRegistry.getMethodRegistry(pv);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager);
+
+ // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
{
- return _protocolMinorVersion;
+ return _protocolVersion.getMinorVersion();
}
public byte getProtocolMajorVersion()
{
- return _protocolMajorVersion;
+ return _protocolVersion.getMajorVersion();
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+// public VersionSpecificRegistry getRegistry()
+// {
+// return _registry;
+// }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return _methodRegistry;
+ }
+
+ public MethodDispatcher getMethodDispatcher()
+ {
+ return _methodDispatcher;
}
- public VersionSpecificRegistry getRegistry()
+
+ public void setTicket(int ticket, int channelId)
{
- return _registry;
+ final AMQSession session = getSession(channelId);
+ session.setTicket(ticket);
}
+
+ public void setMethodDispatcher(MethodDispatcher methodDispatcher)
+ {
+ _methodDispatcher = methodDispatcher;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java
new file mode 100644
index 0000000000..767bcfcbcd
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.state;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
+
+public class AMQMethodNotImplementedException extends AMQException
+{
+ public AMQMethodNotImplementedException(AMQMethodBody body)
+ {
+ super("Unexpected Method Received: " + body.getClass().getName());
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 72ff3844ca..a9473df08c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -21,43 +21,15 @@
package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
-import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
-import org.apache.qpid.client.handler.BasicReturnMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
-import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
/**
@@ -73,11 +45,11 @@ public class AMQStateManager implements AMQMethodListener
/** The current state */
private AMQState _currentState;
+
/**
* Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
* AMQFrame.
*/
- protected final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
private final Object _stateLock = new Object();
@@ -97,53 +69,10 @@ public class AMQStateManager implements AMQMethodListener
{
_protocolSession = protocolSession;
_currentState = state;
- if (register)
- {
- registerListeners();
- }
- }
- protected void registerListeners()
- {
- Map frame2handlerMap = new HashMap();
-
- // we need to register a map for the null (i.e. all state) handlers otherwise you get
- // a stack overflow in the handler searching code when you present it with a frame for which
- // no handlers are registered
- //
- _state2HandlersMap.put(null, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
+ }
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
-
- //
- // ConnectionOpen handlers
- //
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance());
- frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
- frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
- frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
- frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
- frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
- }
public AMQState getCurrentState()
{
@@ -177,56 +106,14 @@ public class AMQStateManager implements AMQMethodListener
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
- StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
- if (handler != null)
- {
- handler.methodReceived(this, _protocolSession, evt);
-
- return true;
- }
- return false;
+ B method = evt.getMethod();
+
+ // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
+ method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
+ return true;
}
- protected StateAwareMethodListener findStateTransitionHandler(AMQState currentState, AMQMethodBody frame)
- // throws IllegalStateTransitionException
- {
- final Class clazz = frame.getClass();
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
- }
-
- final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
-
- if (classToHandlerMap == null)
- {
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
- return findStateTransitionHandler(null, frame);
- }
-
- final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz);
- if (handler == null)
- {
- if (currentState == null)
- {
- _logger.debug("No state transition handler defined for receiving frame " + frame);
-
- return null;
- }
- else
- {
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
- return findStateTransitionHandler(null, frame);
- }
- }
- else
- {
- return handler;
- }
- }
public void attainState(final AMQState s) throws AMQException
{
@@ -273,4 +160,9 @@ public class AMQStateManager implements AMQMethodListener
{
_protocolSession = session;
}
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return getProtocolSession().getMethodRegistry();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
index b3932533ce..8c65f56af3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -29,8 +30,9 @@ import org.apache.qpid.protocol.AMQMethodEvent;
* the opportunity to update state.
*
*/
-public interface StateAwareMethodListener
+public interface StateAwareMethodListener<B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession,
- AMQMethodEvent evt) throws AMQException;
+
+ void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException;
+
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java
index 4cffcecf8a..69684a81ea 100644
--- a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java
@@ -2,6 +2,8 @@ package org.apache.qpid.framing;
import junit.framework.TestCase;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+
import org.apache.mina.common.ByteBuffer;
/*
@@ -58,8 +60,8 @@ public class SpecificMethodFrameListenerTest extends TestCase
public void testProcessMethod() throws AMQFrameDecodingException
{
- ChannelCloseOkBody ccob = (ChannelCloseOkBody) ChannelCloseOkBody.getFactory().newInstance((byte) 8, (byte) 0, ByteBuffer.allocate(0), 0);
- ChannelOpenOkBody coob = (ChannelOpenOkBody) ChannelOpenOkBody.getFactory().newInstance((byte) 8, (byte) 0, ByteBuffer.allocate(0), 0);
+ ChannelCloseOkBody ccob = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9).createChannelCloseOkBody();
+ ChannelOpenOkBody coob = ((MethodRegistry_0_9)(MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9))).createChannelOpenOkBody(new byte[0]);
assertTrue("This SpecificMethodFrameListener should process a ChannelCloseOkBody", close1a.processMethod(1, ccob));
assertFalse("This SpecificMethodFrameListener should NOT process a ChannelOpenOkBody", close1a.processMethod(1, coob));
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
index 575d542633..a74ae6f6d8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
@@ -37,7 +37,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener<ChannelCloseBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
@@ -48,14 +48,15 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
@@ -95,6 +96,6 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe
}
- protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
index f1099ca5ab..da9d2ee9a1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -25,20 +25,13 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
@@ -55,6 +48,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener
{
@@ -138,8 +134,11 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
/*
close channel and send guff then send ok no errors
+ REMOVE TEST - The behaviour after server has sent close is undefined.
+ the server should be free to fail as it may wish to reclaim its resources
+ immediately after close.
*/
- public void testSendingMethodsAfterClose() throws Exception
+ /*public void testSendingMethodsAfterClose() throws Exception
{
try
{
@@ -161,6 +160,17 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
// Set StateManager to manager that ignores Close-oks
AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
+
+ MethodDispatcher d = protocolSession.getMethodDispatcher();
+
+ MethodDispatcher wrappedDispatcher = (MethodDispatcher)
+ Proxy.newProxyInstance(d.getClass().getClassLoader(),
+ d.getClass().getInterfaces(),
+ new MethodDispatcherProxyHandler(
+ (ClientMethodDispatcherImpl) d));
+
+ protocolSession.setMethodDispatcher(wrappedDispatcher);
+
AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession);
newStateManager.changeState(oldStateManager.getCurrentState());
@@ -250,7 +260,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
}
}
-
+*/
private void createChannelAndTest(int channel) throws FailoverException
{
// Create A channel
@@ -277,10 +287,9 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
private void sendClose(int channel)
{
- AMQFrame frame =
- ChannelCloseOkBody.createAMQFrame(channel,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+ ChannelCloseOkBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody();
+ AMQFrame frame = body.generateFrame(channel);
((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
}
@@ -338,25 +347,22 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
private void declareExchange(final int channelId, final String _type, final String _name, final boolean nowait)
throws AMQException, FailoverException
{
-// new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
-// {
-// public Object execute() throws AMQException, FailoverException
-// {
+ ExchangeDeclareBody body =
+ ((AMQConnection) _connection).getProtocolHandler()
+ .getMethodRegistry()
+ .createExchangeDeclareBody(0,
+ new AMQShortString(_name),
+ new AMQShortString(_type),
+ true,
+ false,
+ false,
+ false,
+ nowait,
+ null);
+ AMQFrame exchangeDeclare = body.generateFrame(channelId);
AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();
- AMQFrame exchangeDeclare =
- ExchangeDeclareBody.createAMQFrame(channelId,
- protocolHandler.getProtocolMajorVersion(),
- protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- new AMQShortString(_name), // exchange
- false, // internal
- nowait, // nowait
- true, // passive
- 0, // ticket
- new AMQShortString(_type)); // type
if (nowait)
{
@@ -375,9 +381,10 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
private void createChannel(int channelId) throws AMQException, FailoverException
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand
ChannelOpenOkBody.class);
}
@@ -409,4 +416,28 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
public void failoverComplete()
{
}
+
+ private static final class MethodDispatcherProxyHandler implements InvocationHandler
+ {
+ private final ClientMethodDispatcherImpl _underlyingDispatcher;
+ private final ChannelCloseMethodHandlerNoCloseOk _handler = ChannelCloseMethodHandlerNoCloseOk.getInstance();
+
+
+ public MethodDispatcherProxyHandler(ClientMethodDispatcherImpl dispatcher)
+ {
+ _underlyingDispatcher = dispatcher;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ if(method.getName().equals("dispatchChannelClose"))
+ {
+ _handler.methodReceived(_underlyingDispatcher.getStateManager(),
+ (ChannelCloseBody) args[0], (Integer)args[1]);
+ }
+ Method dispatcherMethod = _underlyingDispatcher.getClass().getMethod(method.getName(), method.getParameterTypes());
+ return dispatcherMethod.invoke(_underlyingDispatcher, args);
+
+ }
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
index d128f30727..c7eb745566 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
@@ -59,49 +59,7 @@ public class NoCloseOKStateManager extends AMQStateManager
super(protocolSession);
}
- protected void registerListeners()
- {
- Map frame2handlerMap = new HashMap();
-
- // we need to register a map for the null (i.e. all state) handlers otherwise you get
- // a stack overflow in the handler searching code when you present it with a frame for which
- // no handlers are registered
- //
- _state2HandlersMap.put(null, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
-
- //
- // ConnectionOpen handlers
- //
- frame2handlerMap = new HashMap();
- // Use Test Handler for Close methods to not send Close-OKs
- frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
-
- frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
- frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
- frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
- frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
- frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
- }
+
}