From 8ef73fe7003c6a5a528277743d4ebfd9f6e31b89 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 7 Apr 2007 16:03:07 +0000 Subject: Refactoring and multi-version support git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@526447 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/AMQChannelException.java | 4 +- .../org/apache/qpid/AMQConnectionException.java | 3 +- .../java/org/apache/qpid/codec/AMQDecoder.java | 16 ++- .../org/apache/qpid/common/ClientProperties.java | 20 ++- .../main/java/org/apache/qpid/framing/AMQBody.java | 21 ++- .../java/org/apache/qpid/framing/AMQBodyImpl.java | 27 ++++ .../apache/qpid/framing/AMQDataBlockDecoder.java | 2 +- .../org/apache/qpid/framing/AMQMethodBody.java | 136 +++--------------- .../apache/qpid/framing/AMQMethodBodyFactory.java | 2 +- .../org/apache/qpid/framing/AMQMethodBodyImpl.java | 102 +++++++++++++ .../qpid/framing/AMQMethodBodyInstanceFactory.java | 3 +- .../org/apache/qpid/framing/AMQMethodFactory.java | 90 ++++++++++++ .../java/org/apache/qpid/framing/ContentBody.java | 2 +- .../apache/qpid/framing/ContentBodyFactory.java | 2 +- .../org/apache/qpid/framing/ContentHeaderBody.java | 4 +- .../qpid/framing/ContentHeaderBodyFactory.java | 2 +- .../framing/ContentHeaderPropertiesFactory.java | 4 +- .../org/apache/qpid/framing/HeartbeatBody.java | 6 +- .../apache/qpid/framing/HeartbeatBodyFactory.java | 2 +- .../java/org/apache/qpid/framing/MainRegistry.java | 35 +++++ .../apache/qpid/framing/MethodConverter_8_0.java | 22 ++- .../qpid/framing/VersionSpecificRegistry.java | 2 +- .../abstraction/MessagePublishInfoConverter.java | 3 +- .../ProtocolVersionMethodConverter.java | 3 +- .../qpid/framing/amqp_8_0/AMQMethodBody_8_0.java | 160 +++++++++++++++++++++ .../framing/amqp_8_0/AMQMethodFactory_8_0.java | 117 +++++++++++++++ .../java/org/apache/qpid/pool/PoolingFilter.java | 1 + .../org/apache/qpid/protocol/AMQMethodEvent.java | 1 + .../apache/qpid/protocol/AMQMethodListener.java | 4 +- .../protocol/AMQVersionAwareProtocolSession.java | 4 +- .../apache/qpid/protocol/ProtocolVersionAware.java | 5 +- 31 files changed, 631 insertions(+), 174 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java (limited to 'java/common') diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index d8c9b287bd..12120bd10d 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -23,6 +23,8 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ChannelCloseBodyImpl; import org.apache.qpid.protocol.AMQConstant; public class AMQChannelException extends AMQException @@ -53,6 +55,6 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); + return new AMQFrame(channel, new ChannelCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),0,0)); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index c4f80191a3..094e26802d 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -24,6 +24,7 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; import org.apache.qpid.protocol.AMQConstant; public class AMQConnectionException extends AMQException @@ -57,7 +58,7 @@ public class AMQConnectionException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); + return new AMQFrame(channel, new ConnectionCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); } diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index bb981a242f..6e0a5c3786 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -48,13 +48,21 @@ public class AMQDecoder extends CumulativeProtocolDecoder protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - if (_expectProtocolInitiation) + try { - return doDecodePI(session, in, out); + if (_expectProtocolInitiation) + { + return doDecodePI(session, in, out); + } + else + { + return doDecodeDataBlock(session, in, out); + } } - else + catch (Exception e) { - return doDecodeDataBlock(session, in, out); + e.printStackTrace(); + throw e; } } diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 07371b5182..1f1911aa35 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -20,10 +20,28 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + public enum ClientProperties { + + instance, product, version, - platform + platform; + + + private final AMQShortString _name; + + private ClientProperties() + { + _name = new AMQShortString(toString()); + } + + public AMQShortString getName() + { + return _name; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index ebeea8d2b4..4dd5ab7a9a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,18 +22,15 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public abstract class AMQBody +public interface AMQBody { - public abstract byte getFrameType(); - - /** + byte getFrameType(); + + /** * Get the size of the body * @return unsigned short */ - protected abstract int getSize(); - - protected abstract void writePayload(ByteBuffer buffer); - - protected abstract void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException; + int getSize(); + + void writePayload(ByteBuffer buffer); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java new file mode 100644 index 0000000000..6b2d1feae5 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + + +public abstract class AMQBodyImpl implements AMQBody +{ + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 43f888c029..2ecd4d4650 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -72,7 +72,7 @@ public class AMQDataBlockDecoder final byte type = in.get(); BodyFactory bodyFactory; - if(type == AMQMethodBody.TYPE) + if(type == AMQMethodBodyImpl.TYPE) { bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); if(bodyFactory == null) diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index f2e91083ca..ddd03145c4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -1,132 +1,28 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQConstant; -public abstract class AMQMethodBody extends AMQBody +/** + * Created by IntelliJ IDEA. + * User: U146758 + * Date: 08-Mar-2007 + * Time: 11:30:28 + * To change this template use File | Settings | File Templates. + */ +public interface AMQMethodBody extends AMQBody { - public static final byte TYPE = 1; - - /** AMQP version */ - protected byte major; - protected byte minor; - - public byte getMajor() - { - return major; - } - - public byte getMinor() - { - return minor; - } - - public AMQMethodBody(byte major, byte minor) - { - this.major = major; - this.minor = minor; - } - - /** unsigned short */ - protected abstract int getBodySize(); - - /** @return unsigned short */ - protected abstract int getClazz(); - - /** @return unsigned short */ - protected abstract int getMethod(); - - protected abstract void writeMethodPayload(ByteBuffer buffer); - - public byte getFrameType() - { - return TYPE; - } - - protected int getSize() - { - return 2 + 2 + getBodySize(); - } - - protected void writePayload(ByteBuffer buffer) - { - EncodingUtils.writeUnsignedShort(buffer, getClazz()); - EncodingUtils.writeUnsignedShort(buffer, getMethod()); - writeMethodPayload(buffer); - } - - protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; - - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException - { - populateMethodBodyFromBuffer(buffer); - } - - public String toString() - { - StringBuffer buf = new StringBuffer(getClass().getName()); - buf.append("[ Class: ").append(getClazz()); - buf.append(" Method: ").append(getMethod()).append(']'); - return buf.toString(); - } - - /** - * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and - * method ids of the body it resulted from). - */ - - /** - * Convenience Method to create a channel not found exception - * - * @param channelId The channel id that is not found - * - * @return new AMQChannelException - */ - public AMQChannelException getChannelNotFoundException(int channelId) - { - return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); - } - - public AMQChannelException getChannelException(AMQConstant code, String message) - { - return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor); - } + public int getClazz(); + public int getMethod(); + + AMQChannelException getChannelNotFoundException(int channelId); - public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) - { - return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); - } + AMQChannelException getChannelException(AMQConstant code, String message); - public AMQConnectionException getConnectionException(AMQConstant code, String message) - { - return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); - } + AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause); - public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) - { - return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); - } + AMQConnectionException getConnectionException(AMQConstant code, String message); + AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 5293c00379..f5cd971c0e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -37,6 +37,6 @@ public class AMQMethodBodyFactory implements BodyFactory public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); + return _protocolSession.getRegistry().convertToBody(in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java new file mode 100644 index 0000000000..0a802f8857 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; + +public abstract class AMQMethodBodyImpl extends AMQBodyImpl implements AMQMethodBody +{ + public static final byte TYPE = 1; + + + public abstract byte getMajor(); + public abstract byte getMinor(); + + /** unsigned short */ + protected abstract int getBodySize(); + + /** @return unsigned short */ + public abstract int getClazz(); + + /** @return unsigned short */ + public abstract int getMethod(); + + protected abstract void writeMethodPayload(ByteBuffer buffer); + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return 2 + 2 + getBodySize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and + * method ids of the body it resulted from). + */ + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException + */ + public AMQChannelException getChannelNotFoundException(int channelId) + { + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); + } + + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor()); + } + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor()); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index cfbc9d1828..9a7868f3cd 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -5,6 +5,5 @@ import org.apache.mina.common.ByteBuffer; public abstract interface AMQMethodBodyInstanceFactory { - public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException; - public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java new file mode 100644 index 0000000000..4ffc9e0066 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + + +public interface AMQMethodFactory +{ + + // Connection Methods + + ConnectionCloseBody createConnectionClose(); + + // Access Methods + + AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write); + + + // Tx Methods + + TxSelectBody createTxSelect(); + + TxCommitBody createTxCommit(); + + TxRollbackBody createTxRollback(); + + // Channel Methods + + ChannelOpenBody createChannelOpen(); + + ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText); + + ChannelFlowBody createChannelFlow(boolean active); + + + // Exchange Methods + + + ExchangeBoundBody createExchangeBound(AMQShortString exchangeName, + AMQShortString queueName, + AMQShortString routingKey); + + ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket); + + + // Queue Methods + + QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket); + + QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket); + + QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket); + + + // Message Methods + + // In different versions of the protocol we change the class used for message transfer + // abstract this out so the appropriate methods are created + AMQMethodBody createRecover(boolean requeue); + + AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket); + + AMQMethodBody createConsumerCancel(AMQShortString consumerTag); + + AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple); + + AMQMethodBody createRejectBody(long deliveryTag, boolean requeue); + + AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index be38695384..a1aaab06c6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentBody extends AMQBody +public class ContentBody extends AMQBodyImpl { public static final byte TYPE = 3; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index 5636229d53..7b6a92e691 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -39,7 +39,7 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new ContentBody(in, bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 02631a5f88..c71f47bad2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentHeaderBody extends AMQBody +public class ContentHeaderBody extends AMQBodyImpl { public static final byte TYPE = 2; @@ -110,7 +110,7 @@ public class ContentHeaderBody extends AMQBody properties.writePropertyListPayload(buffer); } - public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, CommonContentHeaderProperties properties, long bodySize) { return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize)); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index 818fc9cf0c..9570ec800d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -39,7 +39,7 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 7dac018872..a8a8097fd2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; + import org.apache.mina.common.ByteBuffer; public class ContentHeaderPropertiesFactory @@ -43,7 +45,7 @@ public class ContentHeaderPropertiesFactory // AMQP version change: "Hardwired" version to major=8, minor=0 // TODO: Change so that the actual version is obtained from // the ProtocolInitiation object for this session. - if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0)) + if (classId == BasicConsumeBodyImpl.CLASS_ID) { properties = new BasicContentHeaderProperties(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7246c4a1cf..17b2a2f9c2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class HeartbeatBody extends AMQBody +public class HeartbeatBody extends AMQBodyImpl { public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); @@ -46,12 +46,12 @@ public class HeartbeatBody extends AMQBody return TYPE; } - protected int getSize() + public int getSize() { return 0;//heartbeats we generate have no payload } - protected void writePayload(ByteBuffer buffer) + public void writePayload(ByteBuffer buffer) { } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index c7ada708dc..2249f1d1cf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java new file mode 100644 index 0000000000..d75589f914 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public class MainRegistry +{ + + public static VersionSpecificRegistry getVersionSpecificRegistry(byte versionMajor, byte versionMinor) + { + return null; //To change body of created methods use File | Settings | File Templates. + } + + public static VersionSpecificRegistry getVersionSpecificRegistry(ProtocolVersion pv) + { + return null; //To change body of created methods use File | Settings | File Templates. + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java index dd93cc97fa..f253372a65 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java @@ -4,6 +4,7 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; import org.apache.mina.common.ByteBuffer; @@ -19,7 +20,7 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQBody convertToBody(ContentChunk contentChunk) + public AMQBodyImpl convertToBody(ContentChunk contentChunk) { return new ContentBody(contentChunk.getData()); } @@ -52,8 +53,8 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot public void configure() { - _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion()); - _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion()); + _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; } @@ -87,18 +88,15 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQMethodBody convertToBody(MessagePublishInfo info) + public AMQMethodBodyImpl convertToBody(MessagePublishInfo info) { - return new BasicPublishBody(getProtocolMajorVersion(), - getProtocolMinorVersion(), - _basicPublishClassId, - _basicPublishMethodId, - info.getExchange(), - info.isImmediate(), + return new BasicPublishBodyImpl(0, // ticket + info.getExchange(), + info.getRoutingKey(), info.isMandatory(), - info.getRoutingKey(), - 0) ; // ticket + info.isImmediate() + ) ; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index ec371453aa..ebe0b91cf4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -179,7 +179,7 @@ public class VersionSpecificRegistry } - return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size); + return bodyFactory.newInstance(in, size); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java index c9e15f18e3..d5da133837 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java @@ -18,12 +18,13 @@ package org.apache.qpid.framing.abstraction; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; public interface MessagePublishInfoConverter { public MessagePublishInfo convertToInfo(AMQMethodBody body); - public AMQMethodBody convertToBody(MessagePublishInfo info); + public AMQMethodBodyImpl convertToBody(MessagePublishInfo info); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java index 52e82cdf07..b8e460eb05 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -18,11 +18,12 @@ package org.apache.qpid.framing.abstraction; +import org.apache.qpid.framing.AMQBodyImpl; import org.apache.qpid.framing.AMQBody; public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter { - AMQBody convertToBody(ContentChunk contentBody); + AMQBodyImpl convertToBody(ContentChunk contentBody); ContentChunk convertToContentChunk(AMQBody body); void configure(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java new file mode 100644 index 0000000000..63d0f0e407 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java @@ -0,0 +1,160 @@ +package org.apache.qpid.framing.amqp_8_0; + +import org.apache.qpid.framing.EncodingUtils; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import org.apache.mina.common.ByteBuffer; + +public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl +{ + + public byte getMajor() + { + return 8; + } + + public byte getMinor() + { + return 0; + } + + public int getSize() + { + return 2 + 2 + getBodySize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + + protected byte readByte(ByteBuffer buffer) + { + return buffer.get(); + } + + protected AMQShortString readAMQShortString(ByteBuffer buffer) + { + return EncodingUtils.readAMQShortString(buffer); + } + + protected int getSizeOf(AMQShortString string) + { + return EncodingUtils.encodedShortStringLength(string); + } + + protected void writeByte(ByteBuffer buffer, byte b) + { + buffer.put(b); + } + + protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) + { + EncodingUtils.writeShortStringBytes(buffer, string); + } + + protected int readInt(ByteBuffer buffer) + { + return buffer.getInt(); + } + + protected void writeInt(ByteBuffer buffer, int i) + { + buffer.putInt(i); + } + + protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + return EncodingUtils.readFieldTable(buffer); + } + + protected int getSizeOf(FieldTable table) + { + return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeFieldTable(ByteBuffer buffer, FieldTable table) + { + EncodingUtils.writeFieldTableBytes(buffer, table); + } + + protected long readLong(ByteBuffer buffer) + { + return buffer.getLong(); + } + + protected void writeLong(ByteBuffer buffer, long l) + { + buffer.putLong(l); + } + + protected int getSizeOf(byte[] response) + { + return response == null ? 4 : response.length + 4; //To change body of created methods use File | Settings | File Templates. + } + + protected void writeBytes(ByteBuffer buffer, byte[] data) + { + EncodingUtils.writeLongstr(buffer,data); + } + + protected byte[] readBytes(ByteBuffer buffer) + { + return EncodingUtils.readLongstr(buffer); + } + + protected short readShort(ByteBuffer buffer) + { + return EncodingUtils.readShort(buffer); + } + + protected void writeShort(ByteBuffer buffer, short s) + { + EncodingUtils.writeShort(buffer, s); + } + + protected short readUnsignedByte(ByteBuffer buffer) + { + return buffer.getUnsigned(); + } + + protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) + { + EncodingUtils.writeUnsignedByte(buffer, unsignedByte); + } + + protected byte readBitfield(ByteBuffer buffer) + { + return readByte(buffer); + } + + protected int readUnsignedShort(ByteBuffer buffer) + { + return buffer.getUnsignedShort(); + } + + protected void writeBitfield(ByteBuffer buffer, byte bitfield0) + { + buffer.put(bitfield0); + } + + protected void writeUnsignedShort(ByteBuffer buffer, int s) + { + EncodingUtils.writeUnsignedShort(buffer, s); + } + + protected long readUnsignedInteger(ByteBuffer buffer) + { + return buffer.getUnsignedInt(); + } + protected void writeUnsignedInteger(ByteBuffer buffer, long i) + { + EncodingUtils.writeUnsignedInteger(buffer, i); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java new file mode 100644 index 0000000000..188ed07a70 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java @@ -0,0 +1,117 @@ +package org.apache.qpid.framing.amqp_8_0; + +import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQConstant; + +import org.apache.mina.common.ByteBuffer; + + +public class AMQMethodFactory_8_0 implements AMQMethodFactory +{ + private static final AMQShortString CLIENT_INITIATED_CONNECTION_CLOSE = + new AMQShortString("Client initiated connection close"); + + public ConnectionCloseBody createConnectionClose() + { + return new ConnectionCloseBodyImpl(AMQConstant.REPLY_SUCCESS.getCode(), + CLIENT_INITIATED_CONNECTION_CLOSE, + 0, + 0); + } + + public AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write) + { + return new AccessRequestBodyImpl(realm,exclusive,passive,active,write,read); + } + + public TxSelectBody createTxSelect() + { + return new TxSelectBodyImpl(); + } + + public TxCommitBody createTxCommit() + { + return new TxCommitBodyImpl(); + } + + public TxRollbackBody createTxRollback() + { + return new TxRollbackBodyImpl(); + } + + public ChannelOpenBody createChannelOpen() + { + return new ChannelOpenBodyImpl((AMQShortString)null); + } + + public ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText) + { + return new ChannelCloseBodyImpl(replyCode, replyText, 0, 0); + } + + public ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket) + { + return new ExchangeDeclareBodyImpl(ticket,name,type,false,false,false,false,false,null); + } + + public ExchangeBoundBody createExchangeBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) + { + return new ExchangeBoundBodyImpl(exchangeName,routingKey,queueName); + } + + public QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket) + { + return new QueueDeclareBodyImpl(ticket,name,passive,durable,exclusive,autoDelete,false,arguments); + } + + public QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket) + { + return new QueueBindBodyImpl(ticket,queueName,exchangeName,routingKey,false,arguments); + } + + public QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket) + { + return new QueueDeleteBodyImpl(ticket,queueName,ifUnused,ifEmpty,false); + } + + public ChannelFlowBody createChannelFlow(boolean active) + { + return new ChannelFlowBodyImpl(active); + } + + + // In different versions of the protocol we change the class used for message transfer + // abstract this out so the appropriate methods are created + public AMQMethodBody createRecover(boolean requeue) + { + return new BasicRecoverBodyImpl(requeue); + } + + public AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket) + { + return new BasicConsumeBodyImpl(ticket,queueName,tag,noLocal,noAck,exclusive,false,arguments); + } + + public AMQMethodBody createConsumerCancel(AMQShortString consumerTag) + { + return new BasicCancelBodyImpl(consumerTag, false); + } + + public AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple) + { + return new BasicAckBodyImpl(deliveryTag,multiple); + } + + public AMQMethodBody createRejectBody(long deliveryTag, boolean requeue) + { + return new BasicRejectBodyImpl(deliveryTag, requeue); + } + + public AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize) + { + return new BasicQosBodyImpl(prefetchSize, prefetchCount, false); + } + + + +} diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 8126ca4bc8..17a2ec5d4e 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -136,6 +136,7 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception { + cause.printStackTrace(); nextFilter.exceptionCaught(session,cause); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java index ab36041cb8..db76b6fe7e 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; /** diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 85bbe50b11..75ae6645e8 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; /** @@ -32,7 +33,6 @@ public interface AMQMethodListener /** * Invoked when a method frame has been received * @param evt the event that contains the method and channel - * @param protocolSession the protocol session associated with the event * @return true if the handler has processed the method frame, false otherwise. Note * that this does not prohibit the method event being delivered to subsequent listeners * but can be used to determine if nobody has dealt with an incoming method frame. @@ -40,7 +40,7 @@ public interface AMQMethodListener * to all registered listeners using the error() method (see below) allowing them to * perform cleanup if necessary. */ - boolean methodReceived(AMQMethodEvent evt) throws Exception; + boolean methodReceived(AMQMethodEvent evt) throws AMQException; /** * Callback when an error has occurred. Allows listeners to clean up. diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index b57c26e496..65f60e7f59 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.MethodRegistry; public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware { - public VersionSpecificRegistry getRegistry(); + public MethodRegistry getRegistry(); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java index 64db953bc2..c2c0bf29b7 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -20,9 +20,10 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.framing.ProtocolVersion; + public interface ProtocolVersionAware { - public byte getProtocolMinorVersion(); + public ProtocolVersion getProtocolVersion(); - public byte getProtocolMajorVersion(); } -- cgit v1.2.1