From 3fb9be28593263e12623ce09084a230b59b81f4f Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 22 Mar 2007 13:14:42 +0000 Subject: made a copy git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/AMQChannelException.java | 4 +- .../org/apache/qpid/AMQConnectionException.java | 3 +- .../java/org/apache/qpid/codec/AMQDecoder.java | 16 ++- .../org/apache/qpid/common/ClientProperties.java | 20 ++- .../main/java/org/apache/qpid/framing/AMQBody.java | 21 ++- .../java/org/apache/qpid/framing/AMQBodyImpl.java | 27 ++++ .../apache/qpid/framing/AMQDataBlockDecoder.java | 2 +- .../org/apache/qpid/framing/AMQMethodBody.java | 133 ++--------------- .../apache/qpid/framing/AMQMethodBodyFactory.java | 2 +- .../org/apache/qpid/framing/AMQMethodBodyImpl.java | 102 +++++++++++++ .../qpid/framing/AMQMethodBodyInstanceFactory.java | 3 +- .../org/apache/qpid/framing/AMQMethodFactory.java | 90 ++++++++++++ .../qpid/framing/BasicContentHeaderProperties.java | 125 +++++++++++++--- .../framing/CommonContentHeaderProperties.java | 65 +++++++++ .../java/org/apache/qpid/framing/ContentBody.java | 2 +- .../apache/qpid/framing/ContentBodyFactory.java | 2 +- .../org/apache/qpid/framing/ContentHeaderBody.java | 4 +- .../qpid/framing/ContentHeaderBodyFactory.java | 2 +- .../framing/ContentHeaderPropertiesFactory.java | 4 +- .../java/org/apache/qpid/framing/FieldTable.java | 68 ++++++++- .../org/apache/qpid/framing/HeartbeatBody.java | 6 +- .../apache/qpid/framing/HeartbeatBodyFactory.java | 2 +- .../java/org/apache/qpid/framing/MainRegistry.java | 35 +++++ .../apache/qpid/framing/MethodConverter_8_0.java | 22 ++- .../apache/qpid/framing/ProtocolInitiation.java | 155 +++++++++++--------- .../qpid/framing/VersionSpecificRegistry.java | 2 +- .../abstraction/MessagePublishInfoConverter.java | 3 +- .../ProtocolVersionMethodConverter.java | 3 +- .../qpid/framing/amqp_8_0/AMQMethodBody_8_0.java | 160 +++++++++++++++++++++ .../framing/amqp_8_0/AMQMethodFactory_8_0.java | 117 +++++++++++++++ .../java/org/apache/qpid/pool/PoolingFilter.java | 1 + .../org/apache/qpid/protocol/AMQMethodEvent.java | 1 + .../apache/qpid/protocol/AMQMethodListener.java | 4 +- .../protocol/AMQVersionAwareProtocolSession.java | 4 +- .../apache/qpid/protocol/ProtocolVersionAware.java | 5 +- .../framing/BasicContentHeaderPropertiesTest.java | 20 ++- 36 files changed, 955 insertions(+), 280 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodFactory_8_0.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index d8c9b287bd..12120bd10d 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -23,6 +23,8 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ChannelCloseBodyImpl; import org.apache.qpid.protocol.AMQConstant; public class AMQChannelException extends AMQException @@ -53,6 +55,6 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); + return new AMQFrame(channel, new ChannelCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),0,0)); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index c4f80191a3..094e26802d 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -24,6 +24,7 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseBodyImpl; import org.apache.qpid.protocol.AMQConstant; public class AMQConnectionException extends AMQException @@ -57,7 +58,7 @@ public class AMQConnectionException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); + return new AMQFrame(channel, new ConnectionCloseBodyImpl(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); } diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index bb981a242f..6e0a5c3786 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -48,13 +48,21 @@ public class AMQDecoder extends CumulativeProtocolDecoder protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - if (_expectProtocolInitiation) + try { - return doDecodePI(session, in, out); + if (_expectProtocolInitiation) + { + return doDecodePI(session, in, out); + } + else + { + return doDecodeDataBlock(session, in, out); + } } - else + catch (Exception e) { - return doDecodeDataBlock(session, in, out); + e.printStackTrace(); + throw e; } } diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 07371b5182..1f1911aa35 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -20,10 +20,28 @@ */ package org.apache.qpid.common; +import org.apache.qpid.framing.AMQShortString; + public enum ClientProperties { + + instance, product, version, - platform + platform; + + + private final AMQShortString _name; + + private ClientProperties() + { + _name = new AMQShortString(toString()); + } + + public AMQShortString getName() + { + return _name; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index ebeea8d2b4..4dd5ab7a9a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,18 +22,15 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public abstract class AMQBody +public interface AMQBody { - public abstract byte getFrameType(); - - /** + byte getFrameType(); + + /** * Get the size of the body * @return unsigned short */ - protected abstract int getSize(); - - protected abstract void writePayload(ByteBuffer buffer); - - protected abstract void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException; + int getSize(); + + void writePayload(ByteBuffer buffer); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java new file mode 100644 index 0000000000..6b2d1feae5 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBodyImpl.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + + +public abstract class AMQBodyImpl implements AMQBody +{ + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 43f888c029..2ecd4d4650 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -72,7 +72,7 @@ public class AMQDataBlockDecoder final byte type = in.get(); BodyFactory bodyFactory; - if(type == AMQMethodBody.TYPE) + if(type == AMQMethodBodyImpl.TYPE) { bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); if(bodyFactory == null) diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 111d9a8f20..d61c1c3d36 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -1,132 +1,25 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQConstant; -public abstract class AMQMethodBody extends AMQBody +/** + * Created by IntelliJ IDEA. + * User: U146758 + * Date: 08-Mar-2007 + * Time: 11:30:28 + * To change this template use File | Settings | File Templates. + */ +public interface AMQMethodBody extends AMQBody { - public static final byte TYPE = 1; - - /** AMQP version */ - protected byte major; - protected byte minor; - - public byte getMajor() - { - return major; - } - - public byte getMinor() - { - return minor; - } - - public AMQMethodBody(byte major, byte minor) - { - this.major = major; - this.minor = minor; - } - - /** unsigned short */ - protected abstract int getBodySize(); - - /** @return unsigned short */ - protected abstract int getClazz(); - - /** @return unsigned short */ - protected abstract int getMethod(); - - protected abstract void writeMethodPayload(ByteBuffer buffer); - - public byte getFrameType() - { - return TYPE; - } - - protected int getSize() - { - return 2 + 2 + getBodySize(); - } - - protected void writePayload(ByteBuffer buffer) - { - EncodingUtils.writeUnsignedShort(buffer, getClazz()); - EncodingUtils.writeUnsignedShort(buffer, getMethod()); - writeMethodPayload(buffer); - } - - protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; - - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException - { - populateMethodBodyFromBuffer(buffer); - } - - public String toString() - { - StringBuffer buf = new StringBuffer(getClass().toString()); - buf.append(" Class: ").append(getClazz()); - buf.append(" Method: ").append(getMethod()); - return buf.toString(); - } - - /** - * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and - * method ids of the body it resulted from). - */ - - /** - * Convenience Method to create a channel not found exception - * - * @param channelId The channel id that is not found - * - * @return new AMQChannelException - */ - public AMQChannelException getChannelNotFoundException(int channelId) - { - return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); - } - - public AMQChannelException getChannelException(AMQConstant code, String message) - { - return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor); - } + AMQChannelException getChannelNotFoundException(int channelId); - public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) - { - return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); - } + AMQChannelException getChannelException(AMQConstant code, String message); - public AMQConnectionException getConnectionException(AMQConstant code, String message) - { - return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); - } + AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause); - public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) - { - return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); - } + AMQConnectionException getConnectionException(AMQConstant code, String message); + AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 5293c00379..f5cd971c0e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -37,6 +37,6 @@ public class AMQMethodBodyFactory implements BodyFactory public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); + return _protocolSession.getRegistry().convertToBody(in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java new file mode 100644 index 0000000000..1951970a72 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; + +public abstract class AMQMethodBodyImpl extends AMQBodyImpl implements AMQMethodBody +{ + public static final byte TYPE = 1; + + + public abstract byte getMajor(); + public abstract byte getMinor(); + + /** unsigned short */ + protected abstract int getBodySize(); + + /** @return unsigned short */ + protected abstract int getClazz(); + + /** @return unsigned short */ + protected abstract int getMethod(); + + protected abstract void writeMethodPayload(ByteBuffer buffer); + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return 2 + 2 + getBodySize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and + * method ids of the body it resulted from). + */ + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException + */ + public AMQChannelException getChannelNotFoundException(int channelId) + { + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); + } + + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor()); + } + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor()); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index cfbc9d1828..9a7868f3cd 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -5,6 +5,5 @@ import org.apache.mina.common.ByteBuffer; public abstract interface AMQMethodBodyInstanceFactory { - public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException; - public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java new file mode 100644 index 0000000000..4ffc9e0066 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + + +public interface AMQMethodFactory +{ + + // Connection Methods + + ConnectionCloseBody createConnectionClose(); + + // Access Methods + + AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write); + + + // Tx Methods + + TxSelectBody createTxSelect(); + + TxCommitBody createTxCommit(); + + TxRollbackBody createTxRollback(); + + // Channel Methods + + ChannelOpenBody createChannelOpen(); + + ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText); + + ChannelFlowBody createChannelFlow(boolean active); + + + // Exchange Methods + + + ExchangeBoundBody createExchangeBound(AMQShortString exchangeName, + AMQShortString queueName, + AMQShortString routingKey); + + ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket); + + + // Queue Methods + + QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket); + + QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket); + + QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket); + + + // Message Methods + + // In different versions of the protocol we change the class used for message transfer + // abstract this out so the appropriate methods are created + AMQMethodBody createRecover(boolean requeue); + + AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket); + + AMQMethodBody createConsumerCancel(AMQShortString consumerTag); + + AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple); + + AMQMethodBody createRejectBody(long deliveryTag, boolean requeue); + + AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 1045b02868..8b784fa3f7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; -public class BasicContentHeaderProperties implements ContentHeaderProperties +public class BasicContentHeaderProperties implements CommonContentHeaderProperties { private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class); @@ -421,14 +421,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties } } - public AMQShortString getContentTypeShortString() + public AMQShortString getContentType() { decodeContentTypeIfNecessary(); return _contentType; } - public String getContentType() + public String getContentTypeAsString() { decodeContentTypeIfNecessary(); return _contentType == null ? null : _contentType.toString(); @@ -444,15 +444,19 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties public void setContentType(String contentType) { - clearEncodedForm(); - _propertyFlags |= (1 << 15); - _contentType = contentType == null ? null : new AMQShortString(contentType); + setContentType(contentType == null ? null : new AMQShortString(contentType)); + } + + public String getEncodingAsString() + { + + return getEncoding() == null ? null : getEncoding().toString(); } - public String getEncoding() + public AMQShortString getEncoding() { decodeIfNecessary(); - return _encoding == null ? null : _encoding.toString(); + return _encoding; } public void setEncoding(String encoding) @@ -462,6 +466,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _encoding = encoding == null ? null : new AMQShortString(encoding); } + public void setEncoding(AMQShortString encoding) + { + clearEncodedForm(); + _propertyFlags |= (1 << 14); + _encoding = encoding; + } + + public FieldTable getHeaders() { decodeHeadersIfNecessary(); @@ -508,26 +520,37 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _priority = priority; } - public String getCorrelationId() + public AMQShortString getCorrelationId() + { + decodeIfNecessary(); + return _correlationId; + } + + public String getCorrelationIdAsString() { decodeIfNecessary(); return _correlationId == null ? null : _correlationId.toString(); } public void setCorrelationId(String correlationId) + { + setCorrelationId(correlationId == null ? null : new AMQShortString(correlationId)); + } + + public void setCorrelationId(AMQShortString correlationId) { clearEncodedForm(); _propertyFlags |= (1 << 10); - _correlationId = correlationId == null ? null : new AMQShortString(correlationId); + _correlationId = correlationId; } - public String getReplyTo() + public String getReplyToAsString() { decodeIfNecessary(); return _replyTo == null ? null : _replyTo.toString(); } - public AMQShortString getReplyToAsShortString() + public AMQShortString getReplyTo() { decodeIfNecessary(); return _replyTo; @@ -561,7 +584,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties } - public String getMessageId() + public AMQShortString getMessageId() + { + decodeIfNecessary(); + return _messageId; + } + + public String getMessageIdAsString() { decodeIfNecessary(); return _messageId == null ? null : _messageId.toString(); @@ -574,6 +603,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _messageId = messageId == null ? null : new AMQShortString(messageId); } + public void setMessageId(AMQShortString messageId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 7); + _messageId = messageId; + } + + public long getTimestamp() { decodeIfNecessary(); @@ -587,56 +624,102 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _timestamp = timestamp; } - public String getType() + public String getTypeAsString() { decodeIfNecessary(); return _type == null ? null : _type.toString(); } + + public AMQShortString getType() + { + decodeIfNecessary(); + return _type; + } + + public void setType(String type) + { + setType(type == null ? null : new AMQShortString(type)); + } + + public void setType(AMQShortString type) { clearEncodedForm(); _propertyFlags |= (1 << 5); - _type = type == null ? null : new AMQShortString(type); + _type = type; } - public String getUserId() + public String getUserIdAsString() { decodeIfNecessary(); return _userId == null ? null : _userId.toString(); } + public AMQShortString getUserId() + { + decodeIfNecessary(); + return _userId; + } + public void setUserId(String userId) + { + setUserId(userId == null ? null : new AMQShortString(userId)); + } + + public void setUserId(AMQShortString userId) { clearEncodedForm(); _propertyFlags |= (1 << 4); - _userId = userId == null ? null : new AMQShortString(userId); + _userId = userId; } - public String getAppId() + public String getAppIdAsString() { decodeIfNecessary(); return _appId == null ? null : _appId.toString(); } + public AMQShortString getAppId() + { + decodeIfNecessary(); + return _appId; + } + public void setAppId(String appId) + { + setAppId(appId == null ? null : new AMQShortString(appId)); + } + + public void setAppId(AMQShortString appId) { clearEncodedForm(); _propertyFlags |= (1 << 3); - _appId = appId == null ? null : new AMQShortString(appId); + _appId = appId; } - public String getClusterId() + public String getClusterIdAsString() { decodeIfNecessary(); return _clusterId == null ? null : _clusterId.toString(); } + public AMQShortString getClusterId() + { + decodeIfNecessary(); + return _clusterId; + } + public void setClusterId(String clusterId) + { + setClusterId(clusterId == null ? null : new AMQShortString(clusterId)); + } + + public void setClusterId(AMQShortString clusterId) { clearEncodedForm(); _propertyFlags |= (1 << 2); - _clusterId = clusterId == null ? null : new AMQShortString(clusterId); + _clusterId = clusterId; } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java new file mode 100644 index 0000000000..1641cbf4e8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java @@ -0,0 +1,65 @@ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.log4j.Logger; + +public interface CommonContentHeaderProperties extends ContentHeaderProperties +{ + + AMQShortString getContentType(); + + void setContentType(AMQShortString contentType); + + FieldTable getHeaders(); + + void setHeaders(FieldTable headers); + + byte getDeliveryMode(); + + void setDeliveryMode(byte deliveryMode); + + byte getPriority(); + + void setPriority(byte priority); + + AMQShortString getCorrelationId(); + + void setCorrelationId(AMQShortString correlationId); + + AMQShortString getReplyTo(); + + void setReplyTo(AMQShortString replyTo); + + long getExpiration(); + + void setExpiration(long expiration); + + AMQShortString getMessageId(); + + void setMessageId(AMQShortString messageId); + + long getTimestamp(); + + void setTimestamp(long timestamp); + + AMQShortString getType(); + + void setType(AMQShortString type); + + AMQShortString getUserId(); + + void setUserId(AMQShortString userId); + + AMQShortString getAppId(); + + void setAppId(AMQShortString appId); + + AMQShortString getClusterId(); + + void setClusterId(AMQShortString clusterId); + + AMQShortString getEncoding(); + + void setEncoding(AMQShortString encoding); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index be38695384..a1aaab06c6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentBody extends AMQBody +public class ContentBody extends AMQBodyImpl { public static final byte TYPE = 3; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index 5636229d53..7b6a92e691 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -39,7 +39,7 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new ContentBody(in, bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 02631a5f88..c71f47bad2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class ContentHeaderBody extends AMQBody +public class ContentHeaderBody extends AMQBodyImpl { public static final byte TYPE = 2; @@ -110,7 +110,7 @@ public class ContentHeaderBody extends AMQBody properties.writePropertyListPayload(buffer); } - public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, CommonContentHeaderProperties properties, long bodySize) { return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize)); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index 818fc9cf0c..9570ec800d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -39,7 +39,7 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 7dac018872..a8a8097fd2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; + import org.apache.mina.common.ByteBuffer; public class ContentHeaderPropertiesFactory @@ -43,7 +45,7 @@ public class ContentHeaderPropertiesFactory // AMQP version change: "Hardwired" version to major=8, minor=0 // TODO: Change so that the actual version is obtained from // the ProtocolInitiation object for this session. - if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0)) + if (classId == BasicConsumeBodyImpl.CLASS_ID) { properties = new BasicContentHeaderProperties(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 246e5ebc90..a7544c5747 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -41,10 +41,14 @@ public class FieldTable private LinkedHashMap _properties; private long _encodedSize; private static final int INITIAL_HASHMAP_CAPACITY = 16; + private static final int INITIAL_ENCODED_FORM_SIZE = 256; public FieldTable() { super(); +// _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE); +// _encodedForm.setAutoExpand(true); +// _encodedForm.limit(0); } /** @@ -109,11 +113,28 @@ public class FieldTable private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) { initMapIfNecessary(); - _encodedForm = null; - if(val == null) + if(_properties.containsKey(key)) + { + _encodedForm = null; + + if(val == null) + { + return removeKey(key); + } + } + else if(_encodedForm != null && val != null) + { + EncodingUtils.writeShortStringBytes(_encodedForm, key); + val.writeToBuffer(_encodedForm); + + } + else if (val == null) { - return removeKey(key); + return null; } + + + AMQTypedValue oldVal = _properties.put(key,val); if(oldVal != null) { @@ -134,7 +155,7 @@ public class FieldTable { if(_properties == null) { - if(_encodedForm == null) + if(_encodedForm == null || _encodedSize == 0) { _properties = new LinkedHashMap(); } @@ -655,6 +676,7 @@ public class FieldTable if (trace) { _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); + _logger.trace(_properties); } EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize()); @@ -701,6 +723,7 @@ public class FieldTable public void addAll(FieldTable fieldTable) { initMapIfNecessary(); + _encodedForm = null; _properties.putAll(fieldTable._properties); recalculateEncodedSize(); } @@ -836,7 +859,13 @@ public class FieldTable if(_encodedForm != null) { - buffer.put(_encodedForm); + + if(_encodedForm.position() != 0) + { + _encodedForm.flip(); + } +// _encodedForm.limit((int)getEncodedSize()); + buffer.put(_encodedForm); } else if(_properties != null) { @@ -924,4 +953,33 @@ public class FieldTable } } + public int hashCode() + { + initMapIfNecessary(); + return _properties.hashCode(); + } + + + public boolean equals(Object o) + { + if(o == this) + { + return true; + } + if(o == null) + { + return false; + } + if(!(o instanceof FieldTable)) + { + return false; + } + + initMapIfNecessary(); + + FieldTable f = (FieldTable) o; + f.initMapIfNecessary(); + + return _properties.equals(f._properties); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7246c4a1cf..17b2a2f9c2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -22,7 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -public class HeartbeatBody extends AMQBody +public class HeartbeatBody extends AMQBodyImpl { public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); @@ -46,12 +46,12 @@ public class HeartbeatBody extends AMQBody return TYPE; } - protected int getSize() + public int getSize() { return 0;//heartbeats we generate have no payload } - protected void writePayload(ByteBuffer buffer) + public void writePayload(ByteBuffer buffer) { } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index c7ada708dc..2249f1d1cf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBodyImpl createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java new file mode 100644 index 0000000000..d75589f914 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/MainRegistry.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public class MainRegistry +{ + + public static VersionSpecificRegistry getVersionSpecificRegistry(byte versionMajor, byte versionMinor) + { + return null; //To change body of created methods use File | Settings | File Templates. + } + + public static VersionSpecificRegistry getVersionSpecificRegistry(ProtocolVersion pv) + { + return null; //To change body of created methods use File | Settings | File Templates. + } +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java index dd93cc97fa..f253372a65 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java @@ -4,6 +4,7 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; import org.apache.mina.common.ByteBuffer; @@ -19,7 +20,7 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQBody convertToBody(ContentChunk contentChunk) + public AMQBodyImpl convertToBody(ContentChunk contentChunk) { return new ContentBody(contentChunk.getData()); } @@ -52,8 +53,8 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot public void configure() { - _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion()); - _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion()); + _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; } @@ -87,18 +88,15 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQMethodBody convertToBody(MessagePublishInfo info) + public AMQMethodBodyImpl convertToBody(MessagePublishInfo info) { - return new BasicPublishBody(getProtocolMajorVersion(), - getProtocolMinorVersion(), - _basicPublishClassId, - _basicPublishMethodId, - info.getExchange(), - info.isImmediate(), + return new BasicPublishBodyImpl(0, // ticket + info.getExchange(), + info.getRoutingKey(), info.isMandatory(), - info.getRoutingKey(), - 0) ; // ticket + info.isImmediate() + ) ; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 697a0f4249..8b40fe72eb 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -25,25 +25,50 @@ import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; +import java.io.UnsupportedEncodingException; + public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { - public char[] header = new char[]{'A','M','Q','P'}; + // TODO: generate these constants automatically from the xml protocol spec file + public static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'}; - private static byte CURRENT_PROTOCOL_CLASS = 1; - private static final int CURRENT_PROTOCOL_INSTANCE = 1; + private static final byte CURRENT_PROTOCOL_CLASS = 1; + private static final byte TCP_PROTOCOL_INSTANCE = 1; + + public final byte[] _protocolHeader; + public final byte _protocolClass; + public final byte _protocolInstance; + public final byte _protocolMajor; + public final byte _protocolMinor; - public byte protocolClass = CURRENT_PROTOCOL_CLASS; - public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE; - public byte protocolMajor; - public byte protocolMinor; // public ProtocolInitiation() {} - public ProtocolInitiation(byte major, byte minor) + public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor) + { + _protocolHeader = protocolHeader; + _protocolClass = protocolClass; + _protocolInstance = protocolInstance; + _protocolMajor = protocolMajor; + _protocolMinor = protocolMinor; + } + + public ProtocolInitiation(ProtocolVersion pv) { - protocolMajor = major; - protocolMinor = minor; + this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); + } + + + public ProtocolInitiation(ByteBuffer in) + { + _protocolHeader = new byte[4]; + in.get(_protocolHeader); + + _protocolClass = in.get(); + _protocolInstance = in.get(); + _protocolMajor = in.get(); + _protocolMinor = in.get(); } public long getSize() @@ -53,19 +78,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData public void writePayload(ByteBuffer buffer) { - for (int i = 0; i < header.length; i++) - { - buffer.put((byte) header[i]); - } - buffer.put(protocolClass); - buffer.put(protocolInstance); - buffer.put(protocolMajor); - buffer.put(protocolMinor); - } - public void populateFromBuffer(ByteBuffer buffer) throws AMQException - { - throw new AMQException("Method not implemented"); + buffer.put(_protocolHeader); + buffer.put(_protocolClass); + buffer.put(_protocolInstance); + buffer.put(_protocolMajor); + buffer.put(_protocolMinor); } public boolean equals(Object o) @@ -76,36 +94,36 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData } ProtocolInitiation pi = (ProtocolInitiation) o; - if (pi.header == null) + if (pi._protocolHeader == null) { return false; } - if (header.length != pi.header.length) + if (_protocolHeader.length != pi._protocolHeader.length) { return false; } - for (int i = 0; i < header.length; i++) + for (int i = 0; i < _protocolHeader.length; i++) { - if (header[i] != pi.header[i]) + if (_protocolHeader[i] != pi._protocolHeader[i]) { return false; } } - return (protocolClass == pi.protocolClass && - protocolInstance == pi.protocolInstance && - protocolMajor == pi.protocolMajor && - protocolMinor == pi.protocolMinor); + return (_protocolClass == pi._protocolClass && + _protocolInstance == pi._protocolInstance && + _protocolMajor == pi._protocolMajor && + _protocolMinor == pi._protocolMinor); } public static class Decoder //implements MessageDecoder { /** * - * @param session - * @param in + * @param session the session + * @param in input buffer * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ @@ -115,63 +133,62 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData } public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - throws Exception { - byte[] theHeader = new byte[4]; - in.get(theHeader); - ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0); - pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]}; - String stringHeader = new String(pi.header); - if (!"AMQP".equals(stringHeader)) - { - throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader); - } - pi.protocolClass = in.get(); - pi.protocolInstance = in.get(); - pi.protocolMajor = in.get(); - pi.protocolMinor = in.get(); + ProtocolInitiation pi = new ProtocolInitiation(in); out.write(pi); } } - public void checkVersion(ProtocolVersionList pvl) throws AMQException + public void checkVersion() throws AMQException { - if (protocolClass != CURRENT_PROTOCOL_CLASS) - { - throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + - protocolClass); - } - if (protocolInstance != CURRENT_PROTOCOL_INSTANCE) + + if(_protocolHeader.length != 4) { - throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " + - protocolInstance); + throw new AMQProtocolHeaderException("Protocol header should have exactly four octets"); } - - /* Look through list of available protocol versions */ - boolean found = false; - for (int i=0; i boolean methodReceived(AMQMethodEvent evt) throws Exception; + boolean methodReceived(AMQMethodEvent evt) throws AMQException; /** * Callback when an error has occurred. Allows listeners to clean up. diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index b57c26e496..65f60e7f59 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.MethodRegistry; public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware { - public VersionSpecificRegistry getRegistry(); + public MethodRegistry getRegistry(); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java index 64db953bc2..c2c0bf29b7 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -20,9 +20,10 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.framing.ProtocolVersion; + public interface ProtocolVersionAware { - public byte getProtocolMinorVersion(); + public ProtocolVersion getProtocolVersion(); - public byte getProtocolMajorVersion(); } diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index 0f706ac553..4fd1f60d69 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -22,8 +22,6 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -import java.util.HashMap; - import junit.framework.TestCase; @@ -94,14 +92,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String contentType = "contentType"; _testProperties.setContentType(contentType); - assertEquals(contentType, _testProperties.getContentType()); + assertEquals(contentType, _testProperties.getContentTypeAsString()); } public void testSetGetEncoding() { String encoding = "encoding"; _testProperties.setEncoding(encoding); - assertEquals(encoding, _testProperties.getEncoding()); + assertEquals(encoding, _testProperties.getEncodingAsString()); } public void testSetGetHeaders() @@ -128,14 +126,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String correlationId = "correlationId"; _testProperties.setCorrelationId(correlationId); - assertEquals(correlationId, _testProperties.getCorrelationId()); + assertEquals(correlationId, _testProperties.getCorrelationIdAsString()); } public void testSetGetReplyTo() { String replyTo = "replyTo"; _testProperties.setReplyTo(replyTo); - assertEquals(replyTo, _testProperties.getReplyTo()); + assertEquals(replyTo, _testProperties.getReplyToAsString()); } public void testSetGetExpiration() @@ -149,7 +147,7 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String messageId = "messageId"; _testProperties.setMessageId(messageId); - assertEquals(messageId, _testProperties.getMessageId()); + assertEquals(messageId, _testProperties.getMessageIdAsString()); } public void testSetGetTimestamp() @@ -163,28 +161,28 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String type = "type"; _testProperties.setType(type); - assertEquals(type, _testProperties.getType()); + assertEquals(type, _testProperties.getTypeAsString()); } public void testSetGetUserId() { String userId = "userId"; _testProperties.setUserId(userId); - assertEquals(userId, _testProperties.getUserId()); + assertEquals(userId, _testProperties.getUserIdAsString()); } public void testSetGetAppId() { String appId = "appId"; _testProperties.setAppId(appId); - assertEquals(appId, _testProperties.getAppId()); + assertEquals(appId, _testProperties.getAppIdAsString()); } public void testSetGetClusterId() { String clusterId = "clusterId"; _testProperties.setClusterId(clusterId); - assertEquals(clusterId, _testProperties.getClusterId()); + assertEquals(clusterId, _testProperties.getClusterIdAsString()); } } -- cgit v1.2.1