From e56329673ddece158bf3dd124c40a820d0d12366 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 12 Apr 2007 16:04:05 +0000 Subject: Unbreaking trunk due to erroneous commit git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@527999 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/AMQChannelException.java | 4 +- .../org/apache/qpid/AMQConnectionException.java | 3 +- .../main/java/org/apache/qpid/AMQException.java | 27 +++- .../java/org/apache/qpid/codec/AMQDecoder.java | 16 +-- .../org/apache/qpid/common/ClientProperties.java | 20 +-- .../qpid/configuration/PropertyException.java | 20 ++- .../main/java/org/apache/qpid/framing/AMQBody.java | 75 +++++----- .../apache/qpid/framing/AMQDataBlockDecoder.java | 2 +- .../qpid/framing/AMQFrameDecodingException.java | 16 ++- .../org/apache/qpid/framing/AMQMethodBody.java | 160 +++++++++++++++++---- .../apache/qpid/framing/AMQMethodBodyFactory.java | 2 +- .../qpid/framing/AMQMethodBodyInstanceFactory.java | 3 +- .../java/org/apache/qpid/framing/ContentBody.java | 2 +- .../apache/qpid/framing/ContentBodyFactory.java | 2 +- .../org/apache/qpid/framing/ContentHeaderBody.java | 4 +- .../qpid/framing/ContentHeaderBodyFactory.java | 2 +- .../framing/ContentHeaderPropertiesFactory.java | 4 +- .../org/apache/qpid/framing/HeartbeatBody.java | 6 +- .../apache/qpid/framing/HeartbeatBodyFactory.java | 2 +- .../apache/qpid/framing/MethodConverter_8_0.java | 22 +-- .../qpid/framing/VersionSpecificRegistry.java | 73 +++++----- .../abstraction/MessagePublishInfoConverter.java | 3 +- .../ProtocolVersionMethodConverter.java | 3 +- .../java/org/apache/qpid/pool/PoolingFilter.java | 1 - .../org/apache/qpid/protocol/AMQMethodEvent.java | 1 - .../apache/qpid/protocol/AMQMethodListener.java | 4 +- .../protocol/AMQVersionAwareProtocolSession.java | 4 +- .../apache/qpid/protocol/ProtocolVersionAware.java | 5 +- 28 files changed, 304 insertions(+), 182 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 12120bd10d..d8c9b287bd 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -23,8 +23,6 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; -import org.apache.qpid.framing.amqp_8_0.ChannelCloseBodyImpl; import org.apache.qpid.protocol.AMQConstant; public class AMQChannelException extends AMQException @@ -55,6 +53,6 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return new AMQFrame(channel, new ChannelCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),0,0)); + return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 094e26802d..c4f80191a3 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -24,7 +24,6 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; import org.apache.qpid.protocol.AMQConstant; public class AMQConnectionException extends AMQException @@ -58,7 +57,7 @@ public class AMQConnectionException extends AMQException public AMQFrame getCloseFrame(int channel) { - return new AMQFrame(channel, new ConnectionCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); + return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 32c1e76a39..0222fd9b4e 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,7 @@ */ package org.apache.qpid; +import org.apache.log4j.Logger; import org.apache.qpid.protocol.AMQConstant; /** Generic AMQ exception. */ @@ -30,14 +31,14 @@ public class AMQException extends Exception public AMQException(String message) { super(message); - // fixme This method needs removed and all AMQExceptions need a valid error code + //fixme This method needs removed and all AMQExceptions need a valid error code _errorCode = AMQConstant.getConstant(-1); } public AMQException(String msg, Throwable t) { super(msg, t); - // fixme This method needs removed and all AMQExceptions need a valid error code + //fixme This method needs removed and all AMQExceptions need a valid error code _errorCode = AMQConstant.getConstant(-1); } @@ -53,6 +54,24 @@ public class AMQException extends Exception _errorCode = errorCode; } + public AMQException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public AMQException(Logger logger, AMQConstant errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + public AMQConstant getErrorCode() { return _errorCode; diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 6e0a5c3786..bb981a242f 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -48,21 +48,13 @@ public class AMQDecoder extends CumulativeProtocolDecoder protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - try + if (_expectProtocolInitiation) { - if (_expectProtocolInitiation) - { - return doDecodePI(session, in, out); - } - else - { - return doDecodeDataBlock(session, in, out); - } + return doDecodePI(session, in, out); } - catch (Exception e) + else { - e.printStackTrace(); - throw e; + return doDecodeDataBlock(session, in, out); } } diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 1f1911aa35..07371b5182 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -20,28 +20,10 @@ */ package org.apache.qpid.common; -import org.apache.qpid.framing.AMQShortString; - public enum ClientProperties { - - instance, product, version, - platform; - - - private final AMQShortString _name; - - private ClientProperties() - { - _name = new AMQShortString(toString()); - } - - public AMQShortString getName() - { - return _name; - } - + platform } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java index 022e7b8a76..958f59191f 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,7 @@ */ package org.apache.qpid.configuration; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -47,4 +48,19 @@ public class PropertyException extends AMQException { super(errorCode, msg); } + + public PropertyException(Logger logger, String msg, Throwable t) + { + super(logger, msg, t); + } + + public PropertyException(Logger logger, String msg) + { + super(logger, msg); + } + + public PropertyException(Logger logger, AMQConstant errorCode, String msg) + { + super(logger, errorCode, msg); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 4dd5ab7a9a..c497717870 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -1,36 +1,39 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -public interface AMQBody -{ - byte getFrameType(); - - /** - * Get the size of the body - * @return unsigned short - */ - int getSize(); - - void writePayload(ByteBuffer buffer); -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public abstract class AMQBody +{ + public abstract byte getFrameType(); + + /** + * Get the size of the body + * @return unsigned short + */ + protected abstract int getSize(); + + protected abstract void writePayload(ByteBuffer buffer); + + protected abstract void populateFromBuffer(ByteBuffer buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 2ecd4d4650..43f888c029 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -72,7 +72,7 @@ public class AMQDataBlockDecoder final byte type = in.get(); BodyFactory bodyFactory; - if(type == AMQMethodBodyImpl.TYPE) + if(type == AMQMethodBody.TYPE) { bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); if(bodyFactory == null) diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java index a3d4513240..a24bd6aaa9 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,7 @@ */ package org.apache.qpid.framing; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; public class AMQFrameDecodingException extends AMQException @@ -33,4 +34,15 @@ public class AMQFrameDecodingException extends AMQException { super(message, t); } + + public AMQFrameDecodingException(Logger log, String message) + { + super(log, message); + } + + public AMQFrameDecodingException(Logger log, String message, Throwable t) + { + super(log, message, t); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index ddd03145c4..23a1ce367e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -1,28 +1,132 @@ -package org.apache.qpid.framing; - -import org.apache.qpid.AMQChannelException; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.protocol.AMQConstant; - -/** - * Created by IntelliJ IDEA. - * User: U146758 - * Date: 08-Mar-2007 - * Time: 11:30:28 - * To change this template use File | Settings | File Templates. - */ -public interface AMQMethodBody extends AMQBody -{ - public int getClazz(); - public int getMethod(); - - AMQChannelException getChannelNotFoundException(int channelId); - - AMQChannelException getChannelException(AMQConstant code, String message); - - AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause); - - AMQConnectionException getConnectionException(AMQConstant code, String message); - - AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause); -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; + +public abstract class AMQMethodBody extends AMQBody +{ + public static final byte TYPE = 1; + + /** AMQP version */ + protected byte major; + protected byte minor; + + public byte getMajor() + { + return major; + } + + public byte getMinor() + { + return minor; + } + + public AMQMethodBody(byte major, byte minor) + { + this.major = major; + this.minor = minor; + } + + /** unsigned short */ + protected abstract int getBodySize(); + + /** @return unsigned short */ + protected abstract int getClazz(); + + /** @return unsigned short */ + protected abstract int getMethod(); + + protected abstract void writeMethodPayload(ByteBuffer buffer); + + public byte getFrameType() + { + return TYPE; + } + + protected int getSize() + { + return 2 + 2 + getBodySize(); + } + + protected void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + populateMethodBodyFromBuffer(buffer); + } + + public String toString() + { + StringBuffer buf = new StringBuffer(getClass().getName()); + buf.append("[ Class: ").append(getClazz()); + buf.append(" Method: ").append(getMethod()).append(']'); + return buf.toString(); + } + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and + * method ids of the body it resulted from). + */ + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException + */ + public AMQChannelException getChannelNotFoundException(int channelId) + { + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); + } + + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor); + } + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index f5cd971c0e..5293c00379 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -37,6 +37,6 @@ public class AMQMethodBodyFactory implements BodyFactory public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return _protocolSession.getRegistry().convertToBody(in, bodySize); + return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index 9a7868f3cd..cfbc9d1828 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -5,5 +5,6 @@ import org.apache.mina.common.ByteBuffer; public abstract interface AMQMethodBodyInstanceFactory { - public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index a1aaab06c6..be38695384 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentBody extends AMQBodyImpl +public class ContentBody extends AMQBody { public static final byte TYPE = 3; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index 7b6a92e691..5636229d53 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -39,7 +39,7 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new ContentBody(in, bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index c71f47bad2..02631a5f88 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentHeaderBody extends AMQBodyImpl +public class ContentHeaderBody extends AMQBody { public static final byte TYPE = 2; @@ -110,7 +110,7 @@ public class ContentHeaderBody extends AMQBodyImpl properties.writePropertyListPayload(buffer); } - public static AMQFrame createAMQFrame(int channelId, int classId, int weight, CommonContentHeaderProperties properties, + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, long bodySize) { return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize)); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index 9570ec800d..818fc9cf0c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -39,7 +39,7 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index a8a8097fd2..7dac018872 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; - import org.apache.mina.common.ByteBuffer; public class ContentHeaderPropertiesFactory @@ -45,7 +43,7 @@ public class ContentHeaderPropertiesFactory // AMQP version change: "Hardwired" version to major=8, minor=0 // TODO: Change so that the actual version is obtained from // the ProtocolInitiation object for this session. - if (classId == BasicConsumeBodyImpl.CLASS_ID) + if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0)) { properties = new BasicContentHeaderProperties(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 17b2a2f9c2..7246c4a1cf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class HeartbeatBody extends AMQBodyImpl +public class HeartbeatBody extends AMQBody { public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); @@ -46,12 +46,12 @@ public class HeartbeatBody extends AMQBodyImpl return TYPE; } - public int getSize() + protected int getSize() { return 0;//heartbeats we generate have no payload } - public void writePayload(ByteBuffer buffer) + protected void writePayload(ByteBuffer buffer) { } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index 2249f1d1cf..c7ada708dc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java index f253372a65..dd93cc97fa 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java @@ -4,7 +4,6 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; import org.apache.mina.common.ByteBuffer; @@ -20,7 +19,7 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQBodyImpl convertToBody(ContentChunk contentChunk) + public AMQBody convertToBody(ContentChunk contentChunk) { return new ContentBody(contentChunk.getData()); } @@ -53,8 +52,8 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot public void configure() { - _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; - _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; + _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion()); + _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion()); } @@ -88,15 +87,18 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQMethodBodyImpl convertToBody(MessagePublishInfo info) + public AMQMethodBody convertToBody(MessagePublishInfo info) { - return new BasicPublishBodyImpl(0, // ticket - info.getExchange(), - info.getRoutingKey(), + return new BasicPublishBody(getProtocolMajorVersion(), + getProtocolMinorVersion(), + _basicPublishClassId, + _basicPublishMethodId, + info.getExchange(), + info.isImmediate(), info.isMandatory(), - info.isImmediate() - ) ; + info.getRoutingKey(), + 0) ; // ticket } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 085479f227..ec371453aa 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -20,16 +20,16 @@ */ package org.apache.qpid.framing; -import org.apache.log4j.Logger; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; - public class VersionSpecificRegistry { private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class); + private final byte _protocolMajorVersion; private final byte _protocolMinorVersion; @@ -48,31 +48,26 @@ public class VersionSpecificRegistry _protocolVersionConverter = loadProtocolVersionConverters(major, minor); } - private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, - byte protocolMinorVersion) + private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion) { try { Class versionMethodConverterClass = - (Class) Class.forName("org.apache.qpid.framing.MethodConverter_" - + protocolMajorVersion + "_" + protocolMinorVersion); - + (Class) Class.forName("org.apache.qpid.framing.MethodConverter_"+protocolMajorVersion + "_" + protocolMinorVersion); return versionMethodConverterClass.newInstance(); } catch (ClassNotFoundException e) { _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion); - if (protocolMinorVersion != 0) + if(protocolMinorVersion != 0) { protocolMinorVersion--; - return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); } else if (protocolMajorVersion != 0) { protocolMajorVersion--; - return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); } else @@ -80,6 +75,7 @@ public class VersionSpecificRegistry return null; } + } catch (IllegalAccessException e) { @@ -87,7 +83,7 @@ public class VersionSpecificRegistry } catch (InstantiationException e) { - throw new IllegalStateException("Unable to load protocol version converter: ", e); + throw new IllegalStateException("Unable to load protocol version converter: ", e); } } @@ -119,67 +115,72 @@ public class VersionSpecificRegistry public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory) { - if (_registry.length <= classID) + if(_registry.length <= classID) { AMQMethodBodyInstanceFactory[][] oldRegistry = _registry; - _registry = new AMQMethodBodyInstanceFactory[classID + 1][]; + _registry = new AMQMethodBodyInstanceFactory[classID+1][]; System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length); } - if (_registry[classID] == null) + if(_registry[classID] == null) { - _registry[classID] = - new AMQMethodBodyInstanceFactory[(methodID > DEFAULT_MAX_METHOD_ID) ? (methodID + 1) - : (DEFAULT_MAX_METHOD_ID + 1)]; + _registry[classID] = new AMQMethodBodyInstanceFactory[methodID > DEFAULT_MAX_METHOD_ID ? methodID + 1 : DEFAULT_MAX_METHOD_ID + 1]; } - else if (_registry[classID].length <= methodID) + else if(_registry[classID].length <= methodID) { AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID]; - _registry[classID] = new AMQMethodBodyInstanceFactory[methodID + 1]; - System.arraycopy(oldMethods, 0, _registry[classID], 0, oldMethods.length); + _registry[classID] = new AMQMethodBodyInstanceFactory[methodID+1]; + System.arraycopy(oldMethods,0,_registry[classID],0,oldMethods.length); } _registry[classID][methodID] = instanceFactory; } - public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException + + public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) + throws AMQFrameDecodingException { AMQMethodBodyInstanceFactory bodyFactory; try { bodyFactory = _registry[classID][methodID]; } - catch (NullPointerException e) + catch(NullPointerException e) { - throw new AMQFrameDecodingException("Class " + classID + " unknown in AMQP version " + _protocolMajorVersion - + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + "."); + throw new AMQFrameDecodingException(_log, + "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); } - catch (IndexOutOfBoundsException e) + catch(IndexOutOfBoundsException e) { - if (classID >= _registry.length) + if(classID >= _registry.length) { - throw new AMQFrameDecodingException("Class " + classID + " unknown in AMQP version " + _protocolMajorVersion - + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID - + "."); + throw new AMQFrameDecodingException(_log, + "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); } else { - throw new AMQFrameDecodingException("Method " + methodID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + "."); + throw new AMQFrameDecodingException(_log, + "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); } } + if (bodyFactory == null) { - throw new AMQFrameDecodingException("Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion - + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + " method " + methodID + "."); + throw new AMQFrameDecodingException(_log, + "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion + + " (while trying to decode class " + classID + " method " + methodID + "."); } - return bodyFactory.newInstance(in, size); + + return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size); + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java index d5da133837..c9e15f18e3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java @@ -18,13 +18,12 @@ package org.apache.qpid.framing.abstraction; -import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; public interface MessagePublishInfoConverter { public MessagePublishInfo convertToInfo(AMQMethodBody body); - public AMQMethodBodyImpl convertToBody(MessagePublishInfo info); + public AMQMethodBody convertToBody(MessagePublishInfo info); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java index b8e460eb05..52e82cdf07 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -18,12 +18,11 @@ package org.apache.qpid.framing.abstraction; -import org.apache.qpid.framing.AMQBodyImpl; import org.apache.qpid.framing.AMQBody; public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter { - AMQBodyImpl convertToBody(ContentChunk contentBody); + AMQBody convertToBody(ContentChunk contentBody); ContentChunk convertToContentChunk(AMQBody body); void configure(); diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 17a2ec5d4e..8126ca4bc8 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -136,7 +136,6 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception { - cause.printStackTrace(); nextFilter.exceptionCaught(session,cause); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java index db76b6fe7e..ab36041cb8 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; /** diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 75ae6645e8..85bbe50b11 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -21,7 +21,6 @@ package org.apache.qpid.protocol; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; /** @@ -33,6 +32,7 @@ public interface AMQMethodListener /** * Invoked when a method frame has been received * @param evt the event that contains the method and channel + * @param protocolSession the protocol session associated with the event * @return true if the handler has processed the method frame, false otherwise. Note * that this does not prohibit the method event being delivered to subsequent listeners * but can be used to determine if nobody has dealt with an incoming method frame. @@ -40,7 +40,7 @@ public interface AMQMethodListener * to all registered listeners using the error() method (see below) allowing them to * perform cleanup if necessary. */ - boolean methodReceived(AMQMethodEvent evt) throws AMQException; + boolean methodReceived(AMQMethodEvent evt) throws Exception; /** * Callback when an error has occurred. Allows listeners to clean up. diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 65f60e7f59..b57c26e496 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.VersionSpecificRegistry; public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware { - public MethodRegistry getRegistry(); + public VersionSpecificRegistry getRegistry(); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java index c2c0bf29b7..64db953bc2 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.ProtocolVersion; - public interface ProtocolVersionAware { - public ProtocolVersion getProtocolVersion(); + public byte getProtocolMinorVersion(); + public byte getProtocolMajorVersion(); } -- cgit v1.2.1