From 32d79a6e99ad773e8a7b49efa12e06c028e7d2f4 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 22 Dec 2006 17:00:28 +0000 Subject: AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@489691 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/RequiredDeliveryException.java | 6 +- .../server/handler/BasicCancelMethodHandler.java | 7 +- .../server/handler/BasicConsumeMethodHandler.java | 31 ++++++-- .../server/handler/BasicPublishMethodHandler.java | 10 ++- .../qpid/server/handler/BasicQosHandler.java | 5 +- .../qpid/server/handler/ChannelCloseHandler.java | 5 +- .../qpid/server/handler/ChannelFlowHandler.java | 10 ++- .../qpid/server/handler/ChannelOpenHandler.java | 5 +- .../handler/ConnectionCloseMethodHandler.java | 5 +- .../handler/ConnectionOpenMethodHandler.java | 7 +- .../handler/ConnectionSecureOkMethodHandler.java | 34 +++++++-- .../handler/ConnectionStartOkMethodHandler.java | 17 ++++- .../qpid/server/handler/ExchangeBoundHandler.java | 85 +++++++++++++++------- .../server/handler/ExchangeDeclareHandler.java | 5 +- .../qpid/server/handler/ExchangeDeleteHandler.java | 5 +- .../qpid/server/handler/QueueBindHandler.java | 5 +- .../qpid/server/handler/QueueDeclareHandler.java | 9 ++- .../qpid/server/handler/QueueDeleteHandler.java | 7 +- .../qpid/server/handler/TxCommitHandler.java | 5 +- .../qpid/server/handler/TxRollbackHandler.java | 5 +- .../qpid/server/handler/TxSelectHandler.java | 5 +- .../server/protocol/AMQMinaProtocolSession.java | 13 +++- .../server/protocol/AMQPFastProtocolHandler.java | 13 +++- .../server/protocol/AMQProtocolSessionMBean.java | 12 ++- .../org/apache/qpid/server/queue/AMQMessage.java | 14 +++- .../apache/qpid/server/queue/SubscriptionImpl.java | 22 +++++- 26 files changed, 274 insertions(+), 73 deletions(-) (limited to 'qpid/java/broker') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 87691ccaa3..5e463646f9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -81,7 +81,11 @@ public abstract class RequiredDeliveryException extends AMQException public CompositeAMQDataBlock getReturnMessage(int channel) { - BasicReturnBody returnBody = new BasicReturnBody(); + // AMQP version change: All generated *Body classes are now version-aware. + // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now. + // TODO: Connect the version to that returned by the ProtocolInitiation + // for this session. + BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0); returnBody.exchange = _publishBody.exchange; returnBody.replyCode = getReplyCode(); returnBody.replyText = _message; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 673556cbec..198d2c1f3d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -54,7 +54,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener AMQProtocolSession session, AMQMethodEvent evt) throws AMQException { session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0))); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 0efe12b137..d26f84d17e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -55,7 +55,10 @@ public class ChannelCloseHandler implements StateAwareMethodListener evt) throws AMQException { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + byte major = (byte)8; + byte minor = (byte)0; + ExchangeBoundBody body = evt.getMethod(); String exchangeName = body.exchange; @@ -77,8 +82,11 @@ public class ExchangeBoundHandler implements StateAwareMethodListener } if (!body.nowait) { - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b7004de2a9..83f98de2d9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -102,7 +102,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener try{ AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.commit(); - protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); channel.processReturns(protocolSession); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 475f6ecacf..588dc026d4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -51,7 +51,10 @@ public class TxRollbackHandler implements StateAwareMethodListener AMQMethodEvent evt) throws AMQException { protocolSession.getChannel(evt.getChannelId()).setTransactional(true); - protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 7a9dfbc67c..9ff6b96690 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -165,8 +165,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _minor = pi.protocolMinor; String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); String locales = "en_US"; - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null, - mechanisms.getBytes(), locales.getBytes()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, + (byte)8, (byte)0, // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short)8, // versionMajor + (short)0 // versionMinor + ); _minaProtocolSession.write(response); } catch (AMQException e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 18980f440b..2e9590277b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -168,11 +168,20 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else if(throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); } else { - protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.write(ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + 200, // replyCode + throwable.getMessage() // replyText + )); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index d57f9b9be1..0ceadcb30b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -193,8 +193,16 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void closeConnection() throws JMException { - final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(), - "Broker Management Console has closing the connection.", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "Broker Management Console has closing the connection." // replyText + ); _session.writeFrame(response); try diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b27cd807c0..afe4ea95b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -157,10 +157,20 @@ public class AMQMessage public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) { + AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; - allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered, - getExchangeName(), getRoutingKey()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + allFrames[0] = BasicDeliverBody.createAMQFrame(channel, + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + getExchangeName(), // exchange + _redelivered, // redelivered + getRoutingKey() // routingKey + ); allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 2; i < allFrames.length; i++) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 4272541298..78310e8eb3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -379,7 +379,13 @@ public class SubscriptionImpl implements Subscription if (!_closed) { _logger.info("Closing autoclose subscription:" + this); - protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag // consumerTag + )); _closed = true; } } @@ -392,9 +398,17 @@ public class SubscriptionImpl implements Subscription private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, - deliveryTag, false, exchange, - routingKey); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + exchange, // exchange + false, // redelivered + routingKey // routingKey + ); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? deliverFrame.writePayload(buf); buf.flip(); -- cgit v1.2.1