summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java69
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java55
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java141
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java28
14 files changed, 283 insertions, 110 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index 36287d2923..ebeea8d2b4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer;
public abstract class AMQBody
{
- protected abstract byte getFrameType();
+ public abstract byte getFrameType();
/**
* Get the size of the body
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 552c8e599e..e426651588 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -24,25 +24,29 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import java.util.HashMap;
import java.util.Map;
public class AMQDataBlockDecoder
{
- Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+ private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
- private final Map _supportedBodies = new HashMap();
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
- private final static BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
static
{
- _bodiesSupported[AMQMethodBody.TYPE] = AMQMethodBodyFactory.getInstance();
_bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
_bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
_bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
}
+
+ Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+
+
+
public AMQDataBlockDecoder()
{
}
@@ -55,52 +59,57 @@ public class AMQDataBlockDecoder
{
return false;
}
-
- final byte type = in.get();
- final int channel = in.getUnsignedShort();
+ in.skip(1 + 2);
final long bodySize = in.getUnsignedInt();
- // bodySize can be zero
- if (type <= 0 || channel < 0 || bodySize < 0)
- {
- throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
- " bodySize = " + bodySize);
- }
+
return (remainingAfterAttributes >= bodySize);
}
- private boolean isSupportedFrameType(byte frameType)
+
+ protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
- final boolean result = _bodiesSupported[frameType] != null;
+ final byte type = in.get();
+
+ BodyFactory bodyFactory;
+ if(type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if(bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+
+ }
- if (!result)
+ }
+ else
{
- _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType);
+ bodyFactory = _bodiesSupported[type];
}
- return result;
- }
- protected Object createAndPopulateFrame(ByteBuffer in)
- throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- final byte type = in.get();
- BodyFactory bodyFactory = _bodiesSupported[type];
- if (!isSupportedFrameType(type))
+
+
+ if(bodyFactory == null)
{
throw new AMQFrameDecodingException("Unsupported frame type: " + type);
}
+
final int channel = in.getUnsignedShort();
final long bodySize = in.getUnsignedInt();
- /*
- if (bodyFactory == null)
+ // bodySize can be zero
+ if (channel < 0 || bodySize < 0)
{
- throw new AMQFrameDecodingException("Unsupported body type: " + type);
+ throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
+ " bodySize = " + bodySize);
}
- */
+
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
@@ -115,6 +124,6 @@ public class AMQDataBlockDecoder
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
throws Exception
{
- out.write(createAndPopulateFrame(in));
+ out.write(createAndPopulateFrame(session, in));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
index 3446563d35..478cdeb406 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -28,17 +28,16 @@ import org.apache.mina.filter.codec.demux.MessageEncoder;
import java.util.HashSet;
import java.util.Set;
+import java.util.Collections;
-public class AMQDataBlockEncoder implements MessageEncoder
+public final class AMQDataBlockEncoder implements MessageEncoder
{
- Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
+ private static final Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
- private Set _messageTypes;
+ private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class);
public AMQDataBlockEncoder()
{
- _messageTypes = new HashSet();
- _messageTypes.add(EncodableAMQDataBlock.class);
}
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 9e98d9792b..11f505fd4b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -24,59 +24,52 @@ import org.apache.mina.common.ByteBuffer;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
- public int channel;
+ private final int _channel;
- public AMQBody bodyFrame;
+ private final AMQBody _bodyFrame;
- public AMQFrame()
- {
- }
- public AMQFrame(int channel, AMQBody bodyFrame)
+
+ public AMQFrame(final int channel, final AMQBody bodyFrame)
{
- this.channel = channel;
- this.bodyFrame = bodyFrame;
+ _channel = channel;
+ _bodyFrame = bodyFrame;
}
- public AMQFrame(ByteBuffer in, int channel, long bodySize, BodyFactory bodyFactory) throws AMQFrameDecodingException
+ public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
{
- this.channel = channel;
- this.bodyFrame = bodyFactory.createBody(in,bodySize);
+ this._channel = channel;
+ this._bodyFrame = bodyFactory.createBody(in,bodySize);
}
public long getSize()
{
- return 1 + 2 + 4 + bodyFrame.getSize() + 1;
+ return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
}
public void writePayload(ByteBuffer buffer)
{
- buffer.put(bodyFrame.getFrameType());
- // TODO: how does channel get populated
- EncodingUtils.writeUnsignedShort(buffer, channel);
- EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
- bodyFrame.writePayload(buffer);
+ buffer.put(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, _channel);
+ EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+ _bodyFrame.writePayload(buffer);
buffer.put((byte) 0xCE);
}
- /**
- *
- * @param buffer
- * @param channel unsigned short
- * @param bodySize unsigned integer
- * @param bodyFactory
- * @throws AMQFrameDecodingException
- */
- public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ public final int getChannel()
+ {
+ return _channel;
+ }
+
+ public final AMQBody getBodyFrame()
{
- this.channel = channel;
- bodyFrame = bodyFactory.createBody(buffer, bodySize);
-
+ return _bodyFrame;
}
+
+
public String toString()
{
- return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+ return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index cd178a6197..3fa5b150ab 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -57,7 +57,7 @@ public abstract class AMQMethodBody extends AMQBody
protected abstract void writeMethodPayload(ByteBuffer buffer);
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
index 95b461b6dc..5293c00379 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -22,30 +22,21 @@ package org.apache.qpid.framing;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQMethodBodyFactory implements BodyFactory
{
private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+ private final AMQVersionAwareProtocolSession _protocolSession;
- private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory();
-
- public static AMQMethodBodyFactory getInstance()
- {
- return _instance;
- }
-
- private AMQMethodBodyFactory()
+ public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
{
- _log.debug("Creating method body factory");
+ _protocolSession = protocolSession;
}
public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML
- // segments generated together are now handled by MainRegistry. The Cluster class,
- // if generated together with amqp.xml is a part of MainRegistry.
- // TODO: Connect with version acquired from ProtocolInitiation class.
- return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(),
- (byte)8, (byte)0, in, bodySize);
+ return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
index c0a12a9aad..cfbc9d1828 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
@@ -6,4 +6,5 @@ import org.apache.mina.common.ByteBuffer;
public abstract interface AMQMethodBodyInstanceFactory
{
public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index f536d73469..47d349a675 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -260,24 +260,14 @@ public final class AMQShortString implements CharSequence
final AMQShortString otherString = (AMQShortString) o;
- if(otherString.length() != length())
- {
- return false;
- }
if((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
{
return false;
}
- final int size = length();
- for(int i = 0; i < size; i++)
- {
- if(_data.get(i) != otherString._data.get(i))
- {
- return false;
- }
- }
- return true;
+ return _data.equals(otherString._data);
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index baeecaa17a..c35fc0a6c4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -49,7 +49,7 @@ public class ContentBody extends AMQBody
this.payload = payload;
}
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
@@ -98,9 +98,7 @@ public class ContentBody extends AMQBody
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
- final AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = body;
+ final AMQFrame frame = new AMQFrame(channelId, body);
return frame;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 45280bdae3..02631a5f88 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -65,7 +65,7 @@ public class ContentHeaderBody extends AMQBody
this.bodySize = bodySize;
}
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
@@ -113,17 +113,11 @@ public class ContentHeaderBody extends AMQBody
public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
long bodySize)
{
- final AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize);
- return frame;
+ return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize));
}
public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body)
{
- final AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = body;
- return frame;
+ return new AMQFrame(channelId, body);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index ca03f29047..7246c4a1cf 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -41,7 +41,7 @@ public class HeartbeatBody extends AMQBody
}
}
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
new file mode 100644
index 0000000000..9bc8232d61
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+public class VersionSpecificRegistry
+{
+ private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class);
+
+
+ private final byte _protocolMajorVersion;
+ private final byte _protocolMinorVersion;
+
+ private static final int DEFAULT_MAX_CLASS_ID = 200;
+ private static final int DEFAULT_MAX_METHOD_ID = 50;
+
+ private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+
+ public VersionSpecificRegistry(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID)
+ {
+ try
+ {
+ return _registry[classID][methodID];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+ }
+
+ public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory)
+ {
+ if(_registry.length <= classID)
+ {
+ AMQMethodBodyInstanceFactory[][] oldRegistry = _registry;
+ _registry = new AMQMethodBodyInstanceFactory[classID+1][];
+ System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length);
+ }
+
+ if(_registry[classID] == null)
+ {
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID > DEFAULT_MAX_METHOD_ID ? methodID + 1 : DEFAULT_MAX_METHOD_ID + 1];
+ }
+ else if(_registry[classID].length <= methodID)
+ {
+ AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID];
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID+1];
+ System.arraycopy(oldMethods,0,_registry[classID],0,oldMethods.length);
+ }
+
+ _registry[classID][methodID] = instanceFactory;
+
+ }
+
+
+ public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size)
+ throws AMQFrameDecodingException
+ {
+ AMQMethodBodyInstanceFactory bodyFactory;
+ try
+ {
+ bodyFactory = _registry[classID][methodID];
+ }
+ catch(NullPointerException e)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+ catch(IndexOutOfBoundsException e)
+ {
+ if(classID >= _registry.length)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ else
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ }
+
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+
+
+ return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
new file mode 100644
index 0000000000..a2d3de2f9e
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.VersionSpecificRegistry;
+
+public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware
+{
+ public VersionSpecificRegistry getRegistry();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
new file mode 100644
index 0000000000..64db953bc2
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolVersionAware
+{
+ public byte getProtocolMinorVersion();
+
+ public byte getProtocolMajorVersion();
+}