summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-02-21 10:09:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-02-21 10:09:03 +0000
commit3047c0ec2d581f4b51c77fec84fbf0bec8599573 (patch)
tree7ba966b95105a3576cf2fc9150b6b9dd322f4b14 /java/common
parent3aed99f65d795c234faa9b584182cf3ea8c67b4a (diff)
downloadqpid-python-3047c0ec2d581f4b51c77fec84fbf0bec8599573.tar.gz
QPID-790 : Performance Improvements
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629731 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java528
-rw-r--r--java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java250
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java440
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java97
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java58
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java356
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/Job.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java107
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java12
18 files changed, 1816 insertions, 151 deletions
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
new file mode 100644
index 0000000000..bed80d5954
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
@@ -0,0 +1,528 @@
+package org.apache.mina.common;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.nio.*;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
+{
+
+
+ private static final int MINIMUM_CAPACITY = 1;
+
+ public FixedSizeByteBufferAllocator ()
+ {
+ }
+
+ public ByteBuffer allocate( int capacity, boolean direct )
+ {
+ java.nio.ByteBuffer nioBuffer;
+ if( direct )
+ {
+ nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity );
+ }
+ else
+ {
+ nioBuffer = java.nio.ByteBuffer.allocate( capacity );
+ }
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer )
+ {
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public void dispose()
+ {
+ }
+
+
+
+ private static final class FixedSizeByteBuffer extends ByteBuffer
+ {
+ private java.nio.ByteBuffer buf;
+ private int refCount = 1;
+ private int mark = -1;
+
+
+ protected FixedSizeByteBuffer( java.nio.ByteBuffer buf )
+ {
+ this.buf = buf;
+ buf.order( ByteOrder.BIG_ENDIAN );
+ refCount = 1;
+ }
+
+ public synchronized void acquire()
+ {
+ if( refCount <= 0 )
+ {
+ throw new IllegalStateException( "Already released buffer." );
+ }
+
+ refCount ++;
+ }
+
+ public void release()
+ {
+ synchronized( this )
+ {
+ if( refCount <= 0 )
+ {
+ refCount = 0;
+ throw new IllegalStateException(
+ "Already released buffer. You released the buffer too many times." );
+ }
+
+ refCount --;
+ if( refCount > 0)
+ {
+ return;
+ }
+ }
+ }
+
+ public java.nio.ByteBuffer buf()
+ {
+ return buf;
+ }
+
+ public boolean isPooled()
+ {
+ return false;
+ }
+
+ public void setPooled( boolean pooled )
+ {
+ }
+
+ public ByteBuffer duplicate() {
+ return new FixedSizeByteBuffer( this.buf.duplicate() );
+ }
+
+ public ByteBuffer slice() {
+ return new FixedSizeByteBuffer( this.buf.slice() );
+ }
+
+ public ByteBuffer asReadOnlyBuffer() {
+ return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() );
+ }
+
+ public byte[] array()
+ {
+ return buf.array();
+ }
+
+ public int arrayOffset()
+ {
+ return buf.arrayOffset();
+ }
+
+ public boolean isDirect()
+ {
+ return buf.isDirect();
+ }
+
+ public boolean isReadOnly()
+ {
+ return buf.isReadOnly();
+ }
+
+ public int capacity()
+ {
+ return buf.capacity();
+ }
+
+ public ByteBuffer capacity( int newCapacity )
+ {
+ if( newCapacity > capacity() )
+ {
+ // Allocate a new buffer and transfer all settings to it.
+ int pos = position();
+ int limit = limit();
+ ByteOrder bo = order();
+
+ capacity0( newCapacity );
+ buf.limit( limit );
+ if( mark >= 0 )
+ {
+ buf.position( mark );
+ buf.mark();
+ }
+ buf.position( pos );
+ buf.order( bo );
+ }
+
+ return this;
+ }
+
+ protected void capacity0( int requestedCapacity )
+ {
+ int newCapacity = MINIMUM_CAPACITY;
+ while( newCapacity < requestedCapacity )
+ {
+ newCapacity <<= 1;
+ }
+
+ java.nio.ByteBuffer oldBuf = this.buf;
+ java.nio.ByteBuffer newBuf;
+ if( isDirect() )
+ {
+ newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
+ }
+ else
+ {
+ newBuf = java.nio.ByteBuffer.allocate( newCapacity );
+ }
+
+ newBuf.clear();
+ oldBuf.clear();
+ newBuf.put( oldBuf );
+ this.buf = newBuf;
+ }
+
+
+
+ public boolean isAutoExpand()
+ {
+ return false;
+ }
+
+ public ByteBuffer setAutoExpand( boolean autoExpand )
+ {
+ if(autoExpand) throw new IllegalArgumentException();
+ else return this;
+ }
+
+ public ByteBuffer expand( int pos, int expectedRemaining )
+ {
+ int end = pos + expectedRemaining;
+ if( end > capacity() )
+ {
+ // The buffer needs expansion.
+ capacity( end );
+ }
+
+ if( end > limit() )
+ {
+ // We call limit() directly to prevent StackOverflowError
+ buf.limit( end );
+ }
+ return this;
+ }
+
+ public int position()
+ {
+ return buf.position();
+ }
+
+ public ByteBuffer position( int newPosition )
+ {
+
+ buf.position( newPosition );
+ if( mark > newPosition )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public int limit()
+ {
+ return buf.limit();
+ }
+
+ public ByteBuffer limit( int newLimit )
+ {
+ buf.limit( newLimit );
+ if( mark > newLimit )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public ByteBuffer mark()
+ {
+ buf.mark();
+ mark = position();
+ return this;
+ }
+
+ public int markValue()
+ {
+ return mark;
+ }
+
+ public ByteBuffer reset()
+ {
+ buf.reset();
+ return this;
+ }
+
+ public ByteBuffer clear()
+ {
+ buf.clear();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer flip()
+ {
+ buf.flip();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer rewind()
+ {
+ buf.rewind();
+ mark = -1;
+ return this;
+ }
+
+ public byte get()
+ {
+ return buf.get();
+ }
+
+ public ByteBuffer put( byte b )
+ {
+ buf.put( b );
+ return this;
+ }
+
+ public byte get( int index )
+ {
+ return buf.get( index );
+ }
+
+ public ByteBuffer put( int index, byte b )
+ {
+ buf.put( index, b );
+ return this;
+ }
+
+ public ByteBuffer get( byte[] dst, int offset, int length )
+ {
+ buf.get( dst, offset, length );
+ return this;
+ }
+
+ public ByteBuffer put( java.nio.ByteBuffer src )
+ {
+ buf.put( src );
+ return this;
+ }
+
+ public ByteBuffer put( byte[] src, int offset, int length )
+ {
+ buf.put( src, offset, length );
+ return this;
+ }
+
+ public ByteBuffer compact()
+ {
+ buf.compact();
+ mark = -1;
+ return this;
+ }
+
+ public ByteOrder order()
+ {
+ return buf.order();
+ }
+
+ public ByteBuffer order( ByteOrder bo )
+ {
+ buf.order( bo );
+ return this;
+ }
+
+ public char getChar()
+ {
+ return buf.getChar();
+ }
+
+ public ByteBuffer putChar( char value )
+ {
+ buf.putChar( value );
+ return this;
+ }
+
+ public char getChar( int index )
+ {
+ return buf.getChar( index );
+ }
+
+ public ByteBuffer putChar( int index, char value )
+ {
+ buf.putChar( index, value );
+ return this;
+ }
+
+ public CharBuffer asCharBuffer()
+ {
+ return buf.asCharBuffer();
+ }
+
+ public short getShort()
+ {
+ return buf.getShort();
+ }
+
+ public ByteBuffer putShort( short value )
+ {
+ buf.putShort( value );
+ return this;
+ }
+
+ public short getShort( int index )
+ {
+ return buf.getShort( index );
+ }
+
+ public ByteBuffer putShort( int index, short value )
+ {
+ buf.putShort( index, value );
+ return this;
+ }
+
+ public ShortBuffer asShortBuffer()
+ {
+ return buf.asShortBuffer();
+ }
+
+ public int getInt()
+ {
+ return buf.getInt();
+ }
+
+ public ByteBuffer putInt( int value )
+ {
+ buf.putInt( value );
+ return this;
+ }
+
+ public int getInt( int index )
+ {
+ return buf.getInt( index );
+ }
+
+ public ByteBuffer putInt( int index, int value )
+ {
+ buf.putInt( index, value );
+ return this;
+ }
+
+ public IntBuffer asIntBuffer()
+ {
+ return buf.asIntBuffer();
+ }
+
+ public long getLong()
+ {
+ return buf.getLong();
+ }
+
+ public ByteBuffer putLong( long value )
+ {
+ buf.putLong( value );
+ return this;
+ }
+
+ public long getLong( int index )
+ {
+ return buf.getLong( index );
+ }
+
+ public ByteBuffer putLong( int index, long value )
+ {
+ buf.putLong( index, value );
+ return this;
+ }
+
+ public LongBuffer asLongBuffer()
+ {
+ return buf.asLongBuffer();
+ }
+
+ public float getFloat()
+ {
+ return buf.getFloat();
+ }
+
+ public ByteBuffer putFloat( float value )
+ {
+ buf.putFloat( value );
+ return this;
+ }
+
+ public float getFloat( int index )
+ {
+ return buf.getFloat( index );
+ }
+
+ public ByteBuffer putFloat( int index, float value )
+ {
+ buf.putFloat( index, value );
+ return this;
+ }
+
+ public FloatBuffer asFloatBuffer()
+ {
+ return buf.asFloatBuffer();
+ }
+
+ public double getDouble()
+ {
+ return buf.getDouble();
+ }
+
+ public ByteBuffer putDouble( double value )
+ {
+ buf.putDouble( value );
+ return this;
+ }
+
+ public double getDouble( int index )
+ {
+ return buf.getDouble( index );
+ }
+
+ public ByteBuffer putDouble( int index, double value )
+ {
+ buf.putDouble( index, value );
+ return this;
+ }
+
+ public DoubleBuffer asDoubleBuffer()
+ {
+ return buf.asDoubleBuffer();
+ }
+
+
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
new file mode 100644
index 0000000000..09cf785108
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
@@ -0,0 +1,250 @@
+package org.apache.mina.common.support;
+
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFutureListener;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * A default implementation of {@link org.apache.mina.common.IoFuture}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (화, 05 9월 2006) $
+ */
+public class DefaultIoFuture implements IoFuture
+{
+ private final IoSession session;
+ private final Object lock;
+ private List listeners;
+ private Object result;
+ private boolean ready;
+
+
+ /**
+ * Creates a new instance.
+ *
+ * @param session an {@link IoSession} which is associated with this future
+ */
+ public DefaultIoFuture( IoSession session )
+ {
+ this.session = session;
+ this.lock = this;
+ }
+
+ /**
+ * Creates a new instance which uses the specified object as a lock.
+ */
+ public DefaultIoFuture( IoSession session, Object lock )
+ {
+ if( lock == null )
+ {
+ throw new NullPointerException( "lock" );
+ }
+ this.session = session;
+ this.lock = lock;
+ }
+
+ public IoSession getSession()
+ {
+ return session;
+ }
+
+ public Object getLock()
+ {
+ return lock;
+ }
+
+ public void join()
+ {
+ synchronized( lock )
+ {
+ while( !ready )
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+ }
+
+ public boolean join( long timeoutInMillis )
+ {
+ long startTime = ( timeoutInMillis <= 0 ) ? 0 : System
+ .currentTimeMillis();
+ long waitTime = timeoutInMillis;
+
+ synchronized( lock )
+ {
+ if( ready )
+ {
+ return ready;
+ }
+ else if( waitTime <= 0 )
+ {
+ return ready;
+ }
+
+ for( ;; )
+ {
+ try
+ {
+ lock.wait( waitTime );
+ }
+ catch( InterruptedException e )
+ {
+ }
+
+ if( ready )
+ return true;
+ else
+ {
+ waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime );
+ if( waitTime <= 0 )
+ {
+ return ready;
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isReady()
+ {
+ synchronized( lock )
+ {
+ return ready;
+ }
+ }
+
+ /**
+ * Sets the result of the asynchronous operation, and mark it as finished.
+ */
+ protected void setValue( Object newValue )
+ {
+ synchronized( lock )
+ {
+ // Allow only once.
+ if( ready )
+ {
+ return;
+ }
+
+ result = newValue;
+ ready = true;
+ lock.notifyAll();
+
+ notifyListeners();
+ }
+ }
+
+ /**
+ * Returns the result of the asynchronous operation.
+ */
+ protected Object getValue()
+ {
+ synchronized( lock )
+ {
+ return result;
+ }
+ }
+
+ public void addListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ if(listeners == null)
+ {
+ listeners = new ArrayList();
+ }
+ listeners.add( listener );
+ if( ready )
+ {
+ listener.operationComplete( this );
+ }
+ }
+ }
+
+ public void removeListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ listeners.remove( listener );
+ }
+ }
+
+ private void notifyListeners()
+ {
+ synchronized( lock )
+ {
+
+ if(listeners != null)
+ {
+
+ for( Iterator i = listeners.iterator(); i.hasNext(); ) {
+ ( ( IoFutureListener ) i.next() ).operationComplete( this );
+ }
+ }
+ }
+ }
+}
+
+
+
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
new file mode 100644
index 0000000000..b8c6f29720
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
@@ -0,0 +1,440 @@
+package org.apache.mina.filter.codec;
+
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultWriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.util.SessionLog;
+import org.apache.mina.util.Queue;
+
+
+public class QpidProtocolCodecFilter extends IoFilterAdapter
+{
+ public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";
+ public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";
+
+ private static final Class[] EMPTY_PARAMS = new Class[0];
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
+
+ private final ProtocolCodecFactory factory;
+
+ public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
+ {
+ if( factory == null )
+ {
+ throw new NullPointerException( "factory" );
+ }
+ this.factory = factory;
+ }
+
+ public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+ {
+ if( encoder == null )
+ {
+ throw new NullPointerException( "encoder" );
+ }
+ if( decoder == null )
+ {
+ throw new NullPointerException( "decoder" );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder()
+ {
+ return encoder;
+ }
+
+ public ProtocolDecoder getDecoder()
+ {
+ return decoder;
+ }
+ };
+ }
+
+ public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+ {
+ if( encoderClass == null )
+ {
+ throw new NullPointerException( "encoderClass" );
+ }
+ if( decoderClass == null )
+ {
+ throw new NullPointerException( "decoderClass" );
+ }
+ if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
+ {
+ throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+ }
+ if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
+ {
+ throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+ }
+ try
+ {
+ encoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+ }
+ try
+ {
+ decoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder() throws Exception
+ {
+ return ( ProtocolEncoder ) encoderClass.newInstance();
+ }
+
+ public ProtocolDecoder getDecoder() throws Exception
+ {
+ return ( ProtocolDecoder ) decoderClass.newInstance();
+ }
+ };
+ }
+
+ public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+ {
+ if( parent.contains( ProtocolCodecFilter.class ) )
+ {
+ throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
+ }
+ }
+
+ public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( !( message instanceof ByteBuffer ) )
+ {
+ nextFilter.messageReceived( session, message );
+ return;
+ }
+
+ ByteBuffer in = ( ByteBuffer ) message;
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+
+ try
+ {
+ decoder.decode( session, in, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ pde.setHexdump( in.getHexDump() );
+ throw pde;
+ }
+ finally
+ {
+ // Dispose the decoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeDecoder( session );
+ }
+
+ // Release the read buffer.
+ in.release();
+
+ decoderOut.flush();
+ }
+ }
+
+ public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( message instanceof HiddenByteBuffer )
+ {
+ return;
+ }
+
+ if( !( message instanceof MessageByteBuffer ) )
+ {
+ nextFilter.messageSent( session, message );
+ return;
+ }
+
+ nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
+ }
+
+ public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
+ {
+ Object message = writeRequest.getMessage();
+ if( message instanceof ByteBuffer )
+ {
+ nextFilter.filterWrite( session, writeRequest );
+ return;
+ }
+
+ ProtocolEncoder encoder = getEncoder( session );
+ ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
+
+ try
+ {
+ encoder.encode( session, message, encoderOut );
+ encoderOut.flush();
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ new MessageByteBuffer( writeRequest.getMessage() ),
+ writeRequest.getFuture(), writeRequest.getDestination() ) );
+ }
+ catch( Throwable t )
+ {
+ ProtocolEncoderException pee;
+ if( t instanceof ProtocolEncoderException )
+ {
+ pee = ( ProtocolEncoderException ) t;
+ }
+ else
+ {
+ pee = new ProtocolEncoderException( t );
+ }
+ throw pee;
+ }
+ finally
+ {
+ // Dispose the encoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeEncoder( session );
+ }
+ }
+ }
+
+ public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ // Call finishDecode() first when a connection is closed.
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+ try
+ {
+ decoder.finishDecode( session, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ throw pde;
+ }
+ finally
+ {
+ // Dispose all.
+ disposeEncoder( session );
+ disposeDecoder( session );
+
+ decoderOut.flush();
+ }
+
+ nextFilter.sessionClosed( session );
+ }
+
+ private ProtocolEncoder getEncoder( IoSession session ) throws Exception
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+ if( encoder == null )
+ {
+ encoder = factory.getEncoder();
+ session.setAttribute( ENCODER, encoder );
+ }
+ return encoder;
+ }
+
+ private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
+ }
+
+ private ProtocolDecoder getDecoder( IoSession session ) throws Exception
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+ if( decoder == null )
+ {
+ decoder = factory.getDecoder();
+ session.setAttribute( DECODER, decoder );
+ }
+ return decoder;
+ }
+
+ private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
+ {
+ return new SimpleProtocolDecoderOutput( session, nextFilter );
+ }
+
+ private void disposeEncoder( IoSession session )
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+ if( encoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ encoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Failed to dispose: " + encoder.getClass().getName() +
+ " (" + encoder + ')' );
+ }
+ }
+
+ private void disposeDecoder( IoSession session )
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+ if( decoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ decoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Falied to dispose: " + decoder.getClass().getName() +
+ " (" + decoder + ')' );
+ }
+ }
+
+ private static class HiddenByteBuffer extends ByteBufferProxy
+ {
+ private HiddenByteBuffer( ByteBuffer buf )
+ {
+ super( buf );
+ }
+ }
+
+ private static class MessageByteBuffer extends ByteBufferProxy
+ {
+ private final Object message;
+
+ private MessageByteBuffer( Object message )
+ {
+ super( EMPTY_BUFFER );
+ this.message = message;
+ }
+
+ public void acquire()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+
+ public void release()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+ }
+
+ private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
+ {
+ private ByteBuffer buffer;
+
+ private final IoSession session;
+ private final IoFilter.NextFilter nextFilter;
+ private final IoFilter.WriteRequest writeRequest;
+
+ public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ this.session = session;
+ this.nextFilter = nextFilter;
+ this.writeRequest = writeRequest;
+ }
+
+
+
+ public void write( ByteBuffer buf )
+ {
+ if(buffer != null)
+ {
+ flush();
+ }
+ buffer = buf;
+ }
+
+ public void mergeAll()
+ {
+ }
+
+ public WriteFuture flush()
+ {
+ WriteFuture future = null;
+ if( buffer == null )
+ {
+ return null;
+ }
+ else
+ {
+ ByteBuffer buf = buffer;
+ // Flush only when the buffer has remaining.
+ if( buf.hasRemaining() )
+ {
+ future = doFlush( buf );
+ }
+
+ }
+
+ return future;
+ }
+
+
+ protected WriteFuture doFlush( ByteBuffer buf )
+ {
+ WriteFuture future = new DefaultWriteFuture( session );
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ buf,
+ future, writeRequest.getDestination() ) );
+ return future;
+ }
+ }
+}
+
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index ff0bc798da..7eef73f337 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -22,6 +22,7 @@ package org.apache.qpid.codec;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
@@ -48,6 +49,9 @@ import org.apache.qpid.framing.ProtocolInitiation;
*/
public class AMQDecoder extends CumulativeProtocolDecoder
{
+
+ private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -171,4 +175,97 @@ public class AMQDecoder extends CumulativeProtocolDecoder
{
_expectProtocolInitiation = expectProtocolInitiation;
}
+
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( buf != null )
+ {
+ buf.put( in );
+ buf.flip();
+ }
+ else
+ {
+ buf = in;
+ }
+
+ for( ;; )
+ {
+ int oldPos = buf.position();
+ boolean decoded = doDecode( session, buf, out );
+ if( decoded )
+ {
+ if( buf.position() == oldPos )
+ {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed." );
+ }
+
+ if( !buf.hasRemaining() )
+ {
+ break;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if ( buf.hasRemaining() )
+ {
+ storeRemainingInSession( buf, session );
+ }
+ else
+ {
+ removeSessionBuffer( session );
+ }
+ }
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose( IoSession session ) throws Exception
+ {
+ removeSessionBuffer( session );
+ }
+
+ private void removeSessionBuffer(IoSession session)
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf != null )
+ {
+ buf.release();
+ session.removeAttribute( BUFFER );
+ }
+ }
+
+ private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+ {
+ ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+ remainingBuf.setAutoExpand( true );
+ remainingBuf.put( buf );
+ session.setAttribute( BUFFER, remainingBuf );
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index 3abd97ddb7..fcd336b180 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public interface AMQBody
{
@@ -36,4 +38,6 @@ public interface AMQBody
//public void populateFromBuffer(ByteBuffer buffer, long size)
// throws AMQFrameDecodingException, AMQProtocolVersionException;
+
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 11f505fd4b..02a46f3748 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -27,7 +27,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
private final int _channel;
private final AMQBody _bodyFrame;
-
+ public static final byte FRAME_END_BYTE = (byte) 0xCE;
public AMQFrame(final int channel, final AMQBody bodyFrame)
@@ -47,13 +47,19 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
}
+ public static final int getFrameOverhead()
+ {
+ return 1 + 2 + 4 + 1;
+ }
+
+
public void writePayload(ByteBuffer buffer)
{
buffer.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
_bodyFrame.writePayload(buffer);
- buffer.put((byte) 0xCE);
+ buffer.put(FRAME_END_BYTE);
}
public final int getChannel()
@@ -66,10 +72,54 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
return _bodyFrame;
}
-
-
public String toString()
{
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
+
+ public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+ {
+ buffer.put(body.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+ body.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body3.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
+ body3.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index 5215bcbd66..64af717342 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -24,7 +24,9 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -86,4 +88,9 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+ {
+ session.methodFrameReceived(channelId, this);
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index cf64b0475a..07e2faaf7e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -26,8 +26,7 @@ import org.apache.mina.common.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
import java.lang.ref.WeakReference;
/**
@@ -38,6 +37,62 @@ import java.lang.ref.WeakReference;
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+ private static final byte MINUS = (byte)'-';
+ private static final byte ZERO = (byte) '0';
+
+
+
+ private final class TokenizerImpl implements AMQShortStringTokenizer
+ {
+ private final byte _delim;
+ private int _count = -1;
+ private int _pos = 0;
+
+ public TokenizerImpl(final byte delim)
+ {
+ _delim = delim;
+ }
+
+ public int countTokens()
+ {
+ if(_count == -1)
+ {
+ _count = 1 + AMQShortString.this.occurences(_delim);
+ }
+ return _count;
+ }
+
+ public AMQShortString nextToken()
+ {
+ if(_pos <= AMQShortString.this.length())
+ {
+ int nextDelim = AMQShortString.this.indexOf(_delim, _pos);
+ if(nextDelim == -1)
+ {
+ nextDelim = AMQShortString.this.length();
+ }
+
+ AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++);
+ _pos = nextDelim;
+ return nextToken;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean hasMoreTokens()
+ {
+ return _pos <= AMQShortString.this.length();
+ }
+ }
+
+ private AMQShortString substring(final int from, final int to)
+ {
+ return new AMQShortString(_data, from, to);
+ }
+
private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
@@ -53,7 +108,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
- private final ByteBuffer _data;
+ private final byte[] _data;
+ private final int _offset;
private int _hashCode;
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -63,17 +119,25 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public AMQShortString(byte[] data)
{
- _data = ByteBuffer.wrap(data);
+ _data = data.clone();
_length = data.length;
+ _offset = 0;
+ }
+
+ public AMQShortString(byte[] data, int pos)
+ {
+ final int size = data[pos++];
+ final byte[] dataCopy = new byte[size];
+ System.arraycopy(data,pos,dataCopy,0,size);
+ _length = size;
+ _data = dataCopy;
+ _offset = 0;
}
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
- if (data != null)
- {
- _hashCode = data.hashCode();
- }
+
}
public AMQShortString(char[] data)
@@ -85,14 +149,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
final int length = data.length;
final byte[] stringBytes = new byte[length];
+ int hash = 0;
for (int i = 0; i < length; i++)
{
stringBytes[i] = (byte) (0xFF & data[i]);
+ hash = (31 * hash) + stringBytes[i];
}
+ _hashCode = hash;
+ _data = stringBytes;
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
_length = length;
+ _offset = 0;
}
@@ -108,20 +175,31 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
+ _data = stringBytes;
_hashCode = hash;
_length = length;
+ _offset = 0;
}
- private AMQShortString(ByteBuffer data)
+ private AMQShortString(ByteBuffer data, final int length)
{
- _data = data;
- _length = data.limit();
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _length = length;
+ _offset = 0;
+
+ }
+ private AMQShortString(final byte[] data, final int from, final int to)
+ {
+ _offset = from;
+ _length = to - from;
+ _data = data;
}
+
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -134,7 +212,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public char charAt(int index)
{
- return (char) _data.get(index);
+ return (char) _data[_offset + index];
}
@@ -146,27 +224,24 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public int writeToByteArray(byte[] encoding, int pos)
{
final int size = length();
- encoding[pos++] = (byte) length();
- for (int i = 0; i < size; i++)
- {
- encoding[pos++] = _data.get(i);
- }
-
- return pos;
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
}
public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
{
- final byte len = byteEncodedDestination[pos];
- if (len == 0)
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
{
return null;
}
-
- ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
- return new AMQShortString(data);
+ else
+ {
+ return shortString;
+ }
}
public static AMQShortString readFromBuffer(ByteBuffer buffer)
@@ -178,90 +253,59 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
else
{
- ByteBuffer data = buffer.slice();
- data.limit(length);
- data.rewind();
- buffer.skip(length);
- return new AMQShortString(data);
+ return new AMQShortString(buffer, length);
}
}
public byte[] getBytes()
{
-
- if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
+ if(_offset == 0 && _length == _data.length)
{
- return _data.array();
+ return _data.clone();
}
else
{
- final int size = length();
- byte[] b = new byte[size];
- ByteBuffer buf = _data.duplicate();
- buf.rewind();
- buf.get(b);
-
- return b;
+ byte[] data = new byte[_length];
+ System.arraycopy(_data,_offset,data,0,_length);
+ return data;
}
-
}
public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
- if (size != 0)
- {
-
- buffer.setAutoExpand(true);
- buffer.put((byte) size);
- if (_data.buf().hasArray())
- {
- buffer.put(_data.array(), _data.arrayOffset(), length());
- }
- else
- {
-
- for (int i = 0; i < size; i++)
- {
-
- buffer.put(_data.get(i));
- }
- }
- }
- else
- {
- // really writing out unsigned byte
- buffer.put((byte) 0);
- }
+ //buffer.setAutoExpand(true);
+ buffer.put((byte) size);
+ buffer.put(_data, _offset, size);
}
private final class CharSubSequence implements CharSequence
{
- private final int _offset;
+ private final int _sequenceOffset;
private final int _end;
public CharSubSequence(final int offset, final int end)
{
- _offset = offset;
+ _sequenceOffset = offset;
_end = end;
}
public int length()
{
- return _end - _offset;
+ return _end - _sequenceOffset;
}
public char charAt(int index)
{
- return AMQShortString.this.charAt(index + _offset);
+ return AMQShortString.this.charAt(index + _sequenceOffset);
}
public CharSequence subSequence(int start, int end)
{
- return new CharSubSequence(start + _offset, end + _offset);
+ return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
}
}
@@ -272,7 +316,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
for (int i = 0; i < size; i++)
{
- chars[i] = (char) _data.get(i);
+ chars[i] = (char) _data[i + _offset];
}
return chars;
@@ -285,6 +329,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public boolean equals(Object o)
{
+
+
+ if(o instanceof AMQShortString)
+ {
+ return equals((AMQShortString)o);
+ }
+ if(o instanceof CharSequence)
+ {
+ return equals((CharSequence)o);
+ }
+
if (o == null)
{
return false;
@@ -295,26 +350,40 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return true;
}
- if (o instanceof AMQShortString)
- {
- final AMQShortString otherString = (AMQShortString) o;
+ return false;
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
- {
- return false;
- }
+ }
+
+ public boolean equals(final AMQShortString otherString)
+ {
+ if (otherString == this)
+ {
+ return true;
+ }
- return _data.equals(otherString._data);
+ if (otherString == null)
+ {
+ return false;
+ }
+ if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ {
+ return false;
}
- return (o instanceof CharSequence) && equals((CharSequence) o);
+ return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
+ || Arrays.equals(getBytes(),otherString.getBytes());
}
public boolean equals(CharSequence s)
{
+ if(s instanceof AMQShortString)
+ {
+ return equals((AMQShortString)s);
+ }
+
if (s == null)
{
return false;
@@ -345,7 +414,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
for (int i = 0; i < size; i++)
{
- hash = (31 * hash) + _data.get(i);
+ hash = (31 * hash) + _data[i+_offset];
}
_hashCode = hash;
@@ -380,8 +449,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
for (int i = 0; i < length(); i++)
{
- final byte d = _data.get(i);
- final byte n = name._data.get(i);
+ final byte d = _data[i+_offset];
+ final byte n = name._data[i+name._offset];
if (d < n)
{
return -1;
@@ -398,6 +467,12 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
}
+ public AMQShortStringTokenizer tokenize(byte delim)
+ {
+ return new TokenizerImpl(delim);
+ }
+
+
public AMQShortString intern()
{
@@ -435,4 +510,111 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return internString;
}
+
+ private int occurences(final byte delim)
+ {
+ int count = 0;
+ final int end = _offset + _length;
+ for(int i = _offset ; i < end ; i++ )
+ {
+ if(_data[i] == delim)
+ {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private int indexOf(final byte val, final int pos)
+ {
+
+ for(int i = pos; i < length(); i++)
+ {
+ if(_data[_offset+i] == val)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+
+ public static AMQShortString join(final Collection<AMQShortString> terms,
+ final AMQShortString delim)
+ {
+ if(terms.size() == 0)
+ {
+ return EMPTY_STRING;
+ }
+
+ int size = delim.length() * (terms.size() - 1);
+ for(AMQShortString term : terms)
+ {
+ size += term.length();
+ }
+
+ byte[] data = new byte[size];
+ int pos = 0;
+ final byte[] delimData = delim._data;
+ final int delimOffset = delim._offset;
+ final int delimLength = delim._length;
+
+
+ for(AMQShortString term : terms)
+ {
+
+ if(pos!=0)
+ {
+ System.arraycopy(delimData, delimOffset,data,pos, delimLength);
+ pos+=delimLength;
+ }
+ System.arraycopy(term._data,term._offset,data,pos,term._length);
+ pos+=term._length;
+ }
+
+
+
+ return new AMQShortString(data,0,size);
+ }
+
+ public int toIntValue()
+ {
+ int pos = 0;
+ int val = 0;
+
+
+ boolean isNegative = (_data[pos] == MINUS);
+ if(isNegative)
+ {
+ pos++;
+ }
+ while(pos < _length)
+ {
+ int digit = (int) (_data[pos++] - ZERO);
+ if((digit < 0) || (digit > 9))
+ {
+ throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+ }
+ val = val * 10;
+ val += digit;
+ }
+ if(isNegative)
+ {
+ val = val * -1;
+ }
+ return val;
+ }
+
+ public boolean contains(final byte b)
+ {
+ for(int i = 0; i < _length; i++)
+ {
+ if(_data[i] == b)
+ {
+ return true;
+ }
+ }
+ return false; //To change body of created methods use File | Settings | File Templates.
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
new file mode 100644
index 0000000000..e2db8906a1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
@@ -0,0 +1,31 @@
+package org.apache.qpid.framing;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface AMQShortStringTokenizer
+{
+
+ public int countTokens();
+
+ public AMQShortString nextToken();
+
+ boolean hasMoreTokens();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
index 7b6699b783..94030f383e 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
@@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer;
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
- private AMQDataBlock _firstFrame;
private AMQDataBlock[] _blocks;
@@ -33,27 +32,12 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
_blocks = blocks;
}
- /**
- * The encoded block will be logically first before the AMQDataBlocks which are encoded
- * into the buffer afterwards.
- * @param encodedBlock already-encoded data
- * @param blocks some blocks to be encoded.
- */
- public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
- {
- this(blocks);
- _firstFrame = encodedBlock;
- }
public AMQDataBlock[] getBlocks()
{
return _blocks;
}
- public AMQDataBlock getFirstFrame()
- {
- return _firstFrame;
- }
public long getSize()
{
@@ -62,19 +46,11 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
{
frameSize += _blocks[i].getSize();
}
- if (_firstFrame != null)
- {
- frameSize += _firstFrame.getSize();
- }
return frameSize;
}
public void writePayload(ByteBuffer buffer)
{
- if (_firstFrame != null)
- {
- _firstFrame.writePayload(buffer);
- }
for (int i = 0; i < _blocks.length; i++)
{
_blocks[i].writePayload(buffer);
@@ -90,7 +66,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
else
{
StringBuilder buf = new StringBuilder(this.getClass().getName());
- buf.append("{encodedBlock=").append(_firstFrame);
+ buf.append("{");
for (int i = 0 ; i < _blocks.length; i++)
{
buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index cbee1680f7..969df954ce 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentBody implements AMQBody
{
@@ -68,6 +70,12 @@ public class ContentBody implements AMQBody
}
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentBodyReceived(channelId, this);
+ }
+
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
{
if (size > 0)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 80a61544b3..83e5a7e341 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentHeaderBody implements AMQBody
{
@@ -110,6 +112,12 @@ public class ContentHeaderBody implements AMQBody
properties.writePropertyListPayload(buffer);
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentHeaderReceived(channelId, this);
+ }
+
public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
long bodySize)
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index ef7163bd40..15a43345b5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class HeartbeatBody implements AMQBody
{
@@ -55,6 +57,12 @@ public class HeartbeatBody implements AMQBody
{
}
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.heartbeatBodyReceived(channelId, this);
+ }
+
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
{
if(size > 0)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
index d8b6b25b92..d7194640d4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
@@ -70,7 +70,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot
return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
publishBody.getImmediate(),
publishBody.getMandatory(),
- routingKey == null ? null : routingKey.intern());
+ routingKey);
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java
index ba3c5d03fa..b2a09ac592 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -94,21 +94,23 @@ public class Job implements Runnable
/**
* Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job.
*/
- void processAll()
+ boolean processAll()
{
// limit the number of events processed in one run
- for (int i = 0; i < _maxEvents; i++)
+ int i = _maxEvents;
+ while( --i != 0 )
{
Event e = _eventQueue.poll();
if (e == null)
{
- break;
+ return true;
}
else
{
e.process(_session);
}
}
+ return false;
}
/**
@@ -144,9 +146,15 @@ public class Job implements Runnable
*/
public void run()
{
- processAll();
- deactivate();
- _completionHandler.completed(_session, this);
+ if(processAll())
+ {
+ deactivate();
+ _completionHandler.completed(_session, this);
+ }
+ else
+ {
+ _completionHandler.notCompleted(_session, this);
+ }
}
/**
@@ -158,5 +166,7 @@ public class Job implements Runnable
static interface JobCompletionHandler
{
public void completed(IoSession session, Job job);
+
+ public void notCompleted(final IoSession session, final Job job);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index cbe08a192e..8352b5af77 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -29,8 +29,8 @@ import org.apache.qpid.pool.Event.CloseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
/**
* PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
@@ -84,9 +84,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
/** Used for debugging purposes. */
private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class);
- /** Holds a mapping from Mina sessions to batched jobs for execution. */
- private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
-
/** Holds the managed reference to obtain the executor for the batched jobs. */
private final ReferenceCountingExecutorService _poolReference;
@@ -94,7 +91,9 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
private final String _name;
/** Defines the maximum number of events that will be batched into a single job. */
- private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+ static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
+ private final int _maxEvents;
/**
* Creates a named pooling filter, on the specified shared thread pool.
@@ -102,10 +101,11 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
* @param refCountingPool The thread pool reference.
* @param name The identifying name of the filter type.
*/
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
{
_poolReference = refCountingPool;
_name = name;
+ _maxEvents = maxEvents;
}
/**
@@ -160,20 +160,34 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
/**
* Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
*
- * @param session The Mina session to work in.
+ * @param job The job.
* @param event The event to hand off asynchronously.
*/
- void fireAsynchEvent(IoSession session, Event event)
+ void fireAsynchEvent(Job job, Event event)
{
- Job job = getJobForSession(session);
+
// job.acquire(); //prevents this job being removed from _jobs
job.add(event);
- // Additional checks on pool to check that it hasn't shutdown.
- // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
{
- _poolReference.getPool().execute(job);
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
}
}
@@ -186,7 +200,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void createNewJobForSession(IoSession session)
{
- Job job = new Job(session, this, _maxEvents);
+ Job job = new Job(session, this, MAX_JOB_EVENTS);
session.setAttribute(_name, job);
}
@@ -197,7 +211,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*
* @return The Job for this filter to place asynchronous events into.
*/
- private Job getJobForSession(IoSession session)
+ public Job getJobForSession(IoSession session)
{
return (Job) session.getAttribute(_name);
}
@@ -233,17 +247,57 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
// }
// }
// else
+
+
if (!job.isComplete())
{
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
- if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown())
+ if (job.activate())
{
- _poolReference.getPool().execute(job);
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
}
}
}
+ public void notCompleted(IoSession session, Job job)
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
+ }
+
+
+
/**
* No-op pass through filter to the next filter in the chain.
*
@@ -400,7 +454,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name);
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
}
/**
@@ -412,8 +466,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
{
-
- fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
}
/**
@@ -424,7 +478,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
- fireAsynchEvent(session, new CloseEvent(nextFilter));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new CloseEvent(nextFilter));
}
}
@@ -442,7 +497,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name);
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
}
/**
@@ -454,7 +509,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
{
- fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
}
/**
@@ -465,7 +521,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo
*/
public void sessionClosed(final NextFilter nextFilter, final IoSession session)
{
- fireAsynchEvent(session, new CloseEvent(nextFilter));
+ Job job = getJobForSession(session);
+ fireAsynchEvent(job, new CloseEvent(nextFilter));
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
index 2fbeeda1d4..5a7679a972 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
/**
* AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in
@@ -57,7 +58,7 @@ public interface AMQMethodListener
*
* @todo Consider narrowing the exception.
*/
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
+ <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException;
/**
* Notifies the listener of an error on the event context to which it is listening. The listener should perform
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 035645aad2..b56a05f725 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.framing.VersionSpecificRegistry;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
@@ -46,4 +46,12 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
// public VersionSpecificRegistry getRegistry();
MethodRegistry getMethodRegistry();
+
+
+ public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException;
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException;
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException;
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+
+
}