diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2006-12-22 17:00:28 +0000 |
| commit | 5129ac060aed57d8e31a62c3cd64ff0ad8995949 (patch) | |
| tree | 12616b7ae0cc57d7c3fb88025fd05cf31686d51a /java/broker | |
| parent | 142d35580b326c99a306f6476ff0a0b723db920e (diff) | |
| download | qpid-python-5129ac060aed57d8e31a62c3cd64ff0ad8995949.tar.gz | |
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@489691 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
26 files changed, 274 insertions, 73 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 87691ccaa3..5e463646f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -81,7 +81,11 @@ public abstract class RequiredDeliveryException extends AMQException public CompositeAMQDataBlock getReturnMessage(int channel) { - BasicReturnBody returnBody = new BasicReturnBody(); + // AMQP version change: All generated *Body classes are now version-aware. + // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now. + // TODO: Connect the version to that returned by the ProtocolInitiation + // for this session. + BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0); returnBody.exchange = _publishBody.exchange; returnBody.replyCode = getReplyCode(); returnBody.replyText = _message; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 673556cbec..198d2c1f3d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -54,7 +54,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC channel.unsubscribeConsumer(protocolSession, body.consumerTag); if(!body.nowait) { - final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + body.consumerTag); // consumerTag protocolSession.writeFrame(responseFrame); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 1e57c714ff..d3aece9818 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -81,7 +81,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic body.arguments, body.noLocal); if (!body.nowait) { - session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag)); // consumerTag } //now allow queue to start async processing of any backlog of messages @@ -90,16 +95,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(), - ise.getMessage(), BasicConsumeBody.CLASS_ID, - BasicConsumeBody.METHOD_ID)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.INVALID_SELECTOR.getCode(), // replyCode + ise.getMessage())); // replyText } catch (ConsumerTagNotUniqueException e) { String msg = "Non-unique consumer tag, '" + body.consumerTag + "'"; - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, - BasicConsumeBody.CLASS_ID, - BasicConsumeBody.METHOD_ID)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index efdbe7aae4..423ea5f276 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -64,7 +64,15 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi protocolSession.closeChannel(evt.getChannelId()); // TODO: modify code gen to make getClazz and getMethod public methods rather than protected // then we can remove the hardcoded 0,0 - AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + ChannelCloseBody.getClazz((byte)8, (byte)0), // classId + ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId + 500, // replyCode + "Unknown exchange name"); // replyText protocolSession.writeFrame(cf); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 1357ff16b9..379ce3d072 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -44,6 +44,9 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException { session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0))); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 0efe12b137..d26f84d17e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -55,7 +55,10 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + " and method " + body.methodId); protocolSession.closeChannel(evt.getChannelId()); - AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 87ccc60907..27833ac250 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -58,6 +58,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB channel.setSuspended(!body.active); _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); - AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + body.active); // active protocolSession.writeFrame(response); - }} + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 4cccc774ba..43d2cae8e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -55,7 +55,10 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(), exchangeRegistry); protocolSession.addChannel(channel); - AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 7bdb1942d0..d000e3b590 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -62,7 +62,10 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C { _logger.error("Error closing protocol session: " + e, e); } - final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index bfcc50e1f8..9f9b029ada 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -64,7 +64,12 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con contextKey = generateClientID(); } protocolSession.setContextKey(contextKey); - AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, + (byte)8, (byte)0, // AMQP version (major, minor) + contextKey); // knownHosts stateManager.changeState(AMQState.CONNECTION_OPEN); protocolSession.writeFrame(response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index c32f5e4283..ea93a357d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -75,25 +75,43 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName()); _logger.info("Authentication failed"); stateManager.changeState(AMQState.CONNECTION_CLOSING); - AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(), - AMQConstant.NOT_ALLOWED.getName(), - ConnectionCloseBody.CLASS_ID, - ConnectionCloseBody.METHOD_ID); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame close = ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId + ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + AMQConstant.NOT_ALLOWED.getName()); // replyText protocolSession.writeFrame(close); disposeSaslServer(protocolSession); break; case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, - ConnectionStartOkMethodHandler.getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + // TODO: Check the value of channelMax here: This should be the max + // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire), + // not Integer.MAX_VALUE (which is signed 4 bytes). + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + Integer.MAX_VALUE, // channelMax + ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax + HeartbeatConfig.getInstance().getDelay()); // heartbeat protocolSession.writeFrame(tune); disposeSaslServer(protocolSession); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); - AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + authResult.challenge); // challenge protocolSession.writeFrame(challenge); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 79b2e11bca..9f24100df1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -92,13 +92,24 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< _logger.info("Connected as: " + ss.getAuthorizationID()); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + Integer.MAX_VALUE, // channelMax + getConfiguredFrameSize(), // frameMax + HeartbeatConfig.getInstance().getDelay()); // heartbeat protocolSession.writeFrame(tune); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); - AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + authResult.challenge); // challenge protocolSession.writeFrame(challenge); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 5aaf78d6b7..30e8990b54 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -64,6 +64,11 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + byte major = (byte)8; + byte minor = (byte)0; + ExchangeBoundBody body = evt.getMethod(); String exchangeName = body.exchange; @@ -77,8 +82,11 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQFrame response; if (exchange == null) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND, - "Exchange " + exchangeName + " not found"); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + EXCHANGE_NOT_FOUND, // replyCode + "Exchange " + exchangeName + " not found"); // replyText } else if (routingKey == null) { @@ -86,11 +94,19 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { if (exchange.hasBindings()) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + NO_BINDINGS, // replyCode + null); // replyText } } else @@ -98,20 +114,29 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQQueue queue = queueRegistry.getQueue(queueName); if (queue == null) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, - "Queue " + queueName + " not found"); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + QUEUE_NOT_FOUND, // replyCode + "Queue " + queueName + " not found"); // replyText } else { if (exchange.isBound(queue)) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND, - "Queue " + queueName + " not bound to exchange " + - exchangeName); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + QUEUE_NOT_BOUND, // replyCode + "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText } } } @@ -121,24 +146,30 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQQueue queue = queueRegistry.getQueue(queueName); if (queue == null) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, - "Queue " + queueName + " not found"); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + QUEUE_NOT_FOUND, // replyCode + "Queue " + queueName + " not found"); // replyText } else { if (exchange.isBound(body.routingKey, queue)) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, - null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { + // AMQP version change: Be aware of possible changes to parameter order as versions change. response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, - "Queue " + queueName + - " not bound with routing key " + - body.routingKey + " to exchange " + - exchangeName); + major, minor, // AMQP version (major, minor) + SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode + "Queue " + queueName + " not bound with routing key " + + body.routingKey + " to exchange " + exchangeName); // replyText } } } @@ -146,16 +177,20 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { if (exchange.isBound(body.routingKey)) { - response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, - null); + // AMQP version change: Be aware of possible changes to parameter order as versions change. + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + major, minor, // AMQP version (major, minor) + OK, // replyCode + null); // replyText } else { + // AMQP version change: Be aware of possible changes to parameter order as versions change. response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), - NO_QUEUE_BOUND_WITH_RK, - "No queue bound with routing key " + - body.routingKey + " to exchange " + - exchangeName); + major, minor, // AMQP version (major, minor) + NO_QUEUE_BOUND_WITH_RK, // replyCode + "No queue bound with routing key " + body.routingKey + + " to exchange " + exchangeName); // replyText } } protocolSession.writeFrame(response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index b7c75e290a..7937a9bb2d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -75,7 +75,10 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange } if(!body.nowait) { - AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 93ef902190..153a9de4c4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -53,7 +53,10 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD try { exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused); - AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } catch (ExchangeInUseException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index cf9e40a660..b7fc786981 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -90,7 +90,10 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } if (!body.nowait) { - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); protocolSession.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b7004de2a9..83f98de2d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -102,7 +102,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } if (!body.nowait) { - AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + 0L, // consumerCount + 0L, // messageCount + body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); protocolSession.writeFrame(response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 0dbc54f29b..688968b8a0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -81,7 +81,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete { int purged = queue.delete(body.ifUnused, body.ifEmpty); _store.removeQueue(queue.getName()); - session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + purged)); // messageCount } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index ac864cab6c..7fcad5bbf3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -52,7 +52,10 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> try{ AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.commit(); - protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); channel.processReturns(protocolSession); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 475f6ecacf..588dc026d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -51,7 +51,10 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod try{ AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); channel.rollback(); - protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). channel.resend(protocolSession); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index c30bc7d66f..7df3825d8a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -48,6 +48,9 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> AMQMethodEvent<TxSelectBody> evt) throws AMQException { protocolSession.getChannel(evt.getChannelId()).setTransactional(true); - protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId())); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 7a9dfbc67c..9ff6b96690 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -165,8 +165,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _minor = pi.protocolMinor; String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); String locales = "en_US"; - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null, - mechanisms.getBytes(), locales.getBytes()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, + (byte)8, (byte)0, // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short)8, // versionMajor + (short)0 // versionMinor + ); _minaProtocolSession.write(response); } catch (AMQException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 18980f440b..2e9590277b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -168,11 +168,20 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else if(throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); } else { - protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.write(ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + 200, // replyCode + throwable.getMessage() // replyText + )); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index d57f9b9be1..0ceadcb30b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -193,8 +193,16 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void closeConnection() throws JMException { - final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(), - "Broker Management Console has closing the connection.", 0, 0); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, + (byte)8, (byte)0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "Broker Management Console has closing the connection." // replyText + ); _session.writeFrame(response); try diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b27cd807c0..afe4ea95b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -157,10 +157,20 @@ public class AMQMessage public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) { + AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; - allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered, - getExchangeName(), getRoutingKey()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + allFrames[0] = BasicDeliverBody.createAMQFrame(channel, + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + getExchangeName(), // exchange + _redelivered, // redelivered + getRoutingKey() // routingKey + ); allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 2; i < allFrames.length; i++) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 4272541298..78310e8eb3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -379,7 +379,13 @@ public class SubscriptionImpl implements Subscription if (!_closed) { _logger.info("Closing autoclose subscription:" + this); - protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag // consumerTag + )); _closed = true; } } @@ -392,9 +398,17 @@ public class SubscriptionImpl implements Subscription private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, - deliveryTag, false, exchange, - routingKey); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + exchange, // exchange + false, // redelivered + routingKey // routingKey + ); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? deliverFrame.writePayload(buf); buf.flip(); |
