diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-01-24 15:43:23 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-01-24 15:43:23 +0000 |
| commit | 02bf06d891b9baab1d03080a9332138bdf2ea0b9 (patch) | |
| tree | 33016507be7862396047a03e65c8817e69158dc5 /java/common/src | |
| parent | 0a0e1e1417c83789954e9cc19787297009b8803e (diff) | |
| download | qpid-python-02bf06d891b9baab1d03080a9332138bdf2ea0b9.tar.gz | |
Merged revisions 598285,598619,598721,598834-598835,599375,599531,599533,599572,599805,602134,604151,604928,605536,605542,606015-606016 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1
........
r598285 | ritchiem | 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) | 3 lines
QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564
........
r598619 | ritchiem | 2007-11-27 12:51:14 +0000 (Tue, 27 Nov 2007) | 1 line
Renamed POMs to M2.1.1 Removed erroneous equals() in SpecificMethodFrameListenerTest
........
r598721 | ritchiem | 2007-11-27 18:09:33 +0000 (Tue, 27 Nov 2007) | 1 line
QPID-621 : Patch Supplied by Aidan Skinner. Msg Ack after msg consumer is closed.
........
r598834 | ritchiem | 2007-11-28 00:45:32 +0000 (Wed, 28 Nov 2007) | 14 lines
QPID-679 : Patch provided by Aidan Skinner and additional from odd problems during test runs.
AMQChannel - Catch and log AMQException occuring when requeue()-ing. Previously exceptions wouldn't be caught at all. The requeue() is called during closure so there is nothing we can do protocol wise on error other than log the issue and continue with any other shutdown that is needed.
AMQMinaProtocolSession & AMQPFastProtocolHandler . Additions to catch and log AMQExceptions. Changes to AMQMinaProtocolSession were done to ignore all input on a closing session other than the close-ok. Previously only Protocol frames were ignored this resulted in Content*Body-s still being processed. Additional checks were made for the MessageStoreClosedException to log and continue. As said else were we need to seperate protocol exceptoions(AMQException) from internal code exception handling. Further All AMQExceptions occuring in the frameReceived method are now caught and logged. Allowing them to propogate higher will only result in thread death.
AMQPFastProtocolHandler Caught AMQExceptions occuring whilst closing the session. Again allowing these to continue will result in thread death. There is not a lot that can be done other than log the problem as the session is already closed by this point. Prevented the stacktrace associated with a session exception being printed in the exceptionCaught method when the problem was an IO Exception. This doesn't add anything useful and only adds to the log file sizes.
ApplicationRegistry - Added removeAll option which ensures that all ARs are correctly purged so that we can attempt to clean up between Unit Tests.
MemoryMessageStore - This was causing us real problems during the failover testing. Similar checks should probably be made to any other Message Store Impl. The issue was that when shutting down the broker the MS.close() method is called this sets all the storage to null. However, there may still be message processing going on as the close() does not attempt to stop connection processing. Hence we now check to see if the Store is close throwing a MSClosedException if required. This prevents NPEs that have been seen during Unit failover testing. In fact the close() is called as a request to shutdown the ApplicationRegistry, but this only occurs from tests and broker shutdown, no attempt to unbind or prevent further connections during this period is yet done.
CLIENT CHANGES
AMQConnection - Added method to check if failover is in progress.
AMQClient - Upgraded acknowledge() exception to JMSException for errors due to failover. Also , added call to update consumers as a result of failover.
BasicMessageConsumer - Changes to acquireReceiving to take in to consideration blocking for failover to occur. wrt receiveNoWait.. which previously blocked for failover to complete... not exactly noWait. acknowledge will now
TransportConnection - Update to ensure all inVM brokers are correctly killed.
FailoverTest - QPID-679 - Finder of all the above problems.
........
r598835 | ritchiem | 2007-11-28 01:01:05 +0000 (Wed, 28 Nov 2007) | 1 line
CommitRollbackTest - this one just was never right.. now we have something better.
........
r599375 | ritchiem | 2007-11-29 10:58:08 +0000 (Thu, 29 Nov 2007) | 1 line
Update to broker to address fanout python failure.
........
r599531 | ritchiem | 2007-11-29 17:56:12 +0000 (Thu, 29 Nov 2007) | 1 line
QPID-92 QPID-564 Forgot to upgrade mina to 1.0.1
........
r599533 | ritchiem | 2007-11-29 18:25:21 +0000 (Thu, 29 Nov 2007) | 1 line
QPID-564 QPID-92 Tidied up a few points and fixed infinite loop in Read IO Thread
........
r599572 | ritchiem | 2007-11-29 20:56:22 +0000 (Thu, 29 Nov 2007) | 2 lines
Mina Fix: Vm Pipe Starts Connection session before acceptor session. This results in protocol frames arriving before the protocol decoder has been configured on the InVM Broker. Verification of this could be done by adding a client side filter that delays the first message by a few seconds.
........
r599805 | ritchiem | 2007-11-30 12:47:08 +0000 (Fri, 30 Nov 2007) | 1 line
Added new simple Request/Repsonse code as my last commit here seems to have missed the actual code.
........
r602134 | rupertlssmith | 2007-12-07 16:00:14 +0000 (Fri, 07 Dec 2007) | 1 line
Added JDNI config for two broker, failover setup for failover tests. Also passed into FT tests config.
........
r604151 | ritchiem | 2007-12-14 10:40:37 +0000 (Fri, 14 Dec 2007) | 2 lines
QPID-707 : Added new test to check message count on broker as messages are consumed to ensure that an ack is sent at 5000 mgs. Added acks on message consumer closure.
Augmented VMTestCase to have helper methods for accessing broker statistics.
........
r604928 | rupertlssmith | 2007-12-17 17:00:10 +0000 (Mon, 17 Dec 2007) | 1 line
DUPS_OK mode set to be same as AUTO_ACK, fixed broken dups ok test.
........
r605536 | rupertlssmith | 2007-12-19 13:40:05 +0000 (Wed, 19 Dec 2007) | 1 line
Messages were being sent mandatory by default, set to false.
........
r605542 | rupertlssmith | 2007-12-19 13:53:44 +0000 (Wed, 19 Dec 2007) | 1 line
Changed test configs to use colons instead of commas.
........
r606015 | rgodfrey | 2007-12-20 20:08:01 +0000 (Thu, 20 Dec 2007) | 2 lines
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks
........
r606016 | rgodfrey | 2007-12-20 20:12:25 +0000 (Thu, 20 Dec 2007) | 2 lines
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@614906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
4 files changed, 483 insertions, 144 deletions
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java new file mode 100644 index 0000000000..810d12f472 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +/** + * A {@link ProtocolDecoder} that cumulates the content of received + * buffers to a <em>cumulative buffer</em> to help users implement decoders. + * <p> + * If the received {@link ByteBuffer} is only a part of a message. + * decoders should cumulate received buffers to make a message complete or + * to postpone decoding until more buffers arrive. + * <p> + * Here is an example decoder that decodes CRLF terminated lines into + * <code>Command</code> objects: + * <pre> + * public class CRLFTerminatedCommandLineDecoder + * extends CumulativeProtocolDecoder { + * + * private Command parseCommand(ByteBuffer in) { + * // Convert the bytes in the specified buffer to a + * // Command object. + * ... + * } + * + * protected boolean doDecode(IoSession session, ByteBuffer in, + * ProtocolDecoderOutput out) + * throws Exception { + * + * // Remember the initial position. + * int start = in.position(); + * + * // Now find the first CRLF in the buffer. + * byte previous = 0; + * while (in.hasRemaining()) { + * byte current = in.get(); + * + * if (previous == '\r' && current == '\n') { + * // Remember the current position and limit. + * int position = in.position(); + * int limit = in.limit(); + * try { + * in.position(start); + * in.limit(position); + * // The bytes between in.position() and in.limit() + * // now contain a full CRLF terminated line. + * out.write(parseCommand(in.slice())); + * } finally { + * // Set the position to point right after the + * // detected line and set the limit to the old + * // one. + * in.position(position); + * in.limit(limit); + * } + * // Decoded one line; CumulativeProtocolDecoder will + * // call me again until I return false. So just + * // return true until there are no more lines in the + * // buffer. + * return true; + * } + * + * previous = current; + * } + * + * // Could not find CRLF in the buffer. Reset the initial + * // position to the one we recorded above. + * in.position(start); + * + * return false; + * } + * } + * </pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $ + */ +public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { + + private static final String BUFFER = OurCumulativeProtocolDecoder.class + .getName() + + ".Buffer"; + + /** + * Creates a new instance. + */ + protected OurCumulativeProtocolDecoder() { + } + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is NOT compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception { + boolean usingSessionBuffer = true; + ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER); + // If we have a session buffer, append data to that; otherwise + // use the buffer read from the network directly. + if (buf != null) { + buf.put(in); + buf.flip(); + } else { + buf = in; + usingSessionBuffer = false; + } + + for (;;) { + int oldPos = buf.position(); + boolean decoded = doDecode(session, buf, out); + if (decoded) { + if (buf.position() == oldPos) { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed."); + } + + if (!buf.hasRemaining()) { + break; + } + } else { + break; + } + } + + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if (buf.hasRemaining()) { + storeRemainingInSession(buf, session); + } else { + if (usingSessionBuffer) + removeSessionBuffer(session); + } + } + + /** + * Implement this method to consume the specified cumulative buffer and + * decode its content into message(s). + * + * @param in the cumulative buffer + * @return <tt>true</tt> if and only if there's more to decode in the buffer + * and you want to have <tt>doDecode</tt> method invoked again. + * Return <tt>false</tt> if remaining data is not enough to decode, + * then this method will be invoked again when more data is cumulated. + * @throws Exception if cannot decode <tt>in</tt>. + */ + protected abstract boolean doDecode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception; + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose(IoSession session) throws Exception { + removeSessionBuffer(session); + } + + private void removeSessionBuffer(IoSession session) { + ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER); + if (buf != null) { + buf.release(); + } + } + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) { + ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity()); + remainingBuf.setAutoExpand(true); + remainingBuf.order(buf.order()); + remainingBuf.put(buf); + session.setAttribute(BUFFER, remainingBuf); + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java index 202ac1a530..cb24102edd 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -376,8 +376,7 @@ public class MultiThreadSocketConnector extends SocketConnector // Set the ConnectFuture of the specified session, which will be // removed and notified by AbstractIoFilterChain eventually. -// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); - session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture); + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); // Forward the remaining process to the SocketIoProcessor. session.getIoProcessor().addNew(session); diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java index 11c54bb248..03838ca3f1 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ private volatile Selector selector, writeSelector; private final Queue newSessions = new Queue(); @@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor this.executor = executor; } - void addNew( SocketSessionImpl session ) throws IOException + void addNew(SocketSessionImpl session) throws IOException { - synchronized( newSessions ) + synchronized (newSessions) { - newSessions.push( session ); + newSessions.push(session); } startupWorker(); @@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor writeSelector.wakeup(); } - void remove( SocketSessionImpl session ) throws IOException + void remove(SocketSessionImpl session) throws IOException { - scheduleRemove( session ); + scheduleRemove(session); startupWorker(); selector.wakeup(); } private void startupWorker() throws IOException { - synchronized(readLock) + synchronized (readLock) { if (readWorker == null) { @@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - synchronized(writeLock) + synchronized (writeLock) { if (writeWorker == null) { @@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } - void flush( SocketSessionImpl session ) + void flush(SocketSessionImpl session) { - scheduleFlush( session ); + scheduleFlush(session); Selector selector = this.writeSelector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - void updateTrafficMask( SocketSessionImpl session ) + void updateTrafficMask(SocketSessionImpl session) { - scheduleTrafficControl( session ); + scheduleTrafficControl(session); Selector selector = this.selector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - private void scheduleRemove( SocketSessionImpl session ) + private void scheduleRemove(SocketSessionImpl session) { - synchronized( removingSessions ) + synchronized (removingSessions) { - removingSessions.push( session ); + removingSessions.push(session); } } - private void scheduleFlush( SocketSessionImpl session ) + private void scheduleFlush(SocketSessionImpl session) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { //if flushingSessions grows to contain Integer.MAX_VALUE sessions // then this will fail. @@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - private void scheduleTrafficControl( SocketSessionImpl session ) + private void scheduleTrafficControl(SocketSessionImpl session) { - synchronized( trafficControllingSessions ) + synchronized (trafficControllingSessions) { - trafficControllingSessions.push( session ); + trafficControllingSessions.push(session); } } private void doAddNewReader() throws InterruptedException { - if( newSessions.isEmpty() ) + if (newSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( newSessions ) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } - if( session == null ) + if (session == null) { break; } @@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { - ch.configureBlocking( false ); - session.setSelectionKey( ch.register( selector, - SelectionKey.OP_READ, - session ) ); - + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); //System.out.println("ReadDebug:"+"Awaiting Registration"); session.awaitRegistration(); sessionCreated(session); } - catch( IOException e ) + catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } @@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { MultiThreadSocketSessionImpl session; - synchronized(newSessions) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } @@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { ch.configureBlocking(false); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessionsSet.add(session); } @@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } - private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException { MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - synchronized(newSessions) + synchronized (newSessions) { if (!session.created()) { @@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). - session.getServiceListeners().fireSessionCreated( session ); + session.getServiceListeners().fireSessionCreated(session); session.doneCreation(); } @@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void doRemove() { - if( removingSessions.isEmpty() ) + if (removingSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( removingSessions ) + synchronized (removingSessions) { session = (MultiThreadSocketSessionImpl) removingSessions.pop(); } - if( session == null ) + if (session == null) { break; } @@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // (In case that Session.close() is called before addSession() is processed) if (key == null || writeKey == null) { - scheduleRemove( session ); + scheduleRemove(session); break; } // skip if channel is already closed @@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); - synchronized(readLock) + synchronized (readLock) { key.cancel(); } - synchronized(writeLock) + synchronized (writeLock) { writeKey.cancel(); } ch.close(); } - catch( IOException e ) + catch (IOException e) { - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } finally { - releaseWriteBuffers( session ); - session.getServiceListeners().fireSessionDestroyed( session ); + releaseWriteBuffers(session); + session.getServiceListeners().fireSessionDestroyed(session); } } } @@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { Iterator it = selectedKeys.iterator(); - while( it.hasNext() ) + while (it.hasNext()) { - SelectionKey key = ( SelectionKey ) it.next(); + SelectionKey key = (SelectionKey) it.next(); MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); - synchronized(readLock) + synchronized (readLock) { if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) { - read( session ); + read(session); } } @@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - synchronized(writeLock) + synchronized (writeLock) { if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) { @@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear OP_WRITE key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessions.offer(session); } @@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int totalReadBytes = 0; - for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;) + while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) { ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); SocketChannel ch = session.getChannel(); @@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(session, e); + + //Stop Reading this session. + return; } finally { @@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { lastIdleReadCheckTime = currentTime; Set keys = selector.keys(); - if( keys != null ) + if (keys != null) { - for( Iterator it = keys.iterator(); it.hasNext(); ) + for (Iterator it = keys.iterator(); it.hasNext();) { - SelectionKey key = ( SelectionKey ) it.next(); - SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); notifyReadIdleness(session, currentTime); } } @@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyReadIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), - IdleStatus.BOTH_IDLE, - Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), - IdleStatus.READER_IDLE, - Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyWriteTimeout(session, currentTime, session .getWriteTimeoutInMillis(), session.getLastWriteTime()); @@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, + session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( session, currentTime, - session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), - IdleStatus.WRITER_IDLE, - Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - notifyWriteTimeout( session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime() ); + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); } - private void notifyIdleness0( SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime ) + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) { - if( idleTime > 0 && lastIoTime != 0 - && ( currentTime - lastIoTime ) >= idleTime ) + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) { - session.increaseIdleCount( status ); - session.getFilterChain().fireSessionIdle( session, status ); + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); } } - private void notifyWriteTimeout( SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime ) + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) { MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; SelectionKey key = sesh.getWriteSelectionKey(); - synchronized(writeLock) - { - if( writeTimeout > 0 - && ( currentTime - lastIoTime ) >= writeTimeout - && key != null && key.isValid() - && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) + synchronized (writeLock) { - session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() ); + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); + } } } - } private SocketSessionImpl getNextFlushingSession() { @@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void releaseSession(SocketSessionImpl session) { - synchronized(session.getWriteRequestQueue()) + synchronized (session.getWriteRequestQueue()) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { if (session.getScheduledWriteRequests() > 0) { @@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor WriteRequest req; //Should this be synchronized? - synchronized(writeRequestQueue) + synchronized (writeRequestQueue) { while ((req = (WriteRequest) writeRequestQueue.pop()) != null) { @@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) { - if( !session.isConnected() ) + if (!session.isConnected()) { - releaseWriteBuffers( session ); + releaseWriteBuffers(session); releaseSession(session); continue; } @@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = session.getWriteSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.write() is called before addSession() is processed) - if( key == null ) + if (key == null) { - scheduleFlush( session ); + scheduleFlush(session); releaseSession(session); continue; } // skip if channel is already closed - if( !key.isValid() ) + if (!key.isValid()) { releaseSession(session); continue; @@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor releaseSession(session); } } - catch( IOException e ) + catch (IOException e) { releaseSession(session); - scheduleRemove( session ); - session.getFilterChain().fireExceptionCaught( session, e ); + scheduleRemove(session); + session.getFilterChain().fireExceptionCaught(session, e); } } @@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; // Clear OP_WRITE SelectionKey key = session.getWriteSelectionKey(); - synchronized(writeLock) + synchronized (writeLock) { - key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } SocketChannel ch = session.getChannel(); Queue writeRequestQueue = session.getWriteRequestQueue(); long totalFlushedBytes = 0; - for( ; ; ) + while (true) { WriteRequest req; - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { - req = ( WriteRequest ) writeRequestQueue.first(); + req = (WriteRequest) writeRequestQueue.first(); } - if( req == null ) + if (req == null) { break; } - ByteBuffer buf = ( ByteBuffer ) req.getMessage(); - if( buf.remaining() == 0 ) + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) { - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { writeRequestQueue.pop(); } @@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor session.increaseWrittenMessages(); buf.reset(); - session.getFilterChain().fireMessageSent( session, req ); + session.getFilterChain().fireMessageSent(session, req); continue; } @@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int writtenBytes = 0; // Reported as DIRMINA-362 - //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. -// if (key.isWritable()) + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. + if (key.isWritable()) { - try - { - writtenBytes = ch.write(buf.buf()); - totalFlushedBytes += writtenBytes; - } - catch (IOException ioe) - { - throw ioe; - } + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; } - if( writtenBytes > 0 ) + if (writtenBytes > 0) { - session.increaseWrittenBytes( writtenBytes ); + session.increaseWrittenBytes(writtenBytes); } if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) @@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor if (writeSelector.keys().isEmpty()) { - synchronized(writeLock) + synchronized (writeLock) { if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) @@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); //System.out.println("ReadDebug:"+"Startup"); - for( ; ; ) + for (; ;) { try { @@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doAddNewReader(); doUpdateTrafficMask(); - if( nKeys > 0 ) + if (nKeys > 0) { //System.out.println("ReadDebug:"+nKeys + " keys from selector"); @@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doRemove(); notifyReadIdleness(); - if( selector.keys().isEmpty() ) + if (selector.keys().isEmpty()) { - synchronized(readLock) + synchronized (readLock) { - if( selector.keys().isEmpty() && newSessions.isEmpty() ) + if (selector.keys().isEmpty() && newSessions.isEmpty()) { readWorker = null; try { selector.close(); } - catch( IOException e ) + catch (IOException e) { - ExceptionMonitor.getInstance().exceptionCaught( e ); + ExceptionMonitor.getInstance().exceptionCaught(e); } finally { @@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } } - catch( Throwable t ) + catch (Throwable t) { - ExceptionMonitor.getInstance().exceptionCaught( t ); + ExceptionMonitor.getInstance().exceptionCaught(t); try { - Thread.sleep( 1000 ); + Thread.sleep(1000); } - catch( InterruptedException e1 ) + catch (InterruptedException e1) { - ExceptionMonitor.getInstance().exceptionCaught( e1 ); + ExceptionMonitor.getInstance().exceptionCaught(e1); } } } diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java new file mode 100644 index 0000000000..16e74b17d2 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.vmpipe; + +import java.io.IOException; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.BaseIoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.vmpipe.support.VmPipe; +import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker; +import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl; +import org.apache.mina.util.AnonymousSocketAddress; + +/** + * Connects to {@link IoHandler}s which is bound on the specified + * {@link VmPipeAddress}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class QpidVmPipeConnector extends VmPipeConnector +{ + private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {}; + private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig() + { + public IoSessionConfig getSessionConfig() + { + return CONFIG; + } + }; + + /** + * Creates a new instance. + */ + public QpidVmPipeConnector() + { + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + return connect( address, null, handler, config ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + if( ! ( address instanceof VmPipeAddress ) ) + throw new IllegalArgumentException( + "address must be VmPipeAddress." ); + + if( config == null ) + { + config = getDefaultConfig(); + } + + VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address ); + if( entry == null ) + { + return DefaultConnectFuture.newFailedFuture( + new IOException( "Endpoint unavailable: " + address ) ); + } + + DefaultConnectFuture future = new DefaultConnectFuture(); + VmPipeSessionImpl localSession = + new VmPipeSessionImpl( + this, + config, + getListeners(), + new Object(), // lock + new AnonymousSocketAddress(), + handler, + entry ); + + // initialize acceptor session + VmPipeSessionImpl remoteSession = localSession.getRemoteSession(); + try + { + IoFilterChain filterChain = remoteSession.getFilterChain(); + entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + entry.getListeners().fireSessionCreated( remoteSession ); + VmPipeIdleStatusChecker.getInstance().addSession( remoteSession ); + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + remoteSession.close(); + } + + + // initialize connector session + try + { + IoFilterChain filterChain = localSession.getFilterChain(); + this.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future ); + getListeners().fireSessionCreated( localSession ); + VmPipeIdleStatusChecker.getInstance().addSession( localSession); + } + catch( Throwable t ) + { + future.setException( t ); + } + + + + return future; + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } +}
\ No newline at end of file |
