diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-04-24 01:54:20 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-04-24 01:54:20 +0000 |
| commit | 559e9702d5a7c26dddee708e267f2f685d4de89e (patch) | |
| tree | b3114d58b39092b4699186533a50553715e42b96 /java/common/src | |
| parent | 04fe7be0efbc3687a5a302fea6fbec82adbfedae (diff) | |
| download | qpid-python-559e9702d5a7c26dddee708e267f2f685d4de89e.tar.gz | |
QPID-832 merge M2.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651133 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
57 files changed, 7971 insertions, 888 deletions
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java new file mode 100644 index 0000000000..bed80d5954 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -0,0 +1,528 @@ +package org.apache.mina.common; + +import org.apache.mina.common.ByteBuffer; + +import java.nio.*; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class FixedSizeByteBufferAllocator implements ByteBufferAllocator +{ + + + private static final int MINIMUM_CAPACITY = 1; + + public FixedSizeByteBufferAllocator () + { + } + + public ByteBuffer allocate( int capacity, boolean direct ) + { + java.nio.ByteBuffer nioBuffer; + if( direct ) + { + nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity ); + } + else + { + nioBuffer = java.nio.ByteBuffer.allocate( capacity ); + } + return new FixedSizeByteBuffer( nioBuffer ); + } + + public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer ) + { + return new FixedSizeByteBuffer( nioBuffer ); + } + + public void dispose() + { + } + + + + private static final class FixedSizeByteBuffer extends ByteBuffer + { + private java.nio.ByteBuffer buf; + private int refCount = 1; + private int mark = -1; + + + protected FixedSizeByteBuffer( java.nio.ByteBuffer buf ) + { + this.buf = buf; + buf.order( ByteOrder.BIG_ENDIAN ); + refCount = 1; + } + + public synchronized void acquire() + { + if( refCount <= 0 ) + { + throw new IllegalStateException( "Already released buffer." ); + } + + refCount ++; + } + + public void release() + { + synchronized( this ) + { + if( refCount <= 0 ) + { + refCount = 0; + throw new IllegalStateException( + "Already released buffer. You released the buffer too many times." ); + } + + refCount --; + if( refCount > 0) + { + return; + } + } + } + + public java.nio.ByteBuffer buf() + { + return buf; + } + + public boolean isPooled() + { + return false; + } + + public void setPooled( boolean pooled ) + { + } + + public ByteBuffer duplicate() { + return new FixedSizeByteBuffer( this.buf.duplicate() ); + } + + public ByteBuffer slice() { + return new FixedSizeByteBuffer( this.buf.slice() ); + } + + public ByteBuffer asReadOnlyBuffer() { + return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() ); + } + + public byte[] array() + { + return buf.array(); + } + + public int arrayOffset() + { + return buf.arrayOffset(); + } + + public boolean isDirect() + { + return buf.isDirect(); + } + + public boolean isReadOnly() + { + return buf.isReadOnly(); + } + + public int capacity() + { + return buf.capacity(); + } + + public ByteBuffer capacity( int newCapacity ) + { + if( newCapacity > capacity() ) + { + // Allocate a new buffer and transfer all settings to it. + int pos = position(); + int limit = limit(); + ByteOrder bo = order(); + + capacity0( newCapacity ); + buf.limit( limit ); + if( mark >= 0 ) + { + buf.position( mark ); + buf.mark(); + } + buf.position( pos ); + buf.order( bo ); + } + + return this; + } + + protected void capacity0( int requestedCapacity ) + { + int newCapacity = MINIMUM_CAPACITY; + while( newCapacity < requestedCapacity ) + { + newCapacity <<= 1; + } + + java.nio.ByteBuffer oldBuf = this.buf; + java.nio.ByteBuffer newBuf; + if( isDirect() ) + { + newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity ); + } + else + { + newBuf = java.nio.ByteBuffer.allocate( newCapacity ); + } + + newBuf.clear(); + oldBuf.clear(); + newBuf.put( oldBuf ); + this.buf = newBuf; + } + + + + public boolean isAutoExpand() + { + return false; + } + + public ByteBuffer setAutoExpand( boolean autoExpand ) + { + if(autoExpand) throw new IllegalArgumentException(); + else return this; + } + + public ByteBuffer expand( int pos, int expectedRemaining ) + { + int end = pos + expectedRemaining; + if( end > capacity() ) + { + // The buffer needs expansion. + capacity( end ); + } + + if( end > limit() ) + { + // We call limit() directly to prevent StackOverflowError + buf.limit( end ); + } + return this; + } + + public int position() + { + return buf.position(); + } + + public ByteBuffer position( int newPosition ) + { + + buf.position( newPosition ); + if( mark > newPosition ) + { + mark = -1; + } + return this; + } + + public int limit() + { + return buf.limit(); + } + + public ByteBuffer limit( int newLimit ) + { + buf.limit( newLimit ); + if( mark > newLimit ) + { + mark = -1; + } + return this; + } + + public ByteBuffer mark() + { + buf.mark(); + mark = position(); + return this; + } + + public int markValue() + { + return mark; + } + + public ByteBuffer reset() + { + buf.reset(); + return this; + } + + public ByteBuffer clear() + { + buf.clear(); + mark = -1; + return this; + } + + public ByteBuffer flip() + { + buf.flip(); + mark = -1; + return this; + } + + public ByteBuffer rewind() + { + buf.rewind(); + mark = -1; + return this; + } + + public byte get() + { + return buf.get(); + } + + public ByteBuffer put( byte b ) + { + buf.put( b ); + return this; + } + + public byte get( int index ) + { + return buf.get( index ); + } + + public ByteBuffer put( int index, byte b ) + { + buf.put( index, b ); + return this; + } + + public ByteBuffer get( byte[] dst, int offset, int length ) + { + buf.get( dst, offset, length ); + return this; + } + + public ByteBuffer put( java.nio.ByteBuffer src ) + { + buf.put( src ); + return this; + } + + public ByteBuffer put( byte[] src, int offset, int length ) + { + buf.put( src, offset, length ); + return this; + } + + public ByteBuffer compact() + { + buf.compact(); + mark = -1; + return this; + } + + public ByteOrder order() + { + return buf.order(); + } + + public ByteBuffer order( ByteOrder bo ) + { + buf.order( bo ); + return this; + } + + public char getChar() + { + return buf.getChar(); + } + + public ByteBuffer putChar( char value ) + { + buf.putChar( value ); + return this; + } + + public char getChar( int index ) + { + return buf.getChar( index ); + } + + public ByteBuffer putChar( int index, char value ) + { + buf.putChar( index, value ); + return this; + } + + public CharBuffer asCharBuffer() + { + return buf.asCharBuffer(); + } + + public short getShort() + { + return buf.getShort(); + } + + public ByteBuffer putShort( short value ) + { + buf.putShort( value ); + return this; + } + + public short getShort( int index ) + { + return buf.getShort( index ); + } + + public ByteBuffer putShort( int index, short value ) + { + buf.putShort( index, value ); + return this; + } + + public ShortBuffer asShortBuffer() + { + return buf.asShortBuffer(); + } + + public int getInt() + { + return buf.getInt(); + } + + public ByteBuffer putInt( int value ) + { + buf.putInt( value ); + return this; + } + + public int getInt( int index ) + { + return buf.getInt( index ); + } + + public ByteBuffer putInt( int index, int value ) + { + buf.putInt( index, value ); + return this; + } + + public IntBuffer asIntBuffer() + { + return buf.asIntBuffer(); + } + + public long getLong() + { + return buf.getLong(); + } + + public ByteBuffer putLong( long value ) + { + buf.putLong( value ); + return this; + } + + public long getLong( int index ) + { + return buf.getLong( index ); + } + + public ByteBuffer putLong( int index, long value ) + { + buf.putLong( index, value ); + return this; + } + + public LongBuffer asLongBuffer() + { + return buf.asLongBuffer(); + } + + public float getFloat() + { + return buf.getFloat(); + } + + public ByteBuffer putFloat( float value ) + { + buf.putFloat( value ); + return this; + } + + public float getFloat( int index ) + { + return buf.getFloat( index ); + } + + public ByteBuffer putFloat( int index, float value ) + { + buf.putFloat( index, value ); + return this; + } + + public FloatBuffer asFloatBuffer() + { + return buf.asFloatBuffer(); + } + + public double getDouble() + { + return buf.getDouble(); + } + + public ByteBuffer putDouble( double value ) + { + buf.putDouble( value ); + return this; + } + + public double getDouble( int index ) + { + return buf.getDouble( index ); + } + + public ByteBuffer putDouble( int index, double value ) + { + buf.putDouble( index, value ); + return this; + } + + public DoubleBuffer asDoubleBuffer() + { + return buf.asDoubleBuffer(); + } + + + } + + +} diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java new file mode 100644 index 0000000000..c515263317 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.common.support; + +import org.apache.mina.common.IoFuture; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFutureListener; + +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; + +/** + * A default implementation of {@link org.apache.mina.common.IoFuture}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (í™”, 05 9ì›” 2006) $ + */ +public class DefaultIoFuture implements IoFuture +{ + private final IoSession session; + private final Object lock; + private List listeners; + private Object result; + private boolean ready; + + + /** + * Creates a new instance. + * + * @param session an {@link IoSession} which is associated with this future + */ + public DefaultIoFuture( IoSession session ) + { + this.session = session; + this.lock = this; + } + + /** + * Creates a new instance which uses the specified object as a lock. + */ + public DefaultIoFuture( IoSession session, Object lock ) + { + if( lock == null ) + { + throw new NullPointerException( "lock" ); + } + this.session = session; + this.lock = lock; + } + + public IoSession getSession() + { + return session; + } + + public Object getLock() + { + return lock; + } + + public void join() + { + synchronized( lock ) + { + while( !ready ) + { + try + { + lock.wait(); + } + catch( InterruptedException e ) + { + } + } + } + } + + public boolean join( long timeoutInMillis ) + { + long startTime = ( timeoutInMillis <= 0 ) ? 0 : System + .currentTimeMillis(); + long waitTime = timeoutInMillis; + + synchronized( lock ) + { + if( ready ) + { + return ready; + } + else if( waitTime <= 0 ) + { + return ready; + } + + for( ;; ) + { + try + { + lock.wait( waitTime ); + } + catch( InterruptedException e ) + { + } + + if( ready ) + return true; + else + { + waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime ); + if( waitTime <= 0 ) + { + return ready; + } + } + } + } + } + + public boolean isReady() + { + synchronized( lock ) + { + return ready; + } + } + + /** + * Sets the result of the asynchronous operation, and mark it as finished. + */ + protected void setValue( Object newValue ) + { + synchronized( lock ) + { + // Allow only once. + if( ready ) + { + return; + } + + result = newValue; + ready = true; + lock.notifyAll(); + + notifyListeners(); + } + } + + /** + * Returns the result of the asynchronous operation. + */ + protected Object getValue() + { + synchronized( lock ) + { + return result; + } + } + + public void addListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + if(listeners == null) + { + listeners = new ArrayList(); + } + listeners.add( listener ); + if( ready ) + { + listener.operationComplete( this ); + } + } + } + + public void removeListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + listeners.remove( listener ); + } + } + + private void notifyListeners() + { + synchronized( lock ) + { + + if(listeners != null) + { + + for( Iterator i = listeners.iterator(); i.hasNext(); ) { + ( ( IoFutureListener ) i.next() ).operationComplete( this ); + } + } + } + } +} + + + diff --git a/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java b/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java new file mode 100644 index 0000000000..47f19aa76d --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java @@ -0,0 +1,48 @@ +package org.apache.mina.filter; + +import org.apache.mina.common.IoFilter;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +public class WriteBufferFullExeception extends RuntimeException +{ + private IoFilter.WriteRequest _writeRequest; + + public WriteBufferFullExeception() + { + this(null); + } + + public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest) + { + _writeRequest = writeRequest; + } + + + public void setWriteRequest(IoFilter.WriteRequest writeRequest) + { + _writeRequest = writeRequest; + } + + public IoFilter.WriteRequest getWriteRequest() + { + return _writeRequest; + } +} diff --git a/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java new file mode 100644 index 0000000000..4e9db9071a --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.DefaultIoFilterChainBuilder; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.executor.ExecutorFilter; + +import java.util.Iterator; +import java.util.List; + +/** + * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than + * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the + * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to + * cause an Out of Memory exception due to a back log of unprocessed messages. + * + * This is should only be viewed as a temporary work around for DIRMINA-302. + * + * A true solution should not be implemented as a filter as this issue will always occur. On a machine + * where the network is slower than the local producer. + * + * Suggested improvement is to allow implementation of policices on what to do when buffer is full. + * + * They could be: + * Block - As this does + * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks + * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state + * + * <p/> + * <p>Usage: + * <p/> + * <pre><code> + * DefaultFilterChainBuilder builder = ... + * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); + * filter.attach( builder ); + * </code></pre> + * <p/> + * or + * <p/> + * <pre><code> + * IoFilterChain chain = ... + * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); + * filter.attach( chain ); + * </code></pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class WriteBufferLimitFilterBuilder +{ + public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize"; + + private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000; + + private volatile boolean throwNotBlock = false; + + private volatile int maximumConnectionBufferCount; + private volatile long maximumConnectionBufferSize; + + private final Object _blockLock = new Object(); + + private int _blockWaiters = 0; + + + public WriteBufferLimitFilterBuilder() + { + this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT); + } + + public WriteBufferLimitFilterBuilder(int maxWriteBufferSize) + { + setMaximumConnectionBufferCount(maxWriteBufferSize); + } + + + /** + * Set the maximum amount pending items in the writeQueue for a given session. + * Changing the value will only take effect when new data is received for a + * connection, including existing connections. Default value is 5000 msgs. + * + * @param maximumConnectionBufferCount New buffer size. Must be > 0 + */ + public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount) + { + this.maximumConnectionBufferCount = maximumConnectionBufferCount; + this.maximumConnectionBufferSize = 0; + } + + public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize) + { + this.maximumConnectionBufferSize = maximumConnectionBufferSize; + this.maximumConnectionBufferCount = 0; + } + + /** + * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself + * before and after that filter. + * + * @param chain {@link IoFilterChain} to attach self to. + */ + public void attach(IoFilterChain chain) + { + String name = getThreadPoolFilterEntryName(chain.getAll()); + + chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); + } + + /** + * Attach this filter to the specified builder. It will search for the + * {@link ExecutorFilter}, and attach itself before and after that filter. + * + * @param builder {@link DefaultIoFilterChainBuilder} to attach self to. + */ + public void attach(DefaultIoFilterChainBuilder builder) + { + String name = getThreadPoolFilterEntryName(builder.getAll()); + + builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); + } + + private String getThreadPoolFilterEntryName(List entries) + { + Iterator i = entries.iterator(); + + while (i.hasNext()) + { + IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next(); + + if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class)) + { + return entry.getName(); + } + } + + throw new IllegalStateException("Chain does not contain a ExecutorFilter"); + } + + + public class SendLimit extends IoFilterAdapter + { + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + { + try + { + waitTillSendAllowed(session); + } + catch (WriteBufferFullExeception wbfe) + { + nextFilter.exceptionCaught(session, wbfe); + } + + if (writeRequest.getMessage() instanceof ByteBuffer) + { + increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage()); + } + + nextFilter.filterWrite(session, writeRequest); + } + + private void increasePendingWriteSize(IoSession session, ByteBuffer message) + { + synchronized (session) + { + Long pendingSize = getScheduledWriteBytes(session) + message.remaining(); + session.setAttribute(PENDING_SIZE, pendingSize); + } + } + + private boolean sendAllowed(IoSession session) + { + if (session.isClosing()) + { + return true; + } + + int lmswm = maximumConnectionBufferCount; + long lmswb = maximumConnectionBufferSize; + + return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm) + && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb); + } + + private long getScheduledWriteBytes(IoSession session) + { + synchronized (session) + { + Long i = (Long) session.getAttribute(PENDING_SIZE); + return null == i ? 0 : i; + } + } + + private void waitTillSendAllowed(IoSession session) + { + synchronized (_blockLock) + { + if (throwNotBlock) + { + throw new WriteBufferFullExeception(); + } + + _blockWaiters++; + + while (!sendAllowed(session)) + { + try + { + _blockLock.wait(); + } + catch (InterruptedException e) + { + // Ignore. + } + } + _blockWaiters--; + } + } + + public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception + { + if (message instanceof ByteBuffer) + { + decrementPendingWriteSize(session, (ByteBuffer) message); + } + notifyWaitingWriters(); + nextFilter.messageSent(session, message); + } + + private void decrementPendingWriteSize(IoSession session, ByteBuffer message) + { + synchronized (session) + { + session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining()); + } + } + + private void notifyWaitingWriters() + { + synchronized (_blockLock) + { + if (_blockWaiters != 0) + { + _blockLock.notifyAll(); + } + } + + } + + }//SentLimit + + +} diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java new file mode 100644 index 0000000000..3f7e206cb4 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +/** + * A {@link ProtocolDecoder} that cumulates the content of received + * buffers to a <em>cumulative buffer</em> to help users implement decoders. + * <p> + * If the received {@link ByteBuffer} is only a part of a message. + * decoders should cumulate received buffers to make a message complete or + * to postpone decoding until more buffers arrive. + * <p> + * Here is an example decoder that decodes CRLF terminated lines into + * <code>Command</code> objects: + * <pre> + * public class CRLFTerminatedCommandLineDecoder + * extends CumulativeProtocolDecoder { + * + * private Command parseCommand(ByteBuffer in) { + * // Convert the bytes in the specified buffer to a + * // Command object. + * ... + * } + * + * protected boolean doDecode(IoSession session, ByteBuffer in, + * ProtocolDecoderOutput out) + * throws Exception { + * + * // Remember the initial position. + * int start = in.position(); + * + * // Now find the first CRLF in the buffer. + * byte previous = 0; + * while (in.hasRemaining()) { + * byte current = in.get(); + * + * if (previous == '\r' && current == '\n') { + * // Remember the current position and limit. + * int position = in.position(); + * int limit = in.limit(); + * try { + * in.position(start); + * in.limit(position); + * // The bytes between in.position() and in.limit() + * // now contain a full CRLF terminated line. + * out.write(parseCommand(in.slice())); + * } finally { + * // Set the position to point right after the + * // detected line and set the limit to the old + * // one. + * in.position(position); + * in.limit(limit); + * } + * // Decoded one line; CumulativeProtocolDecoder will + * // call me again until I return false. So just + * // return true until there are no more lines in the + * // buffer. + * return true; + * } + * + * previous = current; + * } + * + * // Could not find CRLF in the buffer. Reset the initial + * // position to the one we recorded above. + * in.position(start); + * + * return false; + * } + * } + * </pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { + + private static final String BUFFER = OurCumulativeProtocolDecoder.class + .getName() + + ".Buffer"; + + /** + * Creates a new instance. + */ + protected OurCumulativeProtocolDecoder() { + } + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is NOT compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception { + boolean usingSessionBuffer = true; + ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER); + // If we have a session buffer, append data to that; otherwise + // use the buffer read from the network directly. + if (buf != null) { + buf.put(in); + buf.flip(); + } else { + buf = in; + usingSessionBuffer = false; + } + + for (;;) { + int oldPos = buf.position(); + boolean decoded = doDecode(session, buf, out); + if (decoded) { + if (buf.position() == oldPos) { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed."); + } + + if (!buf.hasRemaining()) { + break; + } + } else { + break; + } + } + + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if (buf.hasRemaining()) { + storeRemainingInSession(buf, session); + } else { + if (usingSessionBuffer) + removeSessionBuffer(session); + } + } + + /** + * Implement this method to consume the specified cumulative buffer and + * decode its content into message(s). + * + * @param in the cumulative buffer + * @return <tt>true</tt> if and only if there's more to decode in the buffer + * and you want to have <tt>doDecode</tt> method invoked again. + * Return <tt>false</tt> if remaining data is not enough to decode, + * then this method will be invoked again when more data is cumulated. + * @throws Exception if cannot decode <tt>in</tt>. + */ + protected abstract boolean doDecode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception; + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose(IoSession session) throws Exception { + removeSessionBuffer(session); + } + + private void removeSessionBuffer(IoSession session) { + ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER); + if (buf != null) { + buf.release(); + } + } + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) { + ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity()); + remainingBuf.setAutoExpand(true); + remainingBuf.order(buf.order()); + remainingBuf.put(buf); + session.setAttribute(BUFFER, remainingBuf); + } +} diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java new file mode 100644 index 0000000000..b8c6f29720 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java @@ -0,0 +1,440 @@ +package org.apache.mina.filter.codec; + + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +import org.apache.mina.common.*; +import org.apache.mina.common.support.DefaultWriteFuture; +import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; +import org.apache.mina.util.SessionLog; +import org.apache.mina.util.Queue; + + +public class QpidProtocolCodecFilter extends IoFilterAdapter +{ + public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder"; + public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder"; + + private static final Class[] EMPTY_PARAMS = new Class[0]; + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] ); + + private final ProtocolCodecFactory factory; + + public QpidProtocolCodecFilter( ProtocolCodecFactory factory ) + { + if( factory == null ) + { + throw new NullPointerException( "factory" ); + } + this.factory = factory; + } + + public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder ) + { + if( encoder == null ) + { + throw new NullPointerException( "encoder" ); + } + if( decoder == null ) + { + throw new NullPointerException( "decoder" ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() + { + return encoder; + } + + public ProtocolDecoder getDecoder() + { + return decoder; + } + }; + } + + public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass ) + { + if( encoderClass == null ) + { + throw new NullPointerException( "encoderClass" ); + } + if( decoderClass == null ) + { + throw new NullPointerException( "decoderClass" ); + } + if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) ) + { + throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() ); + } + if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) ) + { + throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() ); + } + try + { + encoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." ); + } + try + { + decoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() throws Exception + { + return ( ProtocolEncoder ) encoderClass.newInstance(); + } + + public ProtocolDecoder getDecoder() throws Exception + { + return ( ProtocolDecoder ) decoderClass.newInstance(); + } + }; + } + + public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception + { + if( parent.contains( ProtocolCodecFilter.class ) ) + { + throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." ); + } + } + + public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( !( message instanceof ByteBuffer ) ) + { + nextFilter.messageReceived( session, message ); + return; + } + + ByteBuffer in = ( ByteBuffer ) message; + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + + try + { + decoder.decode( session, in, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + pde.setHexdump( in.getHexDump() ); + throw pde; + } + finally + { + // Dispose the decoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeDecoder( session ); + } + + // Release the read buffer. + in.release(); + + decoderOut.flush(); + } + } + + public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( message instanceof HiddenByteBuffer ) + { + return; + } + + if( !( message instanceof MessageByteBuffer ) ) + { + nextFilter.messageSent( session, message ); + return; + } + + nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); + } + + public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception + { + Object message = writeRequest.getMessage(); + if( message instanceof ByteBuffer ) + { + nextFilter.filterWrite( session, writeRequest ); + return; + } + + ProtocolEncoder encoder = getEncoder( session ); + ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest ); + + try + { + encoder.encode( session, message, encoderOut ); + encoderOut.flush(); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + new MessageByteBuffer( writeRequest.getMessage() ), + writeRequest.getFuture(), writeRequest.getDestination() ) ); + } + catch( Throwable t ) + { + ProtocolEncoderException pee; + if( t instanceof ProtocolEncoderException ) + { + pee = ( ProtocolEncoderException ) t; + } + else + { + pee = new ProtocolEncoderException( t ); + } + throw pee; + } + finally + { + // Dispose the encoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeEncoder( session ); + } + } + } + + public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception + { + // Call finishDecode() first when a connection is closed. + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + try + { + decoder.finishDecode( session, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + throw pde; + } + finally + { + // Dispose all. + disposeEncoder( session ); + disposeDecoder( session ); + + decoderOut.flush(); + } + + nextFilter.sessionClosed( session ); + } + + private ProtocolEncoder getEncoder( IoSession session ) throws Exception + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER ); + if( encoder == null ) + { + encoder = factory.getEncoder(); + session.setAttribute( ENCODER, encoder ); + } + return encoder; + } + + private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest ); + } + + private ProtocolDecoder getDecoder( IoSession session ) throws Exception + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER ); + if( decoder == null ) + { + decoder = factory.getDecoder(); + session.setAttribute( DECODER, decoder ); + } + return decoder; + } + + private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter ) + { + return new SimpleProtocolDecoderOutput( session, nextFilter ); + } + + private void disposeEncoder( IoSession session ) + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER ); + if( encoder == null ) + { + return; + } + + try + { + encoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Failed to dispose: " + encoder.getClass().getName() + + " (" + encoder + ')' ); + } + } + + private void disposeDecoder( IoSession session ) + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER ); + if( decoder == null ) + { + return; + } + + try + { + decoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Falied to dispose: " + decoder.getClass().getName() + + " (" + decoder + ')' ); + } + } + + private static class HiddenByteBuffer extends ByteBufferProxy + { + private HiddenByteBuffer( ByteBuffer buf ) + { + super( buf ); + } + } + + private static class MessageByteBuffer extends ByteBufferProxy + { + private final Object message; + + private MessageByteBuffer( Object message ) + { + super( EMPTY_BUFFER ); + this.message = message; + } + + public void acquire() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + + public void release() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + } + + private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput + { + private ByteBuffer buffer; + + private final IoSession session; + private final IoFilter.NextFilter nextFilter; + private final IoFilter.WriteRequest writeRequest; + + public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + this.session = session; + this.nextFilter = nextFilter; + this.writeRequest = writeRequest; + } + + + + public void write( ByteBuffer buf ) + { + if(buffer != null) + { + flush(); + } + buffer = buf; + } + + public void mergeAll() + { + } + + public WriteFuture flush() + { + WriteFuture future = null; + if( buffer == null ) + { + return null; + } + else + { + ByteBuffer buf = buffer; + // Flush only when the buffer has remaining. + if( buf.hasRemaining() ) + { + future = doFlush( buf ); + } + + } + + return future; + } + + + protected WriteFuture doFlush( ByteBuffer buf ) + { + WriteFuture future = new DefaultWriteFuture( session ); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + buf, + future, writeRequest.getDestination() ) ); + return future; + } + } +} + diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java new file mode 100644 index 0000000000..e5360d32e0 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.BaseIoAcceptor; +import org.apache.mina.util.Queue; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.NamePreservingRunnable; +import edu.emory.mathcs.backport.java.util.concurrent.Executor; + +/** + * {@link IoAcceptor} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class MultiThreadSocketAcceptor extends SocketAcceptor +{ + /** + * @noinspection StaticNonFinalField + */ + private static volatile int nextId = 0; + + private final Executor executor; + private final Object lock = new Object(); + private final int id = nextId ++; + private final String threadName = "SocketAcceptor-" + id; + private final Map channels = new HashMap(); + + private final Queue registerQueue = new Queue(); + private final Queue cancelQueue = new Queue(); + + private final MultiThreadSocketIoProcessor[] ioProcessors; + private final int processorCount; + + /** + * @noinspection FieldAccessedSynchronizedAndUnsynchronized + */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + + /** + * Create an acceptor with a single processing thread using a NewThreadExecutor + */ + public MultiThreadSocketAcceptor() + { + this( 1, new NewThreadExecutor() ); + } + + /** + * Create an acceptor with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public MultiThreadSocketAcceptor( int processorCount, Executor executor ) + { + if( processorCount < 1 ) + { + throw new IllegalArgumentException( "Must have at least one processor" ); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; + + for( int i = 0; i < processorCount; i++ ) + { + ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor ); + } + } + + + /** + * Binds to the specified <code>address</code> and handles incoming connections with the specified + * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. + * + * @throws IOException if failed to bind + */ + public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException + { + if( handler == null ) + { + throw new NullPointerException( "handler" ); + } + + if( address != null && !( address instanceof InetSocketAddress ) ) + { + throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); + } + + if( config == null ) + { + config = getDefaultConfig(); + } + + RegistrationRequest request = new RegistrationRequest( address, handler, config ); + + synchronized( registerQueue ) + { + registerQueue.push( request ); + } + + startupWorker(); + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + if( request.exception != null ) + { + throw request.exception; + } + } + + + private synchronized void startupWorker() throws IOException + { + synchronized( lock ) + { + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + + executor.execute( new NamePreservingRunnable( worker ) ); + } + } + } + + public void unbind( SocketAddress address ) + { + if( address == null ) + { + throw new NullPointerException( "address" ); + } + + CancellationRequest request = new CancellationRequest( address ); + + try + { + startupWorker(); + } + catch( IOException e ) + { + // IOException is thrown only when Worker thread is not + // running and failed to open a selector. We simply throw + // IllegalArgumentException here because we can simply + // conclude that nothing is bound to the selector. + throw new IllegalArgumentException( "Address not bound: " + address ); + } + + synchronized( cancelQueue ) + { + cancelQueue.push( request ); + } + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + if( request.exception != null ) + { + request.exception.fillInStackTrace(); + + throw request.exception; + } + } + + + private class Worker implements Runnable + { + public void run() + { + Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName ); + + for( ; ; ) + { + try + { + int nKeys = selector.select(); + + registerNew(); + + if( nKeys > 0 ) + { + processSessions( selector.selectedKeys() ); + } + + cancelKeys(); + + if( selector.keys().isEmpty() ) + { + synchronized( lock ) + { + if( selector.keys().isEmpty() && + registerQueue.isEmpty() && + cancelQueue.isEmpty() ) + { + worker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + ExceptionMonitor.getInstance().exceptionCaught( e1 ); + } + } + } + } + + private void processSessions( Set keys ) throws IOException + { + Iterator it = keys.iterator(); + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + + it.remove(); + + if( !key.isAcceptable() ) + { + continue; + } + + ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); + + SocketChannel ch = ssc.accept(); + + if( ch == null ) + { + continue; + } + + boolean success = false; + try + { + + RegistrationRequest req = ( RegistrationRequest ) key.attachment(); + + MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl( + MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(), + req.config, ch, req.handler, req.address ); + + // New Interface +// SocketSessionImpl session = new SocketSessionImpl( +// SocketAcceptor.this, nextProcessor(), getListeners(), +// req.config, ch, req.handler, req.address ); + + + getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); + req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); + req.config.getThreadModel().buildFilterChain( session.getFilterChain() ); + session.getIoProcessor().addNew( session ); + success = true; + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + } + finally + { + if( !success ) + { + ch.close(); + } + } + } + } + } + + private MultiThreadSocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + + private void registerNew() + { + if( registerQueue.isEmpty() ) + { + return; + } + + for( ; ; ) + { + RegistrationRequest req; + + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + { + break; + } + + ServerSocketChannel ssc = null; + + try + { + ssc = ServerSocketChannel.open(); + ssc.configureBlocking( false ); + + // Configure the server socket, + SocketAcceptorConfig cfg; + if( req.config instanceof SocketAcceptorConfig ) + { + cfg = ( SocketAcceptorConfig ) req.config; + } + else + { + cfg = ( SocketAcceptorConfig ) getDefaultConfig(); + } + + ssc.socket().setReuseAddress( cfg.isReuseAddress() ); + ssc.socket().setReceiveBufferSize( + ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() ); + + // and bind. + ssc.socket().bind( req.address, cfg.getBacklog() ); + if( req.address == null || req.address.getPort() == 0 ) + { + req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress(); + } + ssc.register( selector, SelectionKey.OP_ACCEPT, req ); + + synchronized( channels ) + { + channels.put( req.address, ssc ); + } + + getListeners().fireServiceActivated( + this, req.address, req.handler, req.config ); + } + catch( IOException e ) + { + req.exception = e; + } + finally + { + synchronized( req ) + { + req.done = true; + + req.notifyAll(); + } + + if( ssc != null && req.exception != null ) + { + try + { + ssc.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + } + } + + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + { + return; + } + + for( ; ; ) + { + CancellationRequest request; + + synchronized( cancelQueue ) + { + request = ( CancellationRequest ) cancelQueue.pop(); + } + + if( request == null ) + { + break; + } + + ServerSocketChannel ssc; + synchronized( channels ) + { + ssc = ( ServerSocketChannel ) channels.remove( request.address ); + } + + // close the channel + try + { + if( ssc == null ) + { + request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); + } + else + { + SelectionKey key = ssc.keyFor( selector ); + request.registrationRequest = ( RegistrationRequest ) key.attachment(); + key.cancel(); + + selector.wakeup(); // wake up again to trigger thread death + + ssc.close(); + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + synchronized( request ) + { + request.done = true; + request.notifyAll(); + } + + if( request.exception == null ) + { + getListeners().fireServiceDeactivated( + this, request.address, + request.registrationRequest.handler, + request.registrationRequest.config ); + } + } + } + } + + private static class RegistrationRequest + { + private InetSocketAddress address; + private final IoHandler handler; + private final IoServiceConfig config; + private IOException exception; + private boolean done; + + private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + this.address = ( InetSocketAddress ) address; + this.handler = handler; + this.config = config; + } + } + + + private static class CancellationRequest + { + private final SocketAddress address; + private boolean done; + private RegistrationRequest registrationRequest; + private RuntimeException exception; + + private CancellationRequest( SocketAddress address ) + { + this.address = address; + } + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java new file mode 100644 index 0000000000..b1612840db --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +/** + * {@link IoConnector} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class MultiThreadSocketConnector extends SocketConnector +{ + /** @noinspection StaticNonFinalField */ + private static volatile int nextId = 0; + + private final Object lock = new Object(); + private final int id = nextId++; + private final String threadName = "SocketConnector-" + id; + private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); + private final Queue connectQueue = new Queue(); + private final MultiThreadSocketIoProcessor[] ioProcessors; + private final int processorCount; + private final Executor executor; + + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + private int workerTimeout = 60; // 1 min. + + /** Create a connector with a single processing thread using a NewThreadExecutor */ + public MultiThreadSocketConnector() + { + this(1, new NewThreadExecutor()); + } + + /** + * Create a connector with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public MultiThreadSocketConnector(int processorCount, Executor executor) + { + if (processorCount < 1) + { + throw new IllegalArgumentException("Must have at least one processor"); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; + + for (int i = 0; i < processorCount; i++) + { + ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); + } + } + + /** + * How many seconds to keep the connection thread alive between connection requests + * + * @return Number of seconds to keep connection thread alive + */ + public int getWorkerTimeout() + { + return workerTimeout; + } + + /** + * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. + * + * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 + */ + public void setWorkerTimeout(int workerTimeout) + { + if (workerTimeout < 0) + { + throw new IllegalArgumentException("Must be >= 0"); + } + this.workerTimeout = workerTimeout; + } + + public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + return connect(address, null, handler, config); + } + + public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoServiceConfig config) + { + if (address == null) + { + throw new NullPointerException("address"); + } + if (handler == null) + { + throw new NullPointerException("handler"); + } + + if (!(address instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected address type: " + + address.getClass()); + } + + if (localAddress != null && !(localAddress instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected local address type: " + + localAddress.getClass()); + } + + if (config == null) + { + config = getDefaultConfig(); + } + + SocketChannel ch = null; + boolean success = false; + try + { + ch = SocketChannel.open(); + ch.socket().setReuseAddress(true); + if (localAddress != null) + { + ch.socket().bind(localAddress); + } + + ch.configureBlocking(false); + + if (ch.connect(address)) + { + DefaultConnectFuture future = new DefaultConnectFuture(); + newSession(ch, handler, config, future); + success = true; + return future; + } + + success = true; + } + catch (IOException e) + { + return DefaultConnectFuture.newFailedFuture(e); + } + finally + { + if (!success && ch != null) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + + ConnectionRequest request = new ConnectionRequest(ch, handler, config); + synchronized (lock) + { + try + { + startupWorker(); + } + catch (IOException e) + { + try + { + ch.close(); + } + catch (IOException e2) + { + ExceptionMonitor.getInstance().exceptionCaught(e2); + } + + return DefaultConnectFuture.newFailedFuture(e); + } + } + + synchronized (connectQueue) + { + connectQueue.push(request); + } + selector.wakeup(); + + return request; + } + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + executor.execute(new NamePreservingRunnable(worker)); + } + } + + private void registerNew() + { + if (connectQueue.isEmpty()) + { + return; + } + + for (; ;) + { + ConnectionRequest req; + synchronized (connectQueue) + { + req = (ConnectionRequest) connectQueue.pop(); + } + + if (req == null) + { + break; + } + + SocketChannel ch = req.channel; + try + { + ch.register(selector, SelectionKey.OP_CONNECT, req); + } + catch (IOException e) + { + req.setException(e); + } + } + } + + private void processSessions(Set keys) + { + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isConnectable()) + { + continue; + } + + SocketChannel ch = (SocketChannel) key.channel(); + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + boolean success = false; + try + { + ch.finishConnect(); + newSession(ch, entry.handler, entry.config, entry); + success = true; + } + catch (Throwable e) + { + entry.setException(e); + } + finally + { + key.cancel(); + if (!success) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + keys.clear(); + } + + private void processTimedOutSessions(Set keys) + { + long currentTime = System.currentTimeMillis(); + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isValid()) + { + continue; + } + + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + if (currentTime >= entry.deadline) + { + entry.setException(new ConnectException()); + try + { + key.channel().close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + key.cancel(); + } + } + } + } + + private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + { + MultiThreadSocketSessionImpl session = + new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(), + config, ch, handler, ch.socket().getRemoteSocketAddress()); + + //new interface +// SocketSessionImpl session = new SocketSessionImpl( +// this, nextProcessor(), getListeners(), +// config, ch, handler, ch.socket().getRemoteSocketAddress() ); + try + { + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getThreadModel().buildFilterChain(session.getFilterChain()); + } + catch (Throwable e) + { + throw (IOException) new IOException("Failed to create a session.").initCause(e); + } + + // Set the ConnectFuture of the specified session, which will be + // removed and notified by AbstractIoFilterChain eventually. + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); + + // Forward the remaining process to the SocketIoProcessor. + session.getIoProcessor().addNew(session); + } + + private MultiThreadSocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + private class Worker implements Runnable + { + private long lastActive = System.currentTimeMillis(); + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName); + + for (; ;) + { + try + { + int nKeys = selector.select(1000); + + registerNew(); + + if (nKeys > 0) + { + processSessions(selector.selectedKeys()); + } + + processTimedOutSessions(selector.keys()); + + if (selector.keys().isEmpty()) + { + if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) + { + synchronized (lock) + { + if (selector.keys().isEmpty() && + connectQueue.isEmpty()) + { + worker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + else + { + lastActive = System.currentTimeMillis(); + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + } + } + + private class ConnectionRequest extends DefaultConnectFuture + { + private final SocketChannel channel; + private final long deadline; + private final IoHandler handler; + private final IoServiceConfig config; + + private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) + { + this.channel = channel; + long timeout; + if (config instanceof IoConnectorConfig) + { + timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); + } + else + { + timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); + } + this.deadline = System.currentTimeMillis() + timeout; + this.handler = handler; + this.config = config; + } + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java new file mode 100644 index 0000000000..67b8c8d820 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.io.IOException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.util.Queue; + +/** + * An {@link IoFilterChain} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + */ +class MultiThreadSocketFilterChain extends AbstractIoFilterChain { + + MultiThreadSocketFilterChain( IoSession parent ) + { + super( parent ); + } + + protected void doWrite( IoSession session, WriteRequest writeRequest ) + { + MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; + Queue writeRequestQueue = s.getWriteRequestQueue(); + + // SocketIoProcessor.doFlush() will reset it after write is finished + // because the buffer will be passed with messageSent event. + ( ( ByteBuffer ) writeRequest.getMessage() ).mark(); + synchronized( writeRequestQueue ) + { + writeRequestQueue.push( writeRequest ); + if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) + { + // Notify SocketIoProcessor only when writeRequestQueue was empty. + s.getIoProcessor().flush( s ); + } + } + } + + protected void doClose( IoSession session ) throws IOException + { + MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; + s.getIoProcessor().remove( s ); + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java new file mode 100644 index 0000000000..c23ad8686f --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -0,0 +1,1026 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.WriteTimeoutException; +import org.apache.mina.util.IdentityHashSet; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.Queue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $, + */ +class MultiThreadSocketIoProcessor extends SocketIoProcessor +{ + Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class); + Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader"); + Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer"); + + private static final long SELECTOR_TIMEOUT = 1000L; + + private int MAX_READ_BYTES_PER_SESSION = 524288; //512K + private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K + + private final Object readLock = new Object(); + private final Object writeLock = new Object(); + + private final String threadName; + private final Executor executor; + + private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); + + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ + private volatile Selector selector, writeSelector; + + private final Queue newSessions = new Queue(); + private final Queue removingSessions = new Queue(); + private final BlockingQueue flushingSessions = new LinkedBlockingQueue(); + private final IdentityHashSet flushingSessionsSet = new IdentityHashSet(); + + private final Queue trafficControllingSessions = new Queue(); + + private ReadWorker readWorker; + private WriteWorker writeWorker; + private long lastIdleReadCheckTime = System.currentTimeMillis(); + private long lastIdleWriteCheckTime = System.currentTimeMillis(); + + MultiThreadSocketIoProcessor(String threadName, Executor executor) + { + super(threadName, executor); + this.threadName = threadName; + this.executor = executor; + } + + void addNew(SocketSessionImpl session) throws IOException + { + synchronized (newSessions) + { + newSessions.push(session); + } + + startupWorker(); + + selector.wakeup(); + writeSelector.wakeup(); + } + + void remove(SocketSessionImpl session) throws IOException + { + scheduleRemove(session); + startupWorker(); + selector.wakeup(); + } + + private void startupWorker() throws IOException + { + synchronized (readLock) + { + if (readWorker == null) + { + selector = Selector.open(); + readWorker = new ReadWorker(); + executor.execute(new NamePreservingRunnable(readWorker)); + } + } + + synchronized (writeLock) + { + if (writeWorker == null) + { + writeSelector = Selector.open(); + writeWorker = new WriteWorker(); + executor.execute(new NamePreservingRunnable(writeWorker)); + } + } + + } + + void flush(SocketSessionImpl session) + { + scheduleFlush(session); + Selector selector = this.writeSelector; + + if (selector != null) + { + selector.wakeup(); + } + } + + void updateTrafficMask(SocketSessionImpl session) + { + scheduleTrafficControl(session); + Selector selector = this.selector; + if (selector != null) + { + selector.wakeup(); + } + } + + private void scheduleRemove(SocketSessionImpl session) + { + synchronized (removingSessions) + { + removingSessions.push(session); + } + } + + private void scheduleFlush(SocketSessionImpl session) + { + synchronized (flushingSessionsSet) + { + //if flushingSessions grows to contain Integer.MAX_VALUE sessions + // then this will fail. + if (flushingSessionsSet.add(session)) + { + flushingSessions.offer(session); + } + } + } + + private void scheduleTrafficControl(SocketSessionImpl session) + { + synchronized (trafficControllingSessions) + { + trafficControllingSessions.push(session); + } + } + + private void doAddNewReader() throws InterruptedException + { + if (newSessions.isEmpty()) + { + return; + } + + for (; ;) + { + MultiThreadSocketSessionImpl session; + + synchronized (newSessions) + { + session = (MultiThreadSocketSessionImpl) newSessions.peek(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + + + try + { + + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); + + //System.out.println("ReadDebug:"+"Awaiting Registration"); + session.awaitRegistration(); + sessionCreated(session); + } + catch (IOException e) + { + // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute + // and call ConnectFuture.setException(). + session.getFilterChain().fireExceptionCaught(session, e); + } + } + } + + + private void doAddNewWrite() throws InterruptedException + { + if (newSessions.isEmpty()) + { + return; + } + + for (; ;) + { + MultiThreadSocketSessionImpl session; + + synchronized (newSessions) + { + session = (MultiThreadSocketSessionImpl) newSessions.peek(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + + try + { + ch.configureBlocking(false); + synchronized (flushingSessionsSet) + { + flushingSessionsSet.add(session); + } + + session.setWriteSelectionKey(ch.register(writeSelector, + SelectionKey.OP_WRITE, + session)); + + //System.out.println("WriteDebug:"+"Awaiting Registration"); + session.awaitRegistration(); + sessionCreated(session); + } + catch (IOException e) + { + + // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute + // and call ConnectFuture.setException(). + session.getFilterChain().fireExceptionCaught(session, e); + } + } + } + + + private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException + { + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; + synchronized (newSessions) + { + if (!session.created()) + { + _logger.debug("Popping new session"); + newSessions.pop(); + + // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here + // in AbstractIoFilterChain.fireSessionOpened(). + session.getServiceListeners().fireSessionCreated(session); + + session.doneCreation(); + } + } + } + + private void doRemove() + { + if (removingSessions.isEmpty()) + { + return; + } + + for (; ;) + { + MultiThreadSocketSessionImpl session; + + synchronized (removingSessions) + { + session = (MultiThreadSocketSessionImpl) removingSessions.pop(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + SelectionKey key = session.getReadSelectionKey(); + SelectionKey writeKey = session.getWriteSelectionKey(); + + // Retry later if session is not yet fully initialized. + // (In case that Session.close() is called before addSession() is processed) + if (key == null || writeKey == null) + { + scheduleRemove(session); + break; + } + // skip if channel is already closed + if (!key.isValid() || !writeKey.isValid()) + { + continue; + } + + try + { + //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); + synchronized (readLock) + { + key.cancel(); + } + synchronized (writeLock) + { + writeKey.cancel(); + } + ch.close(); + } + catch (IOException e) + { + session.getFilterChain().fireExceptionCaught(session, e); + } + finally + { + releaseWriteBuffers(session); + session.getServiceListeners().fireSessionDestroyed(session); + } + } + } + + private void processRead(Set selectedKeys) + { + Iterator it = selectedKeys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); + + synchronized (readLock) + { + if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) + { + read(session); + } + } + + } + + selectedKeys.clear(); + } + + private void processWrite(Set selectedKeys) + { + Iterator it = selectedKeys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + + synchronized (writeLock) + { + if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) + { + + // Clear OP_WRITE + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + + synchronized (flushingSessionsSet) + { + flushingSessions.offer(session); + } + } + } + } + + selectedKeys.clear(); + } + + private void read(SocketSessionImpl session) + { + + //if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session)); + } + + int totalReadBytes = 0; + + while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) + { + ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); + SocketChannel ch = session.getChannel(); + + try + { + buf.clear(); + + int readBytes = 0; + int ret; + + try + { + while ((ret = ch.read(buf.buf())) > 0) + { + readBytes += ret; + totalReadBytes += ret; + } + } + finally + { + buf.flip(); + } + + + if (readBytes > 0) + { + session.increaseReadBytes(readBytes); + + session.getFilterChain().fireMessageReceived(session, buf); + buf = null; + } + + if (ret <= 0) + { + if (ret == 0) + { + if (readBytes == session.getReadBufferSize()) + { + continue; + } + } + else + { + scheduleRemove(session); + } + + break; + } + } + catch (Throwable e) + { + if (e instanceof IOException) + { + scheduleRemove(session); + } + session.getFilterChain().fireExceptionCaught(session, e); + + //Stop Reading this session. + return; + } + finally + { + if (buf != null) + { + buf.release(); + } + } + }//for + + // if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes); + } + } + + + private void notifyReadIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastIdleReadCheckTime) >= 1000) + { + lastIdleReadCheckTime = currentTime; + Set keys = selector.keys(); + if (keys != null) + { + for (Iterator it = keys.iterator(); it.hasNext();) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + notifyReadIdleness(session, currentTime); + } + } + } + } + + private void notifyWriteIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastIdleWriteCheckTime) >= 1000) + { + lastIdleWriteCheckTime = currentTime; + Set keys = writeSelector.keys(); + if (keys != null) + { + for (Iterator it = keys.iterator(); it.hasNext();) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + notifyWriteIdleness(session, currentTime); + } + } + } + } + + private void notifyReadIdleness(SocketSessionImpl session, long currentTime) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); + + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); + } + + private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); + + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); + } + + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) + { + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) + { + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); + } + } + + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) + { + + MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; + SelectionKey key = sesh.getWriteSelectionKey(); + + synchronized (writeLock) + { + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); + } + } + } + + private SocketSessionImpl getNextFlushingSession() + { + return (SocketSessionImpl) flushingSessions.poll(); + } + + private void releaseSession(SocketSessionImpl session) + { + synchronized (session.getWriteRequestQueue()) + { + synchronized (flushingSessionsSet) + { + if (session.getScheduledWriteRequests() > 0) + { + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session)); + } + flushingSessions.offer(session); + } + else + { + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session)); + } + flushingSessionsSet.remove(session); + } + } + } + } + + private void releaseWriteBuffers(SocketSessionImpl session) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + WriteRequest req; + + //Should this be synchronized? + synchronized (writeRequestQueue) + { + while ((req = (WriteRequest) writeRequestQueue.pop()) != null) + { + try + { + ((ByteBuffer) req.getMessage()).release(); + } + catch (IllegalStateException e) + { + session.getFilterChain().fireExceptionCaught(session, e); + } + finally + { + req.getFuture().setWritten(false); + } + } + } + } + + private void doFlush() + { + MultiThreadSocketSessionImpl session; + + while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) + { + if (!session.isConnected()) + { + releaseWriteBuffers(session); + releaseSession(session); + continue; + } + + SelectionKey key = session.getWriteSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.write() is called before addSession() is processed) + if (key == null) + { + scheduleFlush(session); + releaseSession(session); + continue; + } + // skip if channel is already closed + if (!key.isValid()) + { + releaseSession(session); + continue; + } + + try + { + if (doFlush(session)) + { + releaseSession(session); + } + } + catch (IOException e) + { + releaseSession(session); + scheduleRemove(session); + session.getFilterChain().fireExceptionCaught(session, e); + } + + } + + } + + private boolean doFlush(SocketSessionImpl sessionParam) throws IOException + { + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; + // Clear OP_WRITE + SelectionKey key = session.getWriteSelectionKey(); + synchronized (writeLock) + { + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + } + SocketChannel ch = session.getChannel(); + Queue writeRequestQueue = session.getWriteRequestQueue(); + + long totalFlushedBytes = 0; + while (true) + { + WriteRequest req; + + synchronized (writeRequestQueue) + { + req = (WriteRequest) writeRequestQueue.first(); + } + + if (req == null) + { + break; + } + + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) + { + synchronized (writeRequestQueue) + { + writeRequestQueue.pop(); + } + + session.increaseWrittenMessages(); + + buf.reset(); + session.getFilterChain().fireMessageSent(session, req); + continue; + } + + + int writtenBytes = 0; + + // Reported as DIRMINA-362 + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. + if (key.isWritable()) + { + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; + } + + if (writtenBytes > 0) + { + session.increaseWrittenBytes(writtenBytes); + } + + if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) + { + // Kernel buffer is full + synchronized (writeLock) + { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); + } + return false; + } + } + + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); + } + return true; + } + + private void doUpdateTrafficMask() + { + if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked()) + { + return; + } + + // Synchronize over entire operation as this method should be called + // from both read and write thread and we don't want the order of the + // updates to get changed. + trafficMaskUpdateLock.lock(); + try + { + for (; ;) + { + MultiThreadSocketSessionImpl session; + + session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop(); + + if (session == null) + { + break; + } + + SelectionKey key = session.getReadSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + if (key == null) + { + scheduleTrafficControl(session); + break; + } + // skip if channel is already closed + if (!key.isValid()) + { + continue; + } + + // The normal is OP_READ and, if there are write requests in the + // session's write queue, set OP_WRITE to trigger flushing. + + //Sset to Read and Write if there is nothing then the cost + // is one loop through the flusher. + int ops = SelectionKey.OP_READ; + + // Now mask the preferred ops with the mask of the current session + int mask = session.getTrafficMask().getInterestOps(); + synchronized (readLock) + { + key.interestOps(ops & mask); + } + //Change key to the WriteSelection Key + key = session.getWriteSelectionKey(); + if (key != null && key.isValid()) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + synchronized (writeRequestQueue) + { + if (!writeRequestQueue.isEmpty()) + { + ops = SelectionKey.OP_WRITE; + synchronized (writeLock) + { + key.interestOps(ops & mask); + } + } + } + } + } + } + finally + { + trafficMaskUpdateLock.unlock(); + } + + } + + private class WriteWorker implements Runnable + { + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer"); + + //System.out.println("WriteDebug:"+"Startup"); + for (; ;) + { + try + { + int nKeys = writeSelector.select(SELECTOR_TIMEOUT); + + doAddNewWrite(); + doUpdateTrafficMask(); + + if (nKeys > 0) + { + //System.out.println("WriteDebug:"+nKeys + " keys from writeselector"); + processWrite(writeSelector.selectedKeys()); + } + else + { + //System.out.println("WriteDebug:"+"No keys from writeselector"); + } + + doRemove(); + notifyWriteIdleness(); + + if (flushingSessionsSet.size() > 0) + { + doFlush(); + } + + if (writeSelector.keys().isEmpty()) + { + synchronized (writeLock) + { + + if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) + { + writeWorker = null; + try + { + writeSelector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + writeSelector = null; + } + + break; + } + } + } + + } + catch (Throwable t) + { + ExceptionMonitor.getInstance().exceptionCaught(t); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + //System.out.println("WriteDebug:"+"Shutdown"); + } + + } + + private class ReadWorker implements Runnable + { + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); + + //System.out.println("ReadDebug:"+"Startup"); + for (; ;) + { + try + { + int nKeys = selector.select(SELECTOR_TIMEOUT); + + doAddNewReader(); + doUpdateTrafficMask(); + + if (nKeys > 0) + { + //System.out.println("ReadDebug:"+nKeys + " keys from selector"); + + processRead(selector.selectedKeys()); + } + else + { + //System.out.println("ReadDebug:"+"No keys from selector"); + } + + + doRemove(); + notifyReadIdleness(); + + if (selector.keys().isEmpty()) + { + + synchronized (readLock) + { + if (selector.keys().isEmpty() && newSessions.isEmpty()) + { + readWorker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + + break; + } + } + } + } + catch (Throwable t) + { + ExceptionMonitor.getInstance().exceptionCaught(t); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + //System.out.println("ReadDebug:"+"Shutdown"); + } + + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java new file mode 100644 index 0000000000..043d4800b6 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; + +/** + * An {@link IoConnectorConfig} for {@link SocketConnector}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl +{ + private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; + private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false; + private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false; + private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false; + + private static boolean DEFAULT_REUSE_ADDRESS; + private static int DEFAULT_RECEIVE_BUFFER_SIZE; + private static int DEFAULT_SEND_BUFFER_SIZE; + private static int DEFAULT_TRAFFIC_CLASS; + private static boolean DEFAULT_KEEP_ALIVE; + private static boolean DEFAULT_OOB_INLINE; + private static int DEFAULT_SO_LINGER; + private static boolean DEFAULT_TCP_NO_DELAY; + + static + { + initialize(); + } + + private static void initialize() + { + Socket socket = null; + + socket = new Socket(); + + try + { + DEFAULT_REUSE_ADDRESS = socket.getReuseAddress(); + DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize(); + DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize(); + DEFAULT_KEEP_ALIVE = socket.getKeepAlive(); + DEFAULT_OOB_INLINE = socket.getOOBInline(); + DEFAULT_SO_LINGER = socket.getSoLinger(); + DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay(); + + // Check if setReceiveBufferSize is supported. + try + { + socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE); + SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true; + } + catch( SocketException e ) + { + SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; + } + + // Check if setSendBufferSize is supported. + try + { + socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE); + SET_SEND_BUFFER_SIZE_AVAILABLE = true; + } + catch( SocketException e ) + { + SET_SEND_BUFFER_SIZE_AVAILABLE = false; + } + + // Check if getTrafficClass is supported. + try + { + DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass(); + GET_TRAFFIC_CLASS_AVAILABLE = true; + } + catch( SocketException e ) + { + GET_TRAFFIC_CLASS_AVAILABLE = false; + DEFAULT_TRAFFIC_CLASS = 0; + } + } + catch( SocketException e ) + { + throw new ExceptionInInitializerError(e); + } + finally + { + if( socket != null ) + { + try + { + socket.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + public static boolean isSetReceiveBufferSizeAvailable() { + return SET_RECEIVE_BUFFER_SIZE_AVAILABLE; + } + + public static boolean isSetSendBufferSizeAvailable() { + return SET_SEND_BUFFER_SIZE_AVAILABLE; + } + + public static boolean isGetTrafficClassAvailable() { + return GET_TRAFFIC_CLASS_AVAILABLE; + } + + public static boolean isSetTrafficClassAvailable() { + return SET_TRAFFIC_CLASS_AVAILABLE; + } + + private boolean reuseAddress = DEFAULT_REUSE_ADDRESS; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private int trafficClass = DEFAULT_TRAFFIC_CLASS; + private boolean keepAlive = DEFAULT_KEEP_ALIVE; + private boolean oobInline = DEFAULT_OOB_INLINE; + private int soLinger = DEFAULT_SO_LINGER; + private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; + + /** + * Creates a new instance. + */ + MultiThreadSocketSessionConfigImpl() + { + } + + public boolean isReuseAddress() + { + return reuseAddress; + } + + public void setReuseAddress( boolean reuseAddress ) + { + this.reuseAddress = reuseAddress; + } + + public int getReceiveBufferSize() + { + return receiveBufferSize; + } + + public void setReceiveBufferSize( int receiveBufferSize ) + { + this.receiveBufferSize = receiveBufferSize; + } + + public int getSendBufferSize() + { + return sendBufferSize; + } + + public void setSendBufferSize( int sendBufferSize ) + { + this.sendBufferSize = sendBufferSize; + } + + public int getTrafficClass() + { + return trafficClass; + } + + public void setTrafficClass( int trafficClass ) + { + this.trafficClass = trafficClass; + } + + public boolean isKeepAlive() + { + return keepAlive; + } + + public void setKeepAlive( boolean keepAlive ) + { + this.keepAlive = keepAlive; + } + + public boolean isOobInline() + { + return oobInline; + } + + public void setOobInline( boolean oobInline ) + { + this.oobInline = oobInline; + } + + public int getSoLinger() + { + return soLinger; + } + + public void setSoLinger( int soLinger ) + { + this.soLinger = soLinger; + } + + public boolean isTcpNoDelay() + { + return tcpNoDelay; + } + + public void setTcpNoDelay( boolean tcpNoDelay ) + { + this.tcpNoDelay = tcpNoDelay; + } + + +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java new file mode 100644 index 0000000000..be4a2d289d --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoService; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.RuntimeIOException; +import org.apache.mina.common.TransportType; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.IoServiceListenerSupport; +import org.apache.mina.util.Queue; + +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An {@link IoSession} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +class MultiThreadSocketSessionImpl extends SocketSessionImpl +{ + private final IoService manager; + private final IoServiceConfig serviceConfig; + private final SocketSessionConfig config = new SessionConfigImpl(); + private final MultiThreadSocketIoProcessor ioProcessor; + private final MultiThreadSocketFilterChain filterChain; + private final SocketChannel ch; + private final Queue writeRequestQueue; + private final IoHandler handler; + private final SocketAddress remoteAddress; + private final SocketAddress localAddress; + private final SocketAddress serviceAddress; + private final IoServiceListenerSupport serviceListeners; + private SelectionKey readKey, writeKey; + private int readBufferSize; + private CountDownLatch registeredReadyLatch = new CountDownLatch(2); + private AtomicBoolean created = new AtomicBoolean(false); + + /** + * Creates a new instance. + */ + MultiThreadSocketSessionImpl( IoService manager, + SocketIoProcessor ioProcessor, + IoServiceListenerSupport listeners, + IoServiceConfig serviceConfig, + SocketChannel ch, + IoHandler defaultHandler, + SocketAddress serviceAddress ) + { + super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress); + this.manager = manager; + this.serviceListeners = listeners; + this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor; + this.filterChain = new MultiThreadSocketFilterChain(this); + this.ch = ch; + this.writeRequestQueue = new Queue(); + this.handler = defaultHandler; + this.remoteAddress = ch.socket().getRemoteSocketAddress(); + this.localAddress = ch.socket().getLocalSocketAddress(); + this.serviceAddress = serviceAddress; + this.serviceConfig = serviceConfig; + + // Apply the initial session settings + IoSessionConfig sessionConfig = serviceConfig.getSessionConfig(); + if( sessionConfig instanceof SocketSessionConfig ) + { + SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig; + this.config.setKeepAlive( cfg.isKeepAlive() ); + this.config.setOobInline( cfg.isOobInline() ); + this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() ); + this.readBufferSize = cfg.getReceiveBufferSize(); + this.config.setReuseAddress( cfg.isReuseAddress() ); + this.config.setSendBufferSize( cfg.getSendBufferSize() ); + this.config.setSoLinger( cfg.getSoLinger() ); + this.config.setTcpNoDelay( cfg.isTcpNoDelay() ); + + if( this.config.getTrafficClass() != cfg.getTrafficClass() ) + { + this.config.setTrafficClass( cfg.getTrafficClass() ); + } + } + } + + void awaitRegistration() throws InterruptedException + { + registeredReadyLatch.countDown(); + + registeredReadyLatch.await(); + } + + boolean created() throws InterruptedException + { + return created.get(); + } + + void doneCreation() + { + created.getAndSet(true); + } + + public IoService getService() + { + return manager; + } + + public IoServiceConfig getServiceConfig() + { + return serviceConfig; + } + + public IoSessionConfig getConfig() + { + return config; + } + + SocketIoProcessor getIoProcessor() + { + return ioProcessor; + } + + public IoFilterChain getFilterChain() + { + return filterChain; + } + + SocketChannel getChannel() + { + return ch; + } + + IoServiceListenerSupport getServiceListeners() + { + return serviceListeners; + } + + SelectionKey getSelectionKey() + { + return readKey; + } + + SelectionKey getReadSelectionKey() + { + return readKey; + } + + SelectionKey getWriteSelectionKey() + { + return writeKey; + } + + void setSelectionKey(SelectionKey key) + { + this.readKey = key; + } + + void setWriteSelectionKey(SelectionKey key) + { + this.writeKey = key; + } + + public IoHandler getHandler() + { + return handler; + } + + protected void close0() + { + filterChain.fireFilterClose( this ); + } + + Queue getWriteRequestQueue() + { + return writeRequestQueue; + } + + /** + @return int Number of write scheduled write requests + @deprecated + */ + public int getScheduledWriteMessages() + { + return getScheduledWriteRequests(); + } + + public int getScheduledWriteRequests() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.size(); + } + } + + public int getScheduledWriteBytes() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.byteSize(); + } + } + + protected void write0( WriteRequest writeRequest ) + { + filterChain.fireFilterWrite( this, writeRequest ); + } + + public TransportType getTransportType() + { + return TransportType.SOCKET; + } + + public SocketAddress getRemoteAddress() + { + //This is what I had previously +// return ch.socket().getRemoteSocketAddress(); + return remoteAddress; + } + + public SocketAddress getLocalAddress() + { + //This is what I had previously +// return ch.socket().getLocalSocketAddress(); + return localAddress; + } + + public SocketAddress getServiceAddress() + { + return serviceAddress; + } + + protected void updateTrafficMask() + { + this.ioProcessor.updateTrafficMask( this ); + } + + int getReadBufferSize() + { + return readBufferSize; + } + + private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig + { + public boolean isKeepAlive() + { + try + { + return ch.socket().getKeepAlive(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setKeepAlive( boolean on ) + { + try + { + ch.socket().setKeepAlive( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isOobInline() + { + try + { + return ch.socket().getOOBInline(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setOobInline( boolean on ) + { + try + { + ch.socket().setOOBInline( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isReuseAddress() + { + try + { + return ch.socket().getReuseAddress(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReuseAddress( boolean on ) + { + try + { + ch.socket().setReuseAddress( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getSoLinger() + { + try + { + return ch.socket().getSoLinger(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSoLinger( int linger ) + { + try + { + if( linger < 0 ) + { + ch.socket().setSoLinger( false, 0 ); + } + else + { + ch.socket().setSoLinger( true, linger ); + } + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isTcpNoDelay() + { + try + { + return ch.socket().getTcpNoDelay(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setTcpNoDelay( boolean on ) + { + try + { + ch.socket().setTcpNoDelay( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getTrafficClass() + { + if( SocketSessionConfigImpl.isGetTrafficClassAvailable() ) + { + try + { + return ch.socket().getTrafficClass(); + } + catch( SocketException e ) + { + // Throw an exception only when setTrafficClass is also available. + if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) + { + throw new RuntimeIOException( e ); + } + } + } + + return 0; + } + + public void setTrafficClass( int tc ) + { + if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) + { + try + { + ch.socket().setTrafficClass( tc ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + + public int getSendBufferSize() + { + try + { + return ch.socket().getSendBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSendBufferSize( int size ) + { + if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() ) + { + try + { + ch.socket().setSendBufferSize( size ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + + public int getReceiveBufferSize() + { + try + { + return ch.socket().getReceiveBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReceiveBufferSize( int size ) + { + if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() ) + { + try + { + ch.socket().setReceiveBufferSize( size ); + MultiThreadSocketSessionImpl.this.readBufferSize = size; + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java new file mode 100644 index 0000000000..a23e546af5 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.vmpipe; + +import java.io.IOException; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.BaseIoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.vmpipe.support.VmPipe; +import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker; +import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl; +import org.apache.mina.util.AnonymousSocketAddress; + +/** + * Connects to {@link IoHandler}s which is bound on the specified + * {@link VmPipeAddress}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class QpidVmPipeConnector extends VmPipeConnector +{ + private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {}; + private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig() + { + public IoSessionConfig getSessionConfig() + { + return CONFIG; + } + }; + + /** + * Creates a new instance. + */ + public QpidVmPipeConnector() + { + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + return connect( address, null, handler, config ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + if( ! ( address instanceof VmPipeAddress ) ) + throw new IllegalArgumentException( + "address must be VmPipeAddress." ); + + if( config == null ) + { + config = getDefaultConfig(); + } + + VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address ); + if( entry == null ) + { + return DefaultConnectFuture.newFailedFuture( + new IOException( "Endpoint unavailable: " + address ) ); + } + + DefaultConnectFuture future = new DefaultConnectFuture(); + VmPipeSessionImpl localSession = + new VmPipeSessionImpl( + this, + config, + getListeners(), + new Object(), // lock + new AnonymousSocketAddress(), + handler, + entry ); + + // initialize acceptor session + VmPipeSessionImpl remoteSession = localSession.getRemoteSession(); + try + { + IoFilterChain filterChain = remoteSession.getFilterChain(); + entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + entry.getListeners().fireSessionCreated( remoteSession ); + VmPipeIdleStatusChecker.getInstance().addSession( remoteSession ); + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + remoteSession.close(); + } + + + // initialize connector session + try + { + IoFilterChain filterChain = localSession.getFilterChain(); + this.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future ); + getListeners().fireSessionCreated( localSession ); + VmPipeIdleStatusChecker.getInstance().addSession( localSession); + } + catch( Throwable t ) + { + future.setException( t ); + } + + + + return future; + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 19f5035e33..f6f596da95 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -20,9 +20,7 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; /** @@ -55,7 +53,7 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), - new AMQShortString(getMessage())); + MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); + return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index ba9f69a05c..afd415b1eb 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -21,9 +21,7 @@ package org.apache.qpid; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; /** @@ -61,7 +59,12 @@ public class AMQConnectionException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), - new AMQShortString(getMessage())); + MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); + return new AMQFrame(channel, + reg.createConnectionCloseBody(getErrorCode().getCode(), + new AMQShortString(getMessage()), + _classId, + _methodId)); + } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index b60f130652..6cdd57d6f2 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -1,40 +1,47 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid;
-
-/**
- * AMQConnectionFailureException indicates that a connection to a broker could not be formed.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to connect to a broker.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- */
-public class AMQConnectionFailureException extends AMQException
-{
- public AMQConnectionFailureException(String message, Throwable cause)
- {
- super(null, message, cause);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +/** + * AMQConnectionFailureException indicates that a connection to a broker could not be formed. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents failure to connect to a broker. + * </table> + * + * @todo Not an AMQP exception as no status code. + */ +public class AMQConnectionFailureException extends AMQException +{ + public AMQConnectionFailureException(String message, Throwable cause) + { + super(null, message, cause); + } + + public AMQConnectionFailureException(AMQConstant errorCode, String message, Throwable cause) + { + super(errorCode, message, cause); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 6cbb98fd86..eda532b64e 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -20,6 +20,8 @@ */ package org.apache.qpid; +import javax.management.JMException; + import org.apache.qpid.protocol.AMQConstant; /** @@ -38,7 +40,7 @@ public class AMQException extends Exception { /** Holds the AMQ error code constant associated with this exception. */ private AMQConstant _errorCode; - + /** * Creates an exception with an optional error code, optional message and optional underlying cause. * @@ -52,6 +54,28 @@ public class AMQException extends Exception _errorCode = errorCode; } + /* + * Deprecated constructors brought from M2.1 + */ + @Deprecated + public AMQException (String msg) + { + this(null, (msg == null) ? "" : msg); + } + + @Deprecated + public AMQException (AMQConstant errorCode, String msg) + { + this(errorCode, (msg == null) ? "" : msg, null); + } + + @Deprecated + public AMQException(String msg, Throwable cause) + { + this(null, msg, cause); + } + + /** * Gets the AMQ protocol exception code associated with this exception. * @@ -61,4 +85,9 @@ public class AMQException extends Exception { return _errorCode; } + + public boolean isHardError() + { + return true; + } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java index 15c8bea0a4..baca2a4773 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java @@ -36,4 +36,10 @@ public class AMQInvalidArgumentException extends AMQException { super(AMQConstant.INVALID_ARGUMENT, message, cause); } + + public boolean isHardError() + { + return false; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java index 1502c0efc5..01a569b693 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java @@ -45,4 +45,10 @@ public class AMQUndeliveredException extends AMQException { return _bounced; } + + public boolean isHardError() + { + return false; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 02ae3cb089..7eef73f337 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -22,6 +22,7 @@ package org.apache.qpid.codec; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; @@ -48,6 +49,9 @@ import org.apache.qpid.framing.ProtocolInitiation; */ public class AMQDecoder extends CumulativeProtocolDecoder { + + private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer"; + /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -56,6 +60,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder /** Flag to indicate whether this decoder needs to handle protocol initiation. */ private boolean _expectProtocolInitiation; + private boolean firstDecode = true; /** * Creates a new AMQP decoder. @@ -81,14 +86,24 @@ public class AMQDecoder extends CumulativeProtocolDecoder */ protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - if (_expectProtocolInitiation) + + boolean decoded; + if (_expectProtocolInitiation + || (firstDecode + && (in.remaining() > 0) + && (in.get(in.position()) == (byte)'A'))) { - return doDecodePI(session, in, out); + decoded = doDecodePI(session, in, out); } else { - return doDecodeDataBlock(session, in, out); + decoded = doDecodeDataBlock(session, in, out); } + if(firstDecode && decoded) + { + firstDecode = false; + } + return decoded; } /** @@ -160,4 +175,97 @@ public class AMQDecoder extends CumulativeProtocolDecoder { _expectProtocolInitiation = expectProtocolInitiation; } + + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode( IoSession session, ByteBuffer in, + ProtocolDecoderOutput out ) throws Exception + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( buf != null ) + { + buf.put( in ); + buf.flip(); + } + else + { + buf = in; + } + + for( ;; ) + { + int oldPos = buf.position(); + boolean decoded = doDecode( session, buf, out ); + if( decoded ) + { + if( buf.position() == oldPos ) + { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed." ); + } + + if( !buf.hasRemaining() ) + { + break; + } + } + else + { + break; + } + } + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if ( buf.hasRemaining() ) + { + storeRemainingInSession( buf, session ); + } + else + { + removeSessionBuffer( session ); + } + } + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose( IoSession session ) throws Exception + { + removeSessionBuffer( session ); + } + + private void removeSessionBuffer(IoSession session) + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + if( buf != null ) + { + buf.release(); + session.removeAttribute( BUFFER ); + } + } + + private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) + { + ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false ); + remainingBuf.setAutoExpand( true ); + remainingBuf.put( buf ); + session.setAttribute( BUFFER, remainingBuf ); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index c497717870..fe04155bb8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -1,39 +1,40 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import org.apache.mina.common.ByteBuffer;
-
-public abstract class AMQBody
-{
- public abstract byte getFrameType();
-
- /**
- * Get the size of the body
- * @return unsigned short
- */
- protected abstract int getSize();
-
- protected abstract void writePayload(ByteBuffer buffer);
-
- protected abstract void populateFromBuffer(ByteBuffer buffer, long size)
- throws AMQFrameDecodingException, AMQProtocolVersionException;
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; + +public interface AMQBody +{ + public byte getFrameType(); + + /** + * Get the size of the body + * @return unsigned short + */ + public abstract int getSize(); + + public void writePayload(ByteBuffer buffer); + + void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 11f505fd4b..02a46f3748 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -27,7 +27,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock private final int _channel; private final AMQBody _bodyFrame; - + public static final byte FRAME_END_BYTE = (byte) 0xCE; public AMQFrame(final int channel, final AMQBody bodyFrame) @@ -47,13 +47,19 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return 1 + 2 + 4 + _bodyFrame.getSize() + 1; } + public static final int getFrameOverhead() + { + return 1 + 2 + 4 + 1; + } + + public void writePayload(ByteBuffer buffer) { buffer.put(_bodyFrame.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, _channel); EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); _bodyFrame.writePayload(buffer); - buffer.put((byte) 0xCE); + buffer.put(FRAME_END_BYTE); } public final int getChannel() @@ -66,10 +72,54 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return _bodyFrame; } - - public String toString() { return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } + + public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body) + { + buffer.put(body.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body.getSize()); + body.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body3.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body3.getSize()); + body3.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java index 843b6a1e8c..2373edb478 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -38,4 +38,10 @@ public class AMQFrameDecodingException extends AMQException { super(errorCode, message, cause); } + + public AMQFrameDecodingException(AMQConstant errorCode, String message) + { + super(errorCode, message, null); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 0982847aac..4763b22290 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -1,132 +1,83 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.protocol.AMQConstant;
-
-public abstract class AMQMethodBody extends AMQBody
-{
- public static final byte TYPE = 1;
-
- /** AMQP version */
- protected byte major;
- protected byte minor;
-
- public byte getMajor()
- {
- return major;
- }
-
- public byte getMinor()
- {
- return minor;
- }
-
- public AMQMethodBody(byte major, byte minor)
- {
- this.major = major;
- this.minor = minor;
- }
-
- /** unsigned short */
- protected abstract int getBodySize();
-
- /** @return unsigned short */
- protected abstract int getClazz();
-
- /** @return unsigned short */
- protected abstract int getMethod();
-
- protected abstract void writeMethodPayload(ByteBuffer buffer);
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- protected int getSize()
- {
- return 2 + 2 + getBodySize();
- }
-
- protected void writePayload(ByteBuffer buffer)
- {
- EncodingUtils.writeUnsignedShort(buffer, getClazz());
- EncodingUtils.writeUnsignedShort(buffer, getMethod());
- writeMethodPayload(buffer);
- }
-
- protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
-
- protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
- {
- populateMethodBodyFromBuffer(buffer);
- }
-
- public String toString()
- {
- StringBuffer buf = new StringBuffer(getClass().getName());
- buf.append("[ Class: ").append(getClazz());
- buf.append(" Method: ").append(getMethod()).append(']');
- return buf.toString();
- }
-
- /**
- * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and
- * method ids of the body it resulted from).
- */
-
- /**
- * Convenience Method to create a channel not found exception
- *
- * @param channelId The channel id that is not found
- *
- * @return new AMQChannelException
- */
- public AMQChannelException getChannelNotFoundException(int channelId)
- {
- return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId);
- }
-
- public AMQChannelException getChannelException(AMQConstant code, String message)
- {
- return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, null);
- }
-
- public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
- {
- return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
- }
-
- public AMQConnectionException getConnectionException(AMQConstant code, String message)
- {
- return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, null);
- }
-
- public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
- {
- return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
- }
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; + +public interface AMQMethodBody extends AMQBody +{ + public static final byte TYPE = 1; + + /** AMQP version */ + public byte getMajor(); + + public byte getMinor(); + + + + /** @return unsigned short */ + public int getClazz(); + + /** @return unsigned short */ + public int getMethod(); + + public void writeMethodPayload(ByteBuffer buffer); + + + public int getSize(); + + public void writePayload(ByteBuffer buffer); + + //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; + + //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; + + public AMQFrame generateFrame(int channelId); + + public String toString(); + + + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException + */ + public AMQChannelException getChannelNotFoundException(int channelId); + + public AMQChannelException getChannelException(AMQConstant code, String message); + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause); + + public AMQConnectionException getConnectionException(AMQConstant code, String message); + + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause); + + + public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException; +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index cf85bdab31..1a7022c11b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -40,7 +40,6 @@ public class AMQMethodBodyFactory implements BodyFactory public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return _protocolSession.getRegistry().get((short) in.getUnsignedShort(), (short) in.getUnsignedShort(), in, - bodySize); + return _protocolSession.getMethodRegistry().convertToBody(in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java new file mode 100644 index 0000000000..64af717342 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -0,0 +1,96 @@ +package org.apache.qpid.framing;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+public abstract class AMQMethodBodyImpl implements AMQMethodBody
+{
+ public static final byte TYPE = 1;
+
+ public AMQMethodBodyImpl()
+ {
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+
+ /** unsigned short */
+ abstract protected int getBodySize();
+
+
+ public AMQFrame generateFrame(int channelId)
+ {
+ return new AMQFrame(channelId, this);
+ }
+
+ /**
+ * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and
+ * method ids of the body it resulted from).
+ */
+
+ /**
+ * Convenience Method to create a channel not found exception
+ *
+ * @param channelId The channel id that is not found
+ *
+ * @return new AMQChannelException
+ */
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId);
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null);
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null);
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+ {
+ session.methodFrameReceived(channelId, this);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index 359efe7eb7..0030742e94 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -26,6 +26,5 @@ import org.apache.mina.common.ByteBuffer; public abstract interface AMQMethodBodyInstanceFactory
{
- public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
- public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index ee63f2c83d..a747aaeda7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -1,444 +1,721 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.framing;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.lang.ref.WeakReference;
-
-/**
- * A short string is a representation of an AMQ Short String
- * Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
- * and thus can be held more effectively in a byte buffer.
- *
- */
-public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
-{
-
- private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
- new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
- {
- protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue()
- {
- return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
- };
- };
-
- private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap =
- new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
-
- private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
-
- private final ByteBuffer _data;
- private int _hashCode;
- private final int _length;
- private static final char[] EMPTY_CHAR_ARRAY = new char[0];
- private char[] chars;
- private String str;
-
- public AMQShortString(byte[] data)
- {
-
- _data = ByteBuffer.wrap(data);
- _length = data.length;
- }
-
- public AMQShortString(String data)
- {
- this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
- if (data != null)
- {
- _hashCode = data.hashCode();
- }
- }
-
- public AMQShortString(char[] data)
- {
- if (data == null)
- {
- throw new NullPointerException("Cannot create AMQShortString with null char[]");
- }
-
- final int length = data.length;
- final byte[] stringBytes = new byte[length];
- for (int i = 0; i < length; i++)
- {
- stringBytes[i] = (byte) (0xFF & data[i]);
- }
-
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
- _length = length;
-
- }
-
- public AMQShortString(CharSequence charSequence)
- {
- final int length = charSequence.length();
- final byte[] stringBytes = new byte[length];
- int hash = 0;
- for (int i = 0; i < length; i++)
- {
- stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i)));
- hash = (31 * hash) + stringBytes[i];
-
- }
-
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
- _hashCode = hash;
- _length = length;
-
- }
-
- private AMQShortString(ByteBuffer data)
- {
- _data = data;
- _length = data.limit();
-
- }
-
- /**
- * Get the length of the short string
- * @return length of the underlying byte array
- */
- public int length()
- {
- return _length;
- }
-
- public char charAt(int index)
- {
-
- return (char) _data.get(index);
-
- }
-
- public CharSequence subSequence(int start, int end)
- {
- return new CharSubSequence(start, end);
- }
-
- public int writeToByteArray(byte[] encoding, int pos)
- {
- final int size = length();
- encoding[pos++] = (byte) length();
- for (int i = 0; i < size; i++)
- {
- encoding[pos++] = _data.get(i);
- }
-
- return pos;
- }
-
- public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
- {
-
- final byte len = byteEncodedDestination[pos];
- if (len == 0)
- {
- return null;
- }
-
- ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
- return new AMQShortString(data);
- }
-
- public static AMQShortString readFromBuffer(ByteBuffer buffer)
- {
- final short length = buffer.getUnsigned();
- if (length == 0)
- {
- return null;
- }
- else
- {
- ByteBuffer data = buffer.slice();
- data.limit(length);
- data.rewind();
- buffer.skip(length);
-
- return new AMQShortString(data);
- }
- }
-
- public byte[] getBytes()
- {
-
- if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
- {
- return _data.array();
- }
- else
- {
- final int size = length();
- byte[] b = new byte[size];
- ByteBuffer buf = _data.duplicate();
- buf.rewind();
- buf.get(b);
-
- return b;
- }
-
- }
-
- public void writeToBuffer(ByteBuffer buffer)
- {
-
- final int size = length();
- if (size != 0)
- {
-
- buffer.put((byte) size);
- if (_data.buf().hasArray())
- {
- buffer.put(_data.array(), _data.arrayOffset(), length());
- }
- else
- {
-
- for (int i = 0; i < size; i++)
- {
-
- buffer.put(_data.get(i));
- }
- }
- }
- else
- {
- // really writing out unsigned byte
- buffer.put((byte) 0);
- }
-
- }
-
- private final class CharSubSequence implements CharSequence
- {
- private final int _offset;
- private final int _end;
-
- public CharSubSequence(final int offset, final int end)
- {
- _offset = offset;
- _end = end;
- }
-
- public int length()
- {
- return _end - _offset;
- }
-
- public char charAt(int index)
- {
- return AMQShortString.this.charAt(index + _offset);
- }
-
- public CharSequence subSequence(int start, int end)
- {
- return new CharSubSequence(start + _offset, end + _offset);
- }
- }
-
- public char[] asChars()
- {
- if (chars == null)
- {
- final int size = length();
- chars = new char[size];
-
- for (int i = 0; i < size; i++)
- {
- chars[i] = (char) _data.get(i);
- }
- }
-
- return chars;
- }
-
- public String asString()
- {
- if (str == null)
- {
- str = new String(asChars());
- }
-
- return str;
- }
-
- public boolean equals(Object o)
- {
- if (o == null)
- {
- return false;
- }
-
- if (o == this)
- {
- return true;
- }
-
- if (o instanceof AMQShortString)
- {
-
- final AMQShortString otherString = (AMQShortString) o;
-
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
- {
- return false;
- }
-
- return _data.equals(otherString._data);
-
- }
-
- return (o instanceof CharSequence) && equals((CharSequence) o);
-
- }
-
- public boolean equals(CharSequence s)
- {
- if (s == null)
- {
- return false;
- }
-
- if (s.length() != length())
- {
- return false;
- }
-
- for (int i = 0; i < length(); i++)
- {
- if (charAt(i) != s.charAt(i))
- {
- return false;
- }
- }
-
- return true;
- }
-
- public int hashCode()
- {
- int hash = _hashCode;
- if (hash == 0)
- {
- final int size = length();
-
- for (int i = 0; i < size; i++)
- {
- hash = (31 * hash) + _data.get(i);
- }
-
- _hashCode = hash;
- }
-
- return hash;
- }
-
- public void setDirty()
- {
- _hashCode = 0;
- }
-
- public String toString()
- {
- return asString();
- }
-
- public int compareTo(AMQShortString name)
- {
- if (name == null)
- {
- return 1;
- }
- else
- {
-
- if (name.length() < length())
- {
- return -name.compareTo(this);
- }
-
- for (int i = 0; i < length(); i++)
- {
- final byte d = _data.get(i);
- final byte n = name._data.get(i);
- if (d < n)
- {
- return -1;
- }
-
- if (d > n)
- {
- return 1;
- }
- }
-
- return (length() == name.length()) ? 0 : -1;
- }
- }
-
- public AMQShortString intern()
- {
-
- hashCode();
-
- Map<AMQShortString, WeakReference<AMQShortString>> localMap =
- _localInternMap.get();
-
- WeakReference<AMQShortString> ref = localMap.get(this);
- AMQShortString internString;
-
- if(ref != null)
- {
- internString = ref.get();
- if(internString != null)
- {
- return internString;
- }
- }
-
-
- synchronized(_globalInternMap)
- {
-
- ref = _globalInternMap.get(this);
- if((ref == null) || ((internString = ref.get()) == null))
- {
- internString = new AMQShortString(getBytes());
- ref = new WeakReference(internString);
- _globalInternMap.put(internString, ref);
- }
-
- }
- localMap.put(internString, ref);
- return internString;
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.lang.ref.WeakReference; + +/** + * A short string is a representation of an AMQ Short String + * Short strings differ from the Java String class by being limited to on ASCII characters (0-127) + * and thus can be held more effectively in a byte buffer. + * + */ +public final class AMQShortString implements CharSequence, Comparable<AMQShortString> +{ + private static final byte MINUS = (byte)'-'; + private static final byte ZERO = (byte) '0'; + + + + private final class TokenizerImpl implements AMQShortStringTokenizer + { + private final byte _delim; + private int _count = -1; + private int _pos = 0; + + public TokenizerImpl(final byte delim) + { + _delim = delim; + } + + public int countTokens() + { + if(_count == -1) + { + _count = 1 + AMQShortString.this.occurences(_delim); + } + return _count; + } + + public AMQShortString nextToken() + { + if(_pos <= AMQShortString.this.length()) + { + int nextDelim = AMQShortString.this.indexOf(_delim, _pos); + if(nextDelim == -1) + { + nextDelim = AMQShortString.this.length(); + } + + AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++); + _pos = nextDelim; + return nextToken; + } + else + { + return null; + } + } + + public boolean hasMoreTokens() + { + return _pos <= AMQShortString.this.length(); + } + } + + private AMQShortString substring(final int from, final int to) + { + return new AMQShortString(_data, from+_offset, to+_offset); + } + + + private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap = + new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>() + { + protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue() + { + return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>(); + }; + }; + + private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap = + new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>(); + + private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class); + + private final byte[] _data; + private final int _offset; + private int _hashCode; + private final int _length; + private static final char[] EMPTY_CHAR_ARRAY = new char[0]; + + public static final AMQShortString EMPTY_STRING = new AMQShortString((String)null); + + public AMQShortString(byte[] data) + { + + _data = data.clone(); + _length = data.length; + _offset = 0; + } + + public AMQShortString(byte[] data, int pos) + { + final int size = data[pos++]; + final byte[] dataCopy = new byte[size]; + System.arraycopy(data,pos,dataCopy,0,size); + _length = size; + _data = dataCopy; + _offset = 0; + } + + public AMQShortString(String data) + { + this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray()); + + } + + public AMQShortString(char[] data) + { + if (data == null) + { + throw new NullPointerException("Cannot create AMQShortString with null char[]"); + } + + final int length = data.length; + final byte[] stringBytes = new byte[length]; + int hash = 0; + for (int i = 0; i < length; i++) + { + stringBytes[i] = (byte) (0xFF & data[i]); + hash = (31 * hash) + stringBytes[i]; + } + _hashCode = hash; + _data = stringBytes; + + _length = length; + _offset = 0; + + } + + public AMQShortString(CharSequence charSequence) + { + final int length = charSequence.length(); + final byte[] stringBytes = new byte[length]; + int hash = 0; + for (int i = 0; i < length; i++) + { + stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i))); + hash = (31 * hash) + stringBytes[i]; + + } + + _data = stringBytes; + _hashCode = hash; + _length = length; + _offset = 0; + + } + + private AMQShortString(ByteBuffer data, final int length) + { + if(data.isDirect() || data.isReadOnly()) + { + byte[] dataBytes = new byte[length]; + data.get(dataBytes); + _data = dataBytes; + _offset = 0; + } + else + { + + _data = data.array(); + _offset = data.arrayOffset() + data.position(); + data.skip(length); + + } + _length = length; + + } + + private AMQShortString(final byte[] data, final int from, final int to) + { + _offset = from; + _length = to - from; + _data = data; + } + + public AMQShortString shrink() + { + if(_data.length != _length) + { + byte[] dataBytes = new byte[_length]; + System.arraycopy(_data,_offset,dataBytes,0,_length); + return new AMQShortString(dataBytes,0,_length); + } + else + { + return this; + } + } + + + /** + * Get the length of the short string + * @return length of the underlying byte array + */ + public int length() + { + return _length; + } + + public char charAt(int index) + { + + return (char) _data[_offset + index]; + + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start, end); + } + + public int writeToByteArray(byte[] encoding, int pos) + { + final int size = length(); + encoding[pos++] = (byte) size; + System.arraycopy(_data,_offset,encoding,pos,size); + return pos+size; + } + + public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos) + { + + + final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos); + if(shortString.length() == 0) + { + return null; + } + else + { + return shortString; + } + } + + public static AMQShortString readFromBuffer(ByteBuffer buffer) + { + final short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + + return new AMQShortString(buffer, length); + } + } + + public byte[] getBytes() + { + if(_offset == 0 && _length == _data.length) + { + return _data.clone(); + } + else + { + byte[] data = new byte[_length]; + System.arraycopy(_data,_offset,data,0,_length); + return data; + } + } + + public void writeToBuffer(ByteBuffer buffer) + { + + final int size = length(); + //buffer.setAutoExpand(true); + buffer.put((byte) size); + buffer.put(_data, _offset, size); + + } + + public boolean endsWith(String s) + { + return endsWith(new AMQShortString(s)); + } + + + public boolean endsWith(AMQShortString otherString) + { + + if (otherString.length() > length()) + { + return false; + } + + + int thisLength = length(); + int otherLength = otherString.length(); + + for (int i = 1; i <= otherLength; i++) + { + if (charAt(thisLength - i) != otherString.charAt(otherLength - i)) + { + return false; + } + } + return true; + } + + public boolean startsWith(String s) + { + return startsWith(new AMQShortString(s)); + } + + public boolean startsWith(AMQShortString otherString) + { + + if (otherString.length() > length()) + { + return false; + } + + for (int i = 0; i < otherString.length(); i++) + { + if (charAt(i) != otherString.charAt(i)) + { + return false; + } + } + + return true; + + } + + public boolean startsWith(CharSequence otherString) + { + if (otherString.length() > length()) + { + return false; + } + + for (int i = 0; i < otherString.length(); i++) + { + if (charAt(i) != otherString.charAt(i)) + { + return false; + } + } + + return true; + } + + + private final class CharSubSequence implements CharSequence + { + private final int _sequenceOffset; + private final int _end; + + public CharSubSequence(final int offset, final int end) + { + _sequenceOffset = offset; + _end = end; + } + + public int length() + { + return _end - _sequenceOffset; + } + + public char charAt(int index) + { + return AMQShortString.this.charAt(index + _sequenceOffset); + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset); + } + } + + public char[] asChars() + { + final int size = length(); + final char[] chars = new char[size]; + + for (int i = 0; i < size; i++) + { + chars[i] = (char) _data[i + _offset]; + } + + return chars; + } + + public String asString() + { + return new String(asChars()); + } + + public boolean equals(Object o) + { + + + if(o instanceof AMQShortString) + { + return equals((AMQShortString)o); + } + if(o instanceof CharSequence) + { + return equals((CharSequence)o); + } + + if (o == null) + { + return false; + } + + if (o == this) + { + return true; + } + + + return false; + + } + + public boolean equals(final AMQShortString otherString) + { + if (otherString == this) + { + return true; + } + + if (otherString == null) + { + return false; + } + + if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode)) + { + return false; + } + + return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data)) + || Arrays.equals(getBytes(),otherString.getBytes()); + + } + + public boolean equals(CharSequence s) + { + if(s instanceof AMQShortString) + { + return equals((AMQShortString)s); + } + + if (s == null) + { + return false; + } + + if (s.length() != length()) + { + return false; + } + + for (int i = 0; i < length(); i++) + { + if (charAt(i) != s.charAt(i)) + { + return false; + } + } + + return true; + } + + public int hashCode() + { + int hash = _hashCode; + if (hash == 0) + { + final int size = length(); + + for (int i = 0; i < size; i++) + { + hash = (31 * hash) + _data[i+_offset]; + } + + _hashCode = hash; + } + + return hash; + } + + public void setDirty() + { + _hashCode = 0; + } + + public String toString() + { + return asString(); + } + + public int compareTo(AMQShortString name) + { + if (name == null) + { + return 1; + } + else + { + + if (name.length() < length()) + { + return -name.compareTo(this); + } + + for (int i = 0; i < length(); i++) + { + final byte d = _data[i+_offset]; + final byte n = name._data[i+name._offset]; + if (d < n) + { + return -1; + } + + if (d > n) + { + return 1; + } + } + + return (length() == name.length()) ? 0 : -1; + } + } + + + public AMQShortStringTokenizer tokenize(byte delim) + { + return new TokenizerImpl(delim); + } + + + public AMQShortString intern() + { + + hashCode(); + + Map<AMQShortString, WeakReference<AMQShortString>> localMap = + _localInternMap.get(); + + WeakReference<AMQShortString> ref = localMap.get(this); + AMQShortString internString; + + if(ref != null) + { + internString = ref.get(); + if(internString != null) + { + return internString; + } + } + + + synchronized(_globalInternMap) + { + + ref = _globalInternMap.get(this); + if((ref == null) || ((internString = ref.get()) == null)) + { + internString = shrink(); + ref = new WeakReference(internString); + _globalInternMap.put(internString, ref); + } + + } + localMap.put(internString, ref); + return internString; + + } + + private int occurences(final byte delim) + { + int count = 0; + final int end = _offset + _length; + for(int i = _offset ; i < end ; i++ ) + { + if(_data[i] == delim) + { + count++; + } + } + return count; + } + + private int indexOf(final byte val, final int pos) + { + + for(int i = pos; i < length(); i++) + { + if(_data[_offset+i] == val) + { + return i; + } + } + return -1; + } + + + public static AMQShortString join(final Collection<AMQShortString> terms, + final AMQShortString delim) + { + if(terms.size() == 0) + { + return EMPTY_STRING; + } + + int size = delim.length() * (terms.size() - 1); + for(AMQShortString term : terms) + { + size += term.length(); + } + + byte[] data = new byte[size]; + int pos = 0; + final byte[] delimData = delim._data; + final int delimOffset = delim._offset; + final int delimLength = delim._length; + + + for(AMQShortString term : terms) + { + + if(pos!=0) + { + System.arraycopy(delimData, delimOffset,data,pos, delimLength); + pos+=delimLength; + } + System.arraycopy(term._data,term._offset,data,pos,term._length); + pos+=term._length; + } + + + + return new AMQShortString(data,0,size); + } + + public int toIntValue() + { + int pos = _offset; + int val = 0; + + + boolean isNegative = (_data[pos] == MINUS); + if(isNegative) + { + pos++; + } + + final int end = _length + _offset; + + while(pos < end) + { + int digit = (int) (_data[pos++] - ZERO); + if((digit < 0) || (digit > 9)) + { + throw new NumberFormatException("\""+toString()+"\" is not a valid number"); + } + val = val * 10; + val += digit; + } + if(isNegative) + { + val = val * -1; + } + return val; + } + + public boolean contains(final byte b) + { + final int end = _length + _offset; + for(int i = _offset; i < end; i++) + { + if(_data[i] == b) + { + return true; + } + } + return false; //To change body of created methods use File | Settings | File Templates. + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java new file mode 100644 index 0000000000..e2db8906a1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java @@ -0,0 +1,31 @@ +package org.apache.qpid.framing; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public interface AMQShortStringTokenizer +{ + + public int countTokens(); + + public AMQShortString nextToken(); + + boolean hasMoreTokens(); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index 6dda91a488..2c356d072c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -23,12 +23,24 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer;
import java.math.BigDecimal;
-import java.math.BigInteger;
+/**
+ * AMQType is a type that represents the different possible AMQP field table types. It provides operations for each
+ * of the types to perform tasks such as calculating the size of an instance of the type, converting types between AMQP
+ * and Java native types, and reading and writing instances of AMQP types in binary formats to and from byte buffers.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Get the equivalent one byte identifier for a type.
+ * <tr><td> Calculate the size of an instance of an AMQP parameter type. <td> {@link EncodingUtils}
+ * <tr><td> Convert an instance of an AMQP parameter into a compatable Java object tagged with its AMQP type.
+ * <td> {@link AMQTypedValue}
+ * <tr><td> Write an instance of an AMQP parameter type to a byte buffer. <td> {@link EncodingUtils}
+ * <tr><td> Read an instance of an AMQP parameter from a byte buffer. <td> {@link EncodingUtils}
+ * </table>
+ */
public enum AMQType
{
- //AMQP FieldTable Wire Types
-
LONG_STRING('S')
{
public int getEncodingSize(Object value)
@@ -36,7 +48,6 @@ public enum AMQType return EncodingUtils.encodedLongStringLength((String) value);
}
-
public String toNativeValue(Object value)
{
if (value != null)
@@ -58,12 +69,10 @@ public enum AMQType {
return EncodingUtils.readLongString(buffer);
}
-
},
INTEGER('i')
{
-
public int getEncodingSize(Object value)
{
return EncodingUtils.unsignedIntegerLength();
@@ -89,12 +98,11 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Long.valueOf((String)value);
+ return Long.valueOf((String) value);
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to int.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int.");
}
}
@@ -111,22 +119,21 @@ public enum AMQType DECIMAL('D')
{
-
public int getEncodingSize(Object value)
{
- return EncodingUtils.encodedByteLength()+ EncodingUtils.encodedIntegerLength();
+ return EncodingUtils.encodedByteLength() + EncodingUtils.encodedIntegerLength();
}
public Object toNativeValue(Object value)
{
- if(value instanceof BigDecimal)
+ if (value instanceof BigDecimal)
{
return (BigDecimal) value;
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to BigDecimal.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to BigDecimal.");
}
}
@@ -150,7 +157,8 @@ public enum AMQType int unscaled = EncodingUtils.readInteger(buffer);
BigDecimal bd = new BigDecimal(unscaled);
- return bd.setScale(places);
+
+ return bd.setScale(places);
}
},
@@ -163,14 +171,14 @@ public enum AMQType public Object toNativeValue(Object value)
{
- if(value instanceof Long)
+ if (value instanceof Long)
{
return (Long) value;
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to timestamp.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to timestamp.");
}
}
@@ -179,37 +187,97 @@ public enum AMQType EncodingUtils.writeLong(buffer, (Long) value);
}
-
public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLong(buffer);
}
},
+ /**
+ * Implements the field table type. The native value of a field table type will be an instance of
+ * {@link FieldTable}, which itself may contain name/value pairs encoded as {@link AMQTypedValue}s.
+ */
FIELD_TABLE('F')
{
+ /**
+ * Calculates the size of an instance of the type in bytes.
+ *
+ * @param value An instance of the type.
+ *
+ * @return The size of the instance of the type in bytes.
+ */
public int getEncodingSize(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ // Ensure that the value is a FieldTable.
+ if (!(value instanceof FieldTable))
+ {
+ throw new IllegalArgumentException("Value is not a FieldTable.");
+ }
+
+ FieldTable ftValue = (FieldTable) value;
+
+ // Loop over all name/value pairs adding up size of each. FieldTable itself keeps track of its encoded
+ // size as entries are added, so no need to loop over all explicitly.
+ // EncodingUtils calculation of the encoded field table lenth, will include 4 bytes for its 'size' field.
+ return EncodingUtils.encodedFieldTableLength(ftValue);
}
+ /**
+ * Converts an instance of the type to an equivalent Java native representation.
+ *
+ * @param value An instance of the type.
+ *
+ * @return An equivalent Java native representation.
+ */
public Object toNativeValue(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ // Ensure that the value is a FieldTable.
+ if (!(value instanceof FieldTable))
+ {
+ throw new IllegalArgumentException("Value is not a FieldTable.");
+ }
+
+ return (FieldTable) value;
}
+ /**
+ * Writes an instance of the type to a specified byte buffer.
+ *
+ * @param value An instance of the type.
+ * @param buffer The byte buffer to write it to.
+ */
public void writeValueImpl(Object value, ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ // Ensure that the value is a FieldTable.
+ if (!(value instanceof FieldTable))
+ {
+ throw new IllegalArgumentException("Value is not a FieldTable.");
+ }
+
+ FieldTable ftValue = (FieldTable) value;
+
+ // Loop over all name/values writing out into buffer.
+ ftValue.writeToBuffer(buffer);
}
+ /**
+ * Reads an instance of the type from a specified byte buffer.
+ *
+ * @param buffer The byte buffer to write it to.
+ *
+ * @return An instance of the type.
+ */
public Object readValueFromBuffer(ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ try
+ {
+ // Read size of field table then all name/value pairs.
+ return EncodingUtils.readFieldTable(buffer);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new IllegalArgumentException("Unable to read field table from buffer.", e);
+ }
}
},
@@ -220,7 +288,6 @@ public enum AMQType return 0;
}
-
public Object toNativeValue(Object value)
{
if (value == null)
@@ -229,14 +296,13 @@ public enum AMQType }
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to null String.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to null String.");
}
}
public void writeValueImpl(Object value, ByteBuffer buffer)
- {
- }
+ { }
public Object readValueFromBuffer(ByteBuffer buffer)
{
@@ -244,8 +310,6 @@ public enum AMQType }
},
- // Extended types
-
BINARY('x')
{
public int getEncodingSize(Object value)
@@ -253,21 +317,19 @@ public enum AMQType return EncodingUtils.encodedLongstrLength((byte[]) value);
}
-
public Object toNativeValue(Object value)
{
- if((value instanceof byte[]) || (value == null))
+ if ((value instanceof byte[]) || (value == null))
{
return value;
}
else
{
- throw new IllegalArgumentException("Value: " + value + " (" + value.getClass().getName() +
- ") cannot be converted to byte[]");
+ throw new IllegalArgumentException("Value: " + value + " (" + value.getClass().getName()
+ + ") cannot be converted to byte[]");
}
}
-
public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongstr(buffer, (byte[]) value);
@@ -277,7 +339,6 @@ public enum AMQType {
return EncodingUtils.readLongstr(buffer);
}
-
},
ASCII_STRING('c')
@@ -287,7 +348,6 @@ public enum AMQType return EncodingUtils.encodedLongStringLength((String) value);
}
-
public String toNativeValue(Object value)
{
if (value != null)
@@ -309,7 +369,6 @@ public enum AMQType {
return EncodingUtils.readLongString(buffer);
}
-
},
WIDE_STRING('C')
@@ -320,7 +379,6 @@ public enum AMQType return EncodingUtils.encodedLongStringLength((String) value);
}
-
public String toNativeValue(Object value)
{
if (value != null)
@@ -351,7 +409,6 @@ public enum AMQType return EncodingUtils.encodedBooleanLength();
}
-
public Object toNativeValue(Object value)
{
if (value instanceof Boolean)
@@ -360,12 +417,12 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Boolean.valueOf((String)value);
+ return Boolean.valueOf((String) value);
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to boolean.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to boolean.");
}
}
@@ -374,7 +431,6 @@ public enum AMQType EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
-
public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readBoolean(buffer);
@@ -388,7 +444,6 @@ public enum AMQType return EncodingUtils.encodedCharLength();
}
-
public Character toNativeValue(Object value)
{
if (value instanceof Character)
@@ -401,8 +456,8 @@ public enum AMQType }
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to char.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to char.");
}
}
@@ -415,7 +470,6 @@ public enum AMQType {
return EncodingUtils.readChar(buffer);
}
-
},
BYTE('b')
@@ -425,7 +479,6 @@ public enum AMQType return EncodingUtils.encodedByteLength();
}
-
public Byte toNativeValue(Object value)
{
if (value instanceof Byte)
@@ -434,12 +487,12 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Byte.valueOf((String)value);
+ return Byte.valueOf((String) value);
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to byte.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to byte.");
}
}
@@ -456,13 +509,11 @@ public enum AMQType SHORT('s')
{
-
public int getEncodingSize(Object value)
{
return EncodingUtils.encodedShortLength();
}
-
public Short toNativeValue(Object value)
{
if (value instanceof Short)
@@ -475,16 +526,13 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Short.valueOf((String)value);
+ return Short.valueOf((String) value);
}
-
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to short.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to short.");
}
-
-
}
public void writeValueImpl(Object value, ByteBuffer buffer)
@@ -521,12 +569,11 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Integer.valueOf((String)value);
+ return Integer.valueOf((String) value);
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to int.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int.");
}
}
@@ -543,7 +590,6 @@ public enum AMQType LONG('l')
{
-
public int getEncodingSize(Object value)
{
return EncodingUtils.encodedLongLength();
@@ -551,7 +597,7 @@ public enum AMQType public Object toNativeValue(Object value)
{
- if(value instanceof Long)
+ if (value instanceof Long)
{
return (Long) value;
}
@@ -569,12 +615,12 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Long.valueOf((String)value);
+ return Long.valueOf((String) value);
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to long.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to long.");
}
}
@@ -583,7 +629,6 @@ public enum AMQType EncodingUtils.writeLong(buffer, (Long) value);
}
-
public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLong(buffer);
@@ -597,7 +642,6 @@ public enum AMQType return EncodingUtils.encodedFloatLength();
}
-
public Float toNativeValue(Object value)
{
if (value instanceof Float)
@@ -606,12 +650,12 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Float.valueOf((String)value);
+ return Float.valueOf((String) value);
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to float.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to float.");
}
}
@@ -628,13 +672,11 @@ public enum AMQType DOUBLE('d')
{
-
public int getEncodingSize(Object value)
{
return EncodingUtils.encodedDoubleLength();
}
-
public Double toNativeValue(Object value)
{
if (value instanceof Double)
@@ -647,12 +689,12 @@ public enum AMQType }
else if ((value instanceof String) || (value == null))
{
- return Double.valueOf((String)value);
+ return Double.valueOf((String) value);
}
else
{
- throw new NumberFormatException("Cannot convert: " + value + "(" +
- value.getClass().getName() + ") to double.");
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to double.");
}
}
@@ -667,35 +709,87 @@ public enum AMQType }
};
+ /** Holds the defined one byte identifier for the type. */
private final byte _identifier;
+ /**
+ * Creates an instance of an AMQP type from its defined one byte identifier.
+ *
+ * @param identifier The one byte identifier for the type.
+ */
AMQType(char identifier)
{
_identifier = (byte) identifier;
}
+ /**
+ * Extracts the byte identifier for the typ.
+ *
+ * @return The byte identifier for the typ.
+ */
public final byte identifier()
{
return _identifier;
}
-
+ /**
+ * Calculates the size of an instance of the type in bytes.
+ *
+ * @param value An instance of the type.
+ *
+ * @return The size of the instance of the type in bytes.
+ */
public abstract int getEncodingSize(Object value);
+ /**
+ * Converts an instance of the type to an equivalent Java native representation.
+ *
+ * @param value An instance of the type.
+ *
+ * @return An equivalent Java native representation.
+ */
public abstract Object toNativeValue(Object value);
+ /**
+ * Converts an instance of the type to an equivalent Java native representation, packaged as an
+ * {@link AMQTypedValue} tagged with its AMQP type.
+ *
+ * @param value An instance of the type.
+ *
+ * @return An equivalent Java native representation, tagged with its AMQP type.
+ */
public AMQTypedValue asTypedValue(Object value)
{
return new AMQTypedValue(this, toNativeValue(value));
}
+ /**
+ * Writes an instance of the type to a specified byte buffer, preceded by its one byte identifier. As the type and
+ * value are both written, this provides a fully encoded description of a parameters type and value.
+ *
+ * @param value An instance of the type.
+ * @param buffer The byte buffer to write it to.
+ */
public void writeToBuffer(Object value, ByteBuffer buffer)
{
- buffer.put((byte)identifier());
+ buffer.put(identifier());
writeValueImpl(value, buffer);
}
+ /**
+ * Writes an instance of the type to a specified byte buffer.
+ *
+ * @param value An instance of the type.
+ * @param buffer The byte buffer to write it to.
+ */
abstract void writeValueImpl(Object value, ByteBuffer buffer);
+ /**
+ * Reads an instance of the type from a specified byte buffer.
+ *
+ * @param buffer The byte buffer to write it to.
+ *
+ * @return An instance of the type.
+ */
abstract Object readValueFromBuffer(ByteBuffer buffer);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 7193580884..e5b1fad9a8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -18,23 +18,40 @@ * under the License.
*
*/
-
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+/**
+ * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
+ * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
+ * the ability to create such parameters from Java native value and a type tag or to extract the native value and type
+ * from one.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create a fully typed AMQP value from a native type and a type tag. <td> {@link AMQType}
+ * <tr><td> Create a fully typed AMQP value from a binary representation in a byte buffer. <td> {@link AMQType}
+ * <tr><td> Write a fully typed AMQP value to a binary representation in a byte buffer. <td> {@link AMQType}
+ * <tr><td> Extract the type from a fully typed AMQP value.
+ * <tr><td> Extract the value from a fully typed AMQP value.
+ * </table>
+ */
public class AMQTypedValue
{
+ /** The type of the value. */
private final AMQType _type;
- private final Object _value;
+ /** The Java native representation of the AMQP typed value. */
+ private final Object _value;
public AMQTypedValue(AMQType type, Object value)
{
- if(type == null)
+ if (type == null)
{
throw new NullPointerException("Cannot create a typed value with null type");
}
+
_type = type;
_value = type.toNativeValue(value);
}
@@ -42,10 +59,9 @@ public class AMQTypedValue private AMQTypedValue(AMQType type, ByteBuffer buffer)
{
_type = type;
- _value = type.readValueFromBuffer( buffer );
+ _value = type.readValueFromBuffer(buffer);
}
-
public AMQType getType()
{
return _type;
@@ -56,10 +72,9 @@ public class AMQTypedValue return _value;
}
-
public void writeToBuffer(ByteBuffer buffer)
{
- _type.writeToBuffer(_value,buffer);
+ _type.writeToBuffer(_value, buffer);
}
public int getEncodingSize()
@@ -70,11 +85,12 @@ public class AMQTypedValue public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
{
AMQType type = AMQTypeMap.getType(buffer.get());
+
return new AMQTypedValue(type, buffer);
}
public String toString()
{
- return "["+getType()+": "+getValue()+"]";
+ return "[" + getType() + ": " + getValue() + "]";
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 5ec62ede93..94030f383e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer; public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { - private ByteBuffer _encodedBlock; private AMQDataBlock[] _blocks; @@ -33,27 +32,12 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD _blocks = blocks; } - /** - * The encoded block will be logically first before the AMQDataBlocks which are encoded - * into the buffer afterwards. - * @param encodedBlock already-encoded data - * @param blocks some blocks to be encoded. - */ - public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks) - { - this(blocks); - _encodedBlock = encodedBlock; - } public AMQDataBlock[] getBlocks() { return _blocks; } - public ByteBuffer getEncodedBlock() - { - return _encodedBlock; - } public long getSize() { @@ -62,20 +46,11 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD { frameSize += _blocks[i].getSize(); } - if (_encodedBlock != null) - { - _encodedBlock.rewind(); - frameSize += _encodedBlock.remaining(); - } return frameSize; } public void writePayload(ByteBuffer buffer) { - if (_encodedBlock != null) - { - buffer.put(_encodedBlock); - } for (int i = 0; i < _blocks.length; i++) { _blocks[i].writePayload(buffer); @@ -91,7 +66,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD else { StringBuilder buf = new StringBuilder(this.getClass().getName()); - buf.append("{encodedBlock=").append(_encodedBlock); + buf.append("{"); for (int i = 0 ; i < _blocks.length; i++) { buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index be38695384..9d39f8aa86 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -21,8 +21,10 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; -public class ContentBody extends AMQBody +public class ContentBody implements AMQBody { public static final byte TYPE = 3; @@ -63,11 +65,24 @@ public class ContentBody extends AMQBody { if (payload != null) { - ByteBuffer copy = payload.duplicate(); - buffer.put(copy.rewind()); + if(payload.isDirect() || payload.isReadOnly()) + { + ByteBuffer copy = payload.duplicate(); + buffer.put(copy.rewind()); + } + else + { + buffer.put(payload.array(),payload.arrayOffset(),payload.limit()); + } } } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentBodyReceived(channelId, this); + } + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException { if (size > 0) diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 02631a5f88..83e5a7e341 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -21,8 +21,10 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; -public class ContentHeaderBody extends AMQBody +public class ContentHeaderBody implements AMQBody { public static final byte TYPE = 2; @@ -110,6 +112,12 @@ public class ContentHeaderBody extends AMQBody properties.writePropertyListPayload(buffer); } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentHeaderReceived(channelId, this); + } + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, long bodySize) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 712eb437db..46189b63d7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -22,6 +22,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; + public class ContentHeaderPropertiesFactory { private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); @@ -43,7 +45,7 @@ public class ContentHeaderPropertiesFactory // AMQP version change: "Hardwired" version to major=8, minor=0 // TODO: Change so that the actual version is obtained from // the ProtocolInitiation object for this session. - if (classId == BasicConsumeBody.getClazz((byte)8, (byte)0)) + if (classId == BasicConsumeBodyImpl.CLASS_ID) { properties = new BasicContentHeaderProperties(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java new file mode 100644 index 0000000000..f6795ff200 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java @@ -0,0 +1,50 @@ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public abstract class DeferredDataBlock extends AMQDataBlock +{ + private AMQDataBlock _underlyingDataBlock; + + + public long getSize() + { + if(_underlyingDataBlock == null) + { + _underlyingDataBlock = createAMQDataBlock(); + } + return _underlyingDataBlock.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + if(_underlyingDataBlock == null) + { + _underlyingDataBlock = createAMQDataBlock(); + } + _underlyingDataBlock.writePayload(buffer); + } + + abstract protected AMQDataBlock createAMQDataBlock(); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index ccba8bd41e..6425f8c591 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -229,7 +229,11 @@ public class EncodingUtils encodedString[i] = (byte) cha[i]; } - writeBytes(buffer, encodedString); + // TODO: check length fits in an unsigned byte + writeUnsignedByte(buffer, (short)encodedString.length); + buffer.put(encodedString); + + } else { @@ -928,15 +932,15 @@ public class EncodingUtils public static byte[] readBytes(ByteBuffer buffer) { - short length = buffer.getUnsigned(); + long length = buffer.getUnsignedInt(); if (length == 0) { return null; } else { - byte[] dataBytes = new byte[length]; - buffer.get(dataBytes, 0, length); + byte[] dataBytes = new byte[(int)length]; + buffer.get(dataBytes, 0, (int)length); return dataBytes; } @@ -947,13 +951,14 @@ public class EncodingUtils if (data != null) { // TODO: check length fits in an unsigned byte - writeUnsignedByte(buffer, (short) data.length); + writeUnsignedInteger(buffer, (long)data.length); buffer.put(data); } else - { + { // really writing out unsigned byte - buffer.put((byte) 0); + //buffer.put((byte) 0); + writeUnsignedInteger(buffer, 0L); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 3438770450..9ba9b53b13 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -18,7 +18,6 @@ * under the License. * */ - package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; @@ -360,6 +359,41 @@ public class FieldTable } } + /** + * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name. + * + * @param string The name of the parameter to get the associated FieldTable value for. + * + * @return The associated FieldTable value, or <tt>null</tt> if the associated value is not of FieldTable type or + * not present in the field table at all. + */ + public FieldTable getFieldTable(String string) + { + return getFieldTable(new AMQShortString(string)); + } + + /** + * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name. + * + * @param string The name of the parameter to get the associated FieldTable value for. + * + * @return The associated FieldTable value, or <tt>null</tt> if the associated value is not of FieldTable type or + * not present in the field table at all. + */ + public FieldTable getFieldTable(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + + if ((value != null) && (value.getType() == AMQType.FIELD_TABLE)) + { + return (FieldTable) value.getValue(); + } + else + { + return null; + } + } + public Object getObject(String string) { return getObject(new AMQShortString(string)); @@ -568,6 +602,32 @@ public class FieldTable return setProperty(string, AMQType.VOID.asTypedValue(null)); } + /** + * Associates a nested field table with the specified parameter name. + * + * @param string The name of the parameter to store in the table. + * @param ftValue The field table value to associate with the parameter name. + * + * @return The stored value. + */ + public Object setFieldTable(String string, FieldTable ftValue) + { + return setFieldTable(new AMQShortString(string), ftValue); + } + + /** + * Associates a nested field table with the specified parameter name. + * + * @param string The name of the parameter to store in the table. + * @param ftValue The field table value to associate with the parameter name. + * + * @return The stored value. + */ + public Object setFieldTable(AMQShortString string, FieldTable ftValue) + { + return setProperty(string, AMQType.FIELD_TABLE.asTypedValue(ftValue)); + } + public Object setObject(AMQShortString string, Object object) { if (object instanceof Boolean) @@ -706,12 +766,15 @@ public class FieldTable public void writeToBuffer(ByteBuffer buffer) { - final boolean trace = _logger.isTraceEnabled(); + final boolean trace = _logger.isDebugEnabled(); if (trace) { - _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); - _logger.trace(_properties.toString()); + _logger.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); + if (_properties != null) + { + _logger.debug(_properties.toString()); + } } EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize()); @@ -894,14 +957,21 @@ public class FieldTable if (_encodedForm != null) { + if(buffer.isDirect() || buffer.isReadOnly()) + { + ByteBuffer encodedForm = _encodedForm.duplicate(); - if (_encodedForm.position() != 0) + if (encodedForm.position() != 0) + { + encodedForm.flip(); + } + + buffer.put(encodedForm); + } + else { - _encodedForm.flip(); + buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize); } - // _encodedForm.limit((int)getEncodedSize()); - - buffer.put(_encodedForm); } else if (_properties != null) { @@ -915,11 +985,11 @@ public class FieldTable final Map.Entry<AMQShortString, AMQTypedValue> me = it.next(); try { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); } // Write the actual parameter name @@ -928,12 +998,12 @@ public class FieldTable } catch (Exception e) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Exception thrown:" + e); - _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + _logger.debug("Exception thrown:" + e); + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); } throw new RuntimeException(e); @@ -945,7 +1015,7 @@ public class FieldTable private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException { - final boolean trace = _logger.isTraceEnabled(); + final boolean trace = _logger.isDebugEnabled(); if (length > 0) { @@ -961,7 +1031,7 @@ public class FieldTable if (trace) { - _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "'"); } @@ -976,7 +1046,7 @@ public class FieldTable if (trace) { - _logger.trace("FieldTable::FieldTable(buffer," + length + "): Done."); + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7246c4a1cf..15a43345b5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -21,8 +21,10 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; -public class HeartbeatBody extends AMQBody +public class HeartbeatBody implements AMQBody { public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); @@ -46,15 +48,21 @@ public class HeartbeatBody extends AMQBody return TYPE; } - protected int getSize() + public int getSize() { return 0;//heartbeats we generate have no payload } - protected void writePayload(ByteBuffer buffer) + public void writePayload(ByteBuffer buffer) { } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.heartbeatBodyReceived(channelId, this); + } + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException { if(size > 0) diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 4c253b9973..3ac17e9204 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -139,7 +139,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData } } - public void checkVersion() throws AMQException + public ProtocolVersion checkVersion() throws AMQException { if(_protocolHeader.length != 4) @@ -180,6 +180,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData throw new AMQProtocolVersionException("Protocol version " + _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.", null); } + return pv; } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java index 26c048e34a..f8cf3f3011 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -25,7 +25,7 @@ import org.apache.mina.common.ByteBuffer; public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
- private ByteBuffer _encodedBlock;
+ private AMQDataBlock _firstFrame;
private AMQDataBlock _block;
@@ -40,10 +40,10 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl * @param encodedBlock already-encoded data
* @param block a block to be encoded.
*/
- public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+ public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block)
{
this(block);
- _encodedBlock = encodedBlock;
+ _firstFrame = encodedBlock;
}
public AMQDataBlock getBlock()
@@ -51,28 +51,28 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl return _block;
}
- public ByteBuffer getEncodedBlock()
+ public AMQDataBlock getFirstFrame()
{
- return _encodedBlock;
+ return _firstFrame;
}
public long getSize()
{
long frameSize = _block.getSize();
- if (_encodedBlock != null)
+ if (_firstFrame != null)
{
- _encodedBlock.rewind();
- frameSize += _encodedBlock.remaining();
+
+ frameSize += _firstFrame.getSize();
}
return frameSize;
}
public void writePayload(ByteBuffer buffer)
{
- if (_encodedBlock != null)
+ if (_firstFrame != null)
{
- buffer.put(_encodedBlock);
+ _firstFrame.writePayload(buffer);
}
_block.writePayload(buffer);
@@ -87,7 +87,7 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl else
{
StringBuilder buf = new StringBuilder(this.getClass().getName());
- buf.append("{encodedBlock=").append(_encodedBlock);
+ buf.append("{encodedBlock=").append(_firstFrame);
buf.append(" _block=[").append(_block.toString()).append("]");
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 6006e9793c..516d0c569c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -182,7 +182,7 @@ public class VersionSpecificRegistry + " method " + methodID + ".", null);
}
- return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+ return bodyFactory.newInstance( in, size);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java index 706499c1b0..49c28bb06b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java @@ -27,6 +27,8 @@ public interface MessagePublishInfo public AMQShortString getExchange();
+ public void setExchange(AMQShortString exchange);
+
public boolean isImmediate();
public boolean isMandatory();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java new file mode 100644 index 0000000000..3c5cb74773 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java @@ -0,0 +1,209 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_9;
+
+import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.Content;
+
+import org.apache.mina.common.ByteBuffer;
+
+public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl
+{
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 9;
+ }
+
+ public int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+
+ protected byte readByte(ByteBuffer buffer)
+ {
+ return buffer.get();
+ }
+
+ protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return EncodingUtils.readAMQShortString(buffer);
+ }
+
+ protected int getSizeOf(AMQShortString string)
+ {
+ return EncodingUtils.encodedShortStringLength(string);
+ }
+
+ protected void writeByte(ByteBuffer buffer, byte b)
+ {
+ buffer.put(b);
+ }
+
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, string);
+ }
+
+ protected int readInt(ByteBuffer buffer)
+ {
+ return buffer.getInt();
+ }
+
+ protected void writeInt(ByteBuffer buffer, int i)
+ {
+ buffer.putInt(i);
+ }
+
+ protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ return EncodingUtils.readFieldTable(buffer);
+ }
+
+ protected int getSizeOf(FieldTable table)
+ {
+ return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, table);
+ }
+
+ protected long readLong(ByteBuffer buffer)
+ {
+ return buffer.getLong();
+ }
+
+ protected void writeLong(ByteBuffer buffer, long l)
+ {
+ buffer.putLong(l);
+ }
+
+ protected int getSizeOf(byte[] response)
+ {
+ return (response == null) ? 4 :response.length + 4;
+ }
+
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
+ {
+ EncodingUtils.writeBytes(buffer,data);
+ }
+
+ protected byte[] readBytes(ByteBuffer buffer)
+ {
+ return EncodingUtils.readBytes(buffer);
+ }
+
+ protected short readShort(ByteBuffer buffer)
+ {
+ return EncodingUtils.readShort(buffer);
+ }
+
+ protected void writeShort(ByteBuffer buffer, short s)
+ {
+ EncodingUtils.writeShort(buffer, s);
+ }
+
+ protected Content readContent(ByteBuffer buffer)
+ {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int getSizeOf(Content body)
+ {
+ return 0; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeContent(ByteBuffer buffer, Content body)
+ {
+ //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected byte readBitfield(ByteBuffer buffer)
+ {
+ return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int readUnsignedShort(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ {
+ buffer.put(bitfield0);
+ }
+
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, s);
+ }
+
+ protected long readUnsignedInteger(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedInt();
+ }
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, i);
+ }
+
+
+ protected short readUnsignedByte(ByteBuffer buffer)
+ {
+ return buffer.getUnsigned();
+ }
+
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ {
+ EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
+ }
+
+ protected long readTimestamp(ByteBuffer buffer)
+ {
+ return EncodingUtils.readTimestamp(buffer);
+ }
+
+ protected void writeTimestamp(ByteBuffer buffer, long t)
+ {
+ EncodingUtils.writeTimestamp(buffer, t);
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java new file mode 100644 index 0000000000..2fd4f70138 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -0,0 +1,172 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_9;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.*;
+import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl;
+
+public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_0_9()
+ {
+ super((byte)0,(byte)9);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ if(contentChunk instanceof ContentChunk_0_9)
+ {
+ return ((ContentChunk_0_9)contentChunk).toBody();
+ }
+ else
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk_0_9(contentBodyChunk);
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
+
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
+
+ final AMQShortString exchange = publishBody.getExchange();
+ final AMQShortString routingKey = publishBody.getRoutingKey();
+
+ return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
+ publishBody.getImmediate(),
+ publishBody.getMandatory(),
+ routingKey);
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBodyImpl(0,
+ info.getExchange(),
+ info.getRoutingKey(),
+ info.isMandatory(),
+ info.isImmediate()) ;
+
+ }
+
+ private static class MessagePublishInfoImpl implements MessagePublishInfo
+ {
+ private AMQShortString _exchange;
+ private final boolean _immediate;
+ private final boolean _mandatory;
+ private final AMQShortString _routingKey;
+
+ public MessagePublishInfoImpl(final AMQShortString exchange,
+ final boolean immediate,
+ final boolean mandatory,
+ final AMQShortString routingKey)
+ {
+ _exchange = exchange;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _routingKey = routingKey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+ }
+
+ private static class ContentChunk_0_9 implements ContentChunk
+ {
+ private final ContentBody _contentBodyChunk;
+
+ public ContentChunk_0_9(final ContentBody contentBodyChunk)
+ {
+ _contentBodyChunk = contentBodyChunk;
+ }
+
+ public int getSize()
+ {
+ return _contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return _contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ _contentBodyChunk.reduceBufferToFit();
+ }
+
+ public AMQBody toBody()
+ {
+ return _contentBodyChunk;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java new file mode 100644 index 0000000000..2b7c9534a9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java @@ -0,0 +1,209 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_8_0;
+
+import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.Content;
+
+import org.apache.mina.common.ByteBuffer;
+
+public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl
+{
+
+ public byte getMajor()
+ {
+ return 8;
+ }
+
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ public int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+
+ protected byte readByte(ByteBuffer buffer)
+ {
+ return buffer.get();
+ }
+
+ protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return EncodingUtils.readAMQShortString(buffer);
+ }
+
+ protected int getSizeOf(AMQShortString string)
+ {
+ return EncodingUtils.encodedShortStringLength(string);
+ }
+
+ protected void writeByte(ByteBuffer buffer, byte b)
+ {
+ buffer.put(b);
+ }
+
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, string);
+ }
+
+ protected int readInt(ByteBuffer buffer)
+ {
+ return buffer.getInt();
+ }
+
+ protected void writeInt(ByteBuffer buffer, int i)
+ {
+ buffer.putInt(i);
+ }
+
+ protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ return EncodingUtils.readFieldTable(buffer);
+ }
+
+ protected int getSizeOf(FieldTable table)
+ {
+ return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, table);
+ }
+
+ protected long readLong(ByteBuffer buffer)
+ {
+ return buffer.getLong();
+ }
+
+ protected void writeLong(ByteBuffer buffer, long l)
+ {
+ buffer.putLong(l);
+ }
+
+ protected int getSizeOf(byte[] response)
+ {
+ return (response == null) ? 4 : response.length + 4;
+ }
+
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
+ {
+ EncodingUtils.writeBytes(buffer,data);
+ }
+
+ protected byte[] readBytes(ByteBuffer buffer)
+ {
+ return EncodingUtils.readBytes(buffer);
+ }
+
+ protected short readShort(ByteBuffer buffer)
+ {
+ return EncodingUtils.readShort(buffer);
+ }
+
+ protected void writeShort(ByteBuffer buffer, short s)
+ {
+ EncodingUtils.writeShort(buffer, s);
+ }
+
+ protected Content readContent(ByteBuffer buffer)
+ {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int getSizeOf(Content body)
+ {
+ return 0; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeContent(ByteBuffer buffer, Content body)
+ {
+ //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected byte readBitfield(ByteBuffer buffer)
+ {
+ return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int readUnsignedShort(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ {
+ buffer.put(bitfield0);
+ }
+
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, s);
+ }
+
+ protected long readUnsignedInteger(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedInt();
+ }
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, i);
+ }
+
+
+ protected short readUnsignedByte(ByteBuffer buffer)
+ {
+ return buffer.getUnsigned();
+ }
+
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ {
+ EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
+ }
+
+ protected long readTimestamp(ByteBuffer buffer)
+ {
+ return EncodingUtils.readTimestamp(buffer);
+ }
+
+ protected void writeTimestamp(ByteBuffer buffer, long t)
+ {
+ EncodingUtils.writeTimestamp(buffer, t);
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java new file mode 100644 index 0000000000..b1be49a350 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -0,0 +1,151 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_8_0;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
+import org.apache.qpid.framing.*;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_8_0()
+ {
+ super((byte)8,(byte)0);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
+
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
+
+ final AMQShortString exchange = publishBody.getExchange();
+ final AMQShortString routingKey = publishBody.getRoutingKey();
+
+ return new MessagePublishInfoImpl(exchange == null ? null : exchange.intern(),
+ publishBody.getImmediate(),
+ publishBody.getMandatory(),
+ routingKey == null ? null : routingKey.intern());
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBodyImpl(0,
+ info.getExchange(),
+ info.getRoutingKey(),
+ info.isMandatory(),
+ info.isImmediate()) ;
+
+ }
+
+ private static class MessagePublishInfoImpl implements MessagePublishInfo
+ {
+ private AMQShortString _exchange;
+ private final boolean _immediate;
+ private final boolean _mandatory;
+ private final AMQShortString _routingKey;
+
+ public MessagePublishInfoImpl(final AMQShortString exchange,
+ final boolean immediate,
+ final boolean mandatory,
+ final AMQShortString routingKey)
+ {
+ _exchange = exchange;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _routingKey = routingKey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index ba3c5d03fa..b2a09ac592 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -94,21 +94,23 @@ public class Job implements Runnable /** * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job. */ - void processAll() + boolean processAll() { // limit the number of events processed in one run - for (int i = 0; i < _maxEvents; i++) + int i = _maxEvents; + while( --i != 0 ) { Event e = _eventQueue.poll(); if (e == null) { - break; + return true; } else { e.process(_session); } } + return false; } /** @@ -144,9 +146,15 @@ public class Job implements Runnable */ public void run() { - processAll(); - deactivate(); - _completionHandler.completed(_session, this); + if(processAll()) + { + deactivate(); + _completionHandler.completed(_session, this); + } + else + { + _completionHandler.notCompleted(_session, this); + } } /** @@ -158,5 +166,7 @@ public class Job implements Runnable static interface JobCompletionHandler { public void completed(IoSession session, Job job); + + public void notCompleted(final IoSession session, final Job job); } } diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index d0dfb1adcf..2912e54662 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -30,6 +30,8 @@ import org.apache.qpid.pool.Event.CloseEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ExecutorService; /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it @@ -83,9 +85,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo /** Used for debugging purposes. */ private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class); - /** Holds a mapping from Mina sessions to batched jobs for execution. */ - private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); - /** Holds the managed reference to obtain the executor for the batched jobs. */ private final ReferenceCountingExecutorService _poolReference; @@ -93,7 +92,9 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + + private final int _maxEvents; /** * Creates a named pooling filter, on the specified shared thread pool. @@ -101,10 +102,11 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * @param refCountingPool The thread pool reference. * @param name The identifying name of the filter type. */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents) { _poolReference = refCountingPool; _name = name; + _maxEvents = maxEvents; } /** @@ -159,20 +161,34 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo /** * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. * - * @param session The Mina session to work in. + * @param job The job. * @param event The event to hand off asynchronously. */ - void fireAsynchEvent(IoSession session, Event event) + void fireAsynchEvent(Job job, Event event) { - Job job = getJobForSession(session); + // job.acquire(); //prevents this job being removed from _jobs job.add(event); - // Additional checks on pool to check that it hasn't shutdown. - // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) { - _poolReference.getPool().execute(job); + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } } } @@ -185,7 +201,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void createNewJobForSession(IoSession session) { - Job job = new Job(session, this, _maxEvents); + Job job = new Job(session, this, MAX_JOB_EVENTS); session.setAttribute(_name, job); } @@ -196,7 +212,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * * @return The Job for this filter to place asynchronous events into. */ - private Job getJobForSession(IoSession session) + public Job getJobForSession(IoSession session) { return (Job) session.getAttribute(_name); } @@ -210,17 +226,57 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void completed(IoSession session, Job job) { + + if (!job.isComplete()) { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? // Can the pool be shutdown at this point? - if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) + if (job.activate()) { - _poolReference.getPool().execute(job); + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } } } + public void notCompleted(IoSession session, Job job) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + + + /** * No-op pass through filter to the next filter in the chain. * @@ -377,7 +433,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS)); } /** @@ -389,8 +445,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { - - fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); } /** @@ -401,7 +457,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { - fireAsynchEvent(session, new CloseEvent(nextFilter)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new CloseEvent(nextFilter)); } } @@ -419,7 +476,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS)); } /** @@ -431,7 +488,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { - fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); } /** @@ -442,7 +500,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { - fireAsynchEvent(session, new CloseEvent(nextFilter)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new CloseEvent(nextFilter)); } } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 2fbeeda1d4..5a7679a972 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.AMQException; /** * AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in @@ -57,7 +58,7 @@ public interface AMQMethodListener * * @todo Consider narrowing the exception. */ - <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception; + <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException; /** * Notifies the listener of an error on the event context to which it is listening. The listener should perform diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 7c1d6fdaa0..b56a05f725 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -20,7 +20,8 @@ */
package org.apache.qpid.protocol;
-import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
@@ -42,5 +43,15 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto *
* @return The method registry for a specific version of the AMQP.
*/
- public VersionSpecificRegistry getRegistry();
+// public VersionSpecificRegistry getRegistry();
+
+ MethodRegistry getMethodRegistry();
+
+
+ public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException;
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException;
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException;
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java index 60a7f30185..dea80cdcf4 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -20,6 +20,8 @@ */
package org.apache.qpid.protocol;
+import org.apache.qpid.framing.ProtocolVersion;
+
/**
* ProtocolVersionAware is implemented by all AMQP handling classes, that need to provide an awareness to callers of
* the version of the AMQP protocol that they are able to handle.
@@ -32,6 +34,7 @@ package org.apache.qpid.protocol; public interface ProtocolVersionAware
{
/**
+ * @deprecated
* Reports the AMQP minor version, that the implementer can handle.
*
* @return The AMQP minor version.
@@ -39,9 +42,12 @@ public interface ProtocolVersionAware public byte getProtocolMinorVersion();
/**
+ * @deprecated
* Reports the AMQP major version, that the implementer can handle.
*
* @return The AMQP major version.
*/
public byte getProtocolMajorVersion();
+
+ public ProtocolVersion getProtocolVersion();
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java index 461cf9591d..633cf4fe3a 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -215,6 +215,14 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ public void remove() { last.remove(); + if(last == _mainIterator) + { + _size.decrementAndGet(); + } + else + { + _messageHeadSize.decrementAndGet(); + } } }; } diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java new file mode 100644 index 0000000000..b93dc46741 --- /dev/null +++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java @@ -0,0 +1,396 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.SocketIOTest; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.CloseFuture; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; + +public class IOWriterClient implements Runnable +{ + private static final Logger _logger = LoggerFactory.getLogger(IOWriterClient.class); + + public static int DEFAULT_TEST_SIZE = 2; + + private IoSession _session; + + private long _startTime; + + private long[] _chunkTimes; + + public int _chunkCount = 200000; + + private int _chunkSize = 1024; + + private CountDownLatch _notifier; + + private int _maximumWriteQueueLength; + + static public int _PORT = IOWriterServer._PORT; + + public void run() + { + _logger.info("Starting to send " + _chunkCount + " buffers of " + _chunkSize + "B"); + _startTime = System.currentTimeMillis(); + _notifier = new CountDownLatch(1); + + for (int i = 0; i < _chunkCount; i++) + { + ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); + byte check = (byte) (i % 128); + buf.put(check); + buf.fill((byte) 88, buf.remaining()); + buf.flip(); + + _session.write(buf); + } + + long _sentall = System.currentTimeMillis(); + long _receivedall = _sentall; + try + { + _logger.info("All buffers sent; waiting for receipt from server"); + _notifier.await(); + _receivedall = System.currentTimeMillis(); + } + catch (InterruptedException e) + { + //Ignore + } + _logger.info("Completed"); + _logger.info("Total time waiting for server after last write: " + (_receivedall - _sentall)); + + long totalTime = System.currentTimeMillis() - _startTime; + + _logger.info("Total time: " + totalTime); + _logger.info("MB per second: " + (int) ((1.0 * _chunkSize * _chunkCount) / totalTime)); + long lastChunkTime = _startTime; + double average = 0; + for (int i = 0; i < _chunkTimes.length; i++) + { + if (i == 0) + { + average = _chunkTimes[i] - _startTime; + } + else + { + long delta = _chunkTimes[i] - lastChunkTime; + if (delta != 0) + { + average = (average + delta) / 2; + } + } + lastChunkTime = _chunkTimes[i]; + } + _logger.info("Average chunk time: " + average + "ms"); + _logger.info("Maximum WriteRequestQueue size: " + _maximumWriteQueueLength); + + CloseFuture cf = _session.close(); + _logger.info("Closing session"); + cf.join(); + } + + private class WriterHandler extends IoHandlerAdapter + { + private int _chunksReceived = 0; + + private int _partialBytesRead = 0; + + private byte _partialCheckNumber; + + private int _totalBytesReceived = 0; + + private int _receivedCount = 0; + private int _sentCount = 0; + private static final String DEFAULT_READ_BUFFER = "262144"; + private static final String DEFAULT_WRITE_BUFFER = "262144"; + + public void sessionCreated(IoSession session) throws Exception + { + IoFilterChain chain = session.getFilterChain(); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); + + writefilter.attach(chain); + } + + public void messageSent(IoSession session, Object message) throws Exception + { + _maximumWriteQueueLength = Math.max(session.getScheduledWriteRequests(), _maximumWriteQueueLength); + + if (_logger.isDebugEnabled()) + { + ++_sentCount; + if (_sentCount % 1000 == 0) + { + _logger.debug("Sent count " + _sentCount + ":WQueue" + session.getScheduledWriteRequests()); + + } + } + } + + public void messageReceived(IoSession session, Object message) throws Exception + { + if (_logger.isDebugEnabled()) + { + ++_receivedCount; + + if (_receivedCount % 1000 == 0) + { + _logger.debug("Receieved count " + _receivedCount); + } + } + + ByteBuffer result = (ByteBuffer) message; + _totalBytesReceived += result.remaining(); + int size = result.remaining(); + long now = System.currentTimeMillis(); + if (_partialBytesRead > 0) + { + int offset = _chunkSize - _partialBytesRead; + if (size >= offset) + { + _chunkTimes[_chunksReceived++] = now; + result.position(offset); + } + else + { + // have not read even one chunk, including the previous partial bytes + _partialBytesRead += size; + return; + } + } + + + int chunkCount = result.remaining() / _chunkSize; + + for (int i = 0; i < chunkCount; i++) + { + _chunkTimes[_chunksReceived++] = now; + byte check = result.get(); + _logger.debug("Check number " + check + " read"); + if (check != (byte) ((_chunksReceived - 1) % 128)) + { + _logger.error("Check number " + check + " read when expected " + (_chunksReceived % 128)); + } + _logger.debug("Chunk times recorded"); + + try + { + result.skip(_chunkSize - 1); + } + catch (IllegalArgumentException e) + { + _logger.error("Position was: " + result.position()); + _logger.error("Tried to skip to: " + (_chunkSize * i)); + _logger.error("limit was; " + result.limit()); + } + } + _logger.debug("Chunks received now " + _chunksReceived); + _logger.debug("Bytes received: " + _totalBytesReceived); + _partialBytesRead = result.remaining(); + + if (_partialBytesRead > 0) + { + _partialCheckNumber = result.get(); + } + + + if (_chunksReceived >= _chunkCount) + { + _notifier.countDown(); + } + + } + + public void exceptionCaught(IoSession session, Throwable cause) throws Exception + { + _logger.error("Error: " + cause, cause); + } + } + + public void startWriter() throws IOException, InterruptedException + { + + _maximumWriteQueueLength = 0; + + IoConnector ioConnector = null; + + if (Boolean.getBoolean("multinio")) + { + _logger.warn("Using MultiThread NIO"); + ioConnector = new org.apache.mina.transport.socket.nio.MultiThreadSocketConnector(); + } + else + { + _logger.warn("Using MINA NIO"); + ioConnector = new org.apache.mina.transport.socket.nio.SocketConnector(); + } + + SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getDefaultConfig().getSessionConfig(); + scfg.setTcpNoDelay(true); + scfg.setSendBufferSize(32768); + scfg.setReceiveBufferSize(32768); + + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + + + final InetSocketAddress address = new InetSocketAddress("localhost", _PORT); + _logger.info("Attempting connection to " + address); + + //Old mina style +// ioConnector.setHandler(new WriterHandler()); +// ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, new WriterHandler()); + // wait for connection to complete + future.join(); + _logger.info("Connection completed"); + // we call getSession which throws an IOException if there has been an error connecting + _session = future.getSession(); + + _chunkTimes = new long[_chunkCount]; + Thread t = new Thread(this); + t.start(); + t.join(); + _logger.info("Test Complete"); + } + + + public void test1k() throws IOException, InterruptedException + { + _logger.info("Starting 1k test"); + _chunkSize = 1024; + startWriter(); + } + + + public void test2k() throws IOException, InterruptedException + { + _logger.info("Starting 2k test"); + _chunkSize = 2048; + startWriter(); + } + + + public void test4k() throws IOException, InterruptedException + { + _logger.info("Starting 4k test"); + _chunkSize = 4096; + startWriter(); + } + + + public void test8k() throws IOException, InterruptedException + { + _logger.info("Starting 8k test"); + _chunkSize = 8192; + startWriter(); + } + + + public void test16k() throws IOException, InterruptedException + { + _logger.info("Starting 16k test"); + _chunkSize = 16384; + startWriter(); + } + + + public void test32k() throws IOException, InterruptedException + { + _logger.info("Starting 32k test"); + _chunkSize = 32768; + startWriter(); + } + + + public static int getIntArg(String[] args, int index, int defaultValue) + { + if (args.length > index) + { + try + { + return Integer.parseInt(args[index]); + } + catch (NumberFormatException e) + { + //Do nothing + } + } + return defaultValue; + } + + public static void main(String[] args) throws IOException, InterruptedException + { + _PORT = getIntArg(args, 0, _PORT); + + int test = getIntArg(args, 1, DEFAULT_TEST_SIZE); + + IOWriterClient w = new IOWriterClient(); + w._chunkCount = getIntArg(args, 2, w._chunkCount); + switch (test) + { + case 0: + w.test1k(); + w.test2k(); + w.test4k(); + w.test8k(); + w.test16k(); + w.test32k(); + break; + case 1: + w.test1k(); + break; + case 2: + w.test2k(); + break; + case 4: + w.test4k(); + break; + case 8: + w.test8k(); + break; + case 16: + w.test16k(); + break; + case 32: + w.test32k(); + break; + } + } +} diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java new file mode 100644 index 0000000000..423e98c67b --- /dev/null +++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java @@ -0,0 +1,157 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.SocketIOTest; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again. */ +public class IOWriterServer +{ + private static final Logger _logger = LoggerFactory.getLogger(IOWriterServer.class); + + static public int _PORT = 9999; + + private static final String DEFAULT_READ_BUFFER = "262144"; + private static final String DEFAULT_WRITE_BUFFER = "262144"; + + + private static class TestHandler extends IoHandlerAdapter + { + private int _sentCount = 0; + + private int _bytesSent = 0; + + private int _receivedCount = 0; + + public void sessionCreated(IoSession ioSession) throws java.lang.Exception + { + IoFilterChain chain = ioSession.getFilterChain(); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); + + writefilter.attach(chain); + + } + + public void messageReceived(IoSession session, Object message) throws Exception + { + ((ByteBuffer) message).acquire(); + session.write(message); + + if (_logger.isDebugEnabled()) + { + _bytesSent += ((ByteBuffer) message).remaining(); + + _sentCount++; + + if (_sentCount % 1000 == 0) + { + _logger.debug("Bytes sent: " + _bytesSent); + } + } + } + + public void messageSent(IoSession session, Object message) throws Exception + { + if (_logger.isDebugEnabled()) + { + ++_receivedCount; + + if (_receivedCount % 1000 == 0) + { + _logger.debug("Receieved count " + _receivedCount); + } + } + } + + public void exceptionCaught(IoSession session, Throwable cause) throws Exception + { + _logger.error("Error: " + cause, cause); + } + } + + public void startAcceptor() throws IOException + { + IoAcceptor acceptor; + if (Boolean.getBoolean("multinio")) + { + _logger.warn("Using MultiThread NIO"); + acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(); + } + else + { + _logger.warn("Using MINA NIO"); + acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(); + } + + + SocketSessionConfig sc = (SocketSessionConfig) acceptor.getDefaultConfig().getSessionConfig(); + sc.setTcpNoDelay(true); + sc.setSendBufferSize(32768); + sc.setReceiveBufferSize(32768); + + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + + //The old mina style +// acceptor.setLocalAddress(new InetSocketAddress(_PORT)); +// acceptor.setHandler(new TestHandler()); +// acceptor.bind(); + acceptor.bind(new InetSocketAddress(_PORT), new TestHandler()); + + _logger.info("Bound on port " + _PORT + ":" + _logger.isDebugEnabled()); + _logger.debug("debug on"); + } + + public static void main(String[] args) throws IOException + { + + if (args.length > 0) + { + try + { + _PORT = Integer.parseInt(args[0]); + } + catch (NumberFormatException e) + { + //IGNORE so use default port 9999; + } + } + + IOWriterServer a = new IOWriterServer(); + a.startAcceptor(); + } +} diff --git a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java new file mode 100644 index 0000000000..92e7ce0a80 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +import junit.framework.TestCase; +public class AMQShortStringTest extends TestCase +{ + + public static final AMQShortString HELLO = new AMQShortString("Hello"); + public static final AMQShortString HELL = new AMQShortString("Hell"); + public static final AMQShortString GOODBYE = new AMQShortString("Goodbye"); + public static final AMQShortString GOOD = new AMQShortString("Good"); + public static final AMQShortString BYE = new AMQShortString("BYE"); + + public void testStartsWith() + { + assertTrue(HELLO.startsWith(HELL)); + + assertFalse(HELL.startsWith(HELLO)); + + assertTrue(GOODBYE.startsWith(GOOD)); + + assertFalse(GOOD.startsWith(GOODBYE)); + } + + public void testEndWith() + { + assertFalse(HELL.endsWith(HELLO)); + + assertTrue(GOODBYE.endsWith(new AMQShortString("bye"))); + + assertFalse(GOODBYE.endsWith(BYE)); + } + + + public void testTokenize() + { + AMQShortString dotSeparatedWords = new AMQShortString("this.is.a.test.with.1.2.3.-numbers-and-then--dashes-"); + AMQShortStringTokenizer dotTokenizer = dotSeparatedWords.tokenize((byte) '.'); + + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(new AMQShortString("this"),(dotTokenizer.nextToken())); + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(new AMQShortString("is"),(dotTokenizer.nextToken())); + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(new AMQShortString("a"),(dotTokenizer.nextToken())); + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(new AMQShortString("test"),(dotTokenizer.nextToken())); + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(new AMQShortString("with"),(dotTokenizer.nextToken())); + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(dotTokenizer.nextToken().toIntValue() , 1); + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(dotTokenizer.nextToken().toIntValue() , 2); + assertTrue(dotTokenizer.hasMoreTokens()); + assertEquals(dotTokenizer.nextToken().toIntValue() , 3); + assertTrue(dotTokenizer.hasMoreTokens()); + AMQShortString dashString = dotTokenizer.nextToken(); + assertEquals(new AMQShortString("-numbers-and-then--dashes-"),(dashString)); + + AMQShortStringTokenizer dashTokenizer = dashString.tokenize((byte)'-'); + assertEquals(dashTokenizer.countTokens(), 7); + + AMQShortString[] expectedResults = new AMQShortString[] + { AMQShortString.EMPTY_STRING, + new AMQShortString("numbers"), + new AMQShortString("and"), + new AMQShortString("then"), + AMQShortString.EMPTY_STRING, + new AMQShortString("dashes"), + AMQShortString.EMPTY_STRING }; + + for(int i = 0; i < 7; i++) + { + assertTrue(dashTokenizer.hasMoreTokens()); + assertEquals(dashTokenizer.nextToken(), expectedResults[i]); + } + + assertFalse(dotTokenizer.hasMoreTokens()); + } + + + public void testEquals() + { + assertEquals(GOODBYE, new AMQShortString("Goodbye")); + assertEquals(new AMQShortString("A"), new AMQShortString("A")); + assertFalse(new AMQShortString("A").equals(new AMQShortString("a"))); + } + + +} diff --git a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index e63b0df770..007da7423e 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -1,21 +1,21 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.framing; @@ -439,6 +439,60 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals("Hello", table1.getString("value")); } + /** Check that a nested field table parameter correctly encodes and decodes to a byte buffer. */ + public void testNestedFieldTable() + { + byte[] testBytes = new byte[] { 0, 1, 2, 3, 4, 5 }; + + FieldTable outerTable = new FieldTable(); + FieldTable innerTable = new FieldTable(); + + // Put some stuff in the inner table. + innerTable.setBoolean("bool", true); + innerTable.setByte("byte", Byte.MAX_VALUE); + innerTable.setBytes("bytes", testBytes); + innerTable.setChar("char", 'c'); + innerTable.setDouble("double", Double.MAX_VALUE); + innerTable.setFloat("float", Float.MAX_VALUE); + innerTable.setInteger("int", Integer.MAX_VALUE); + innerTable.setLong("long", Long.MAX_VALUE); + innerTable.setShort("short", Short.MAX_VALUE); + innerTable.setString("string", "hello"); + innerTable.setString("null-string", null); + + // Put the inner table in the outer one. + outerTable.setFieldTable("innerTable", innerTable); + + // Write the outer table into the buffer. + final ByteBuffer buffer = ByteBuffer.allocate((int) outerTable.getEncodedSize() + 4); + outerTable.writeToBuffer(buffer); + buffer.flip(); + + // Extract the table back from the buffer again. + try + { + FieldTable extractedOuterTable = EncodingUtils.readFieldTable(buffer); + + FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable"); + + Assert.assertEquals((Boolean) true, extractedTable.getBoolean("bool")); + Assert.assertEquals((Byte) Byte.MAX_VALUE, extractedTable.getByte("byte")); + assertBytesEqual(testBytes, extractedTable.getBytes("bytes")); + Assert.assertEquals((Character) 'c', extractedTable.getCharacter("char")); + Assert.assertEquals(Double.MAX_VALUE, extractedTable.getDouble("double")); + Assert.assertEquals(Float.MAX_VALUE, extractedTable.getFloat("float")); + Assert.assertEquals((Integer) Integer.MAX_VALUE, extractedTable.getInteger("int")); + Assert.assertEquals((Long) Long.MAX_VALUE, extractedTable.getLong("long")); + Assert.assertEquals((Short) Short.MAX_VALUE, extractedTable.getShort("short")); + Assert.assertEquals("hello", extractedTable.getString("string")); + Assert.assertEquals(null, extractedTable.getString("null-string")); + } + catch (AMQFrameDecodingException e) + { + fail("Failed to decode field table with nested inner table."); + } + } + public void testValues() { FieldTable table = new FieldTable(); |
