From 02bf06d891b9baab1d03080a9332138bdf2ea0b9 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 24 Jan 2008 15:43:23 +0000 Subject: Merged revisions 598285,598619,598721,598834-598835,599375,599531,599533,599572,599805,602134,604151,604928,605536,605542,606015-606016 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1 ........ r598285 | ritchiem | 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) | 3 lines QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564 ........ r598619 | ritchiem | 2007-11-27 12:51:14 +0000 (Tue, 27 Nov 2007) | 1 line Renamed POMs to M2.1.1 Removed erroneous equals() in SpecificMethodFrameListenerTest ........ r598721 | ritchiem | 2007-11-27 18:09:33 +0000 (Tue, 27 Nov 2007) | 1 line QPID-621 : Patch Supplied by Aidan Skinner. Msg Ack after msg consumer is closed. ........ r598834 | ritchiem | 2007-11-28 00:45:32 +0000 (Wed, 28 Nov 2007) | 14 lines QPID-679 : Patch provided by Aidan Skinner and additional from odd problems during test runs. AMQChannel - Catch and log AMQException occuring when requeue()-ing. Previously exceptions wouldn't be caught at all. The requeue() is called during closure so there is nothing we can do protocol wise on error other than log the issue and continue with any other shutdown that is needed. AMQMinaProtocolSession & AMQPFastProtocolHandler . Additions to catch and log AMQExceptions. Changes to AMQMinaProtocolSession were done to ignore all input on a closing session other than the close-ok. Previously only Protocol frames were ignored this resulted in Content*Body-s still being processed. Additional checks were made for the MessageStoreClosedException to log and continue. As said else were we need to seperate protocol exceptoions(AMQException) from internal code exception handling. Further All AMQExceptions occuring in the frameReceived method are now caught and logged. Allowing them to propogate higher will only result in thread death. AMQPFastProtocolHandler Caught AMQExceptions occuring whilst closing the session. Again allowing these to continue will result in thread death. There is not a lot that can be done other than log the problem as the session is already closed by this point. Prevented the stacktrace associated with a session exception being printed in the exceptionCaught method when the problem was an IO Exception. This doesn't add anything useful and only adds to the log file sizes. ApplicationRegistry - Added removeAll option which ensures that all ARs are correctly purged so that we can attempt to clean up between Unit Tests. MemoryMessageStore - This was causing us real problems during the failover testing. Similar checks should probably be made to any other Message Store Impl. The issue was that when shutting down the broker the MS.close() method is called this sets all the storage to null. However, there may still be message processing going on as the close() does not attempt to stop connection processing. Hence we now check to see if the Store is close throwing a MSClosedException if required. This prevents NPEs that have been seen during Unit failover testing. In fact the close() is called as a request to shutdown the ApplicationRegistry, but this only occurs from tests and broker shutdown, no attempt to unbind or prevent further connections during this period is yet done. CLIENT CHANGES AMQConnection - Added method to check if failover is in progress. AMQClient - Upgraded acknowledge() exception to JMSException for errors due to failover. Also , added call to update consumers as a result of failover. BasicMessageConsumer - Changes to acquireReceiving to take in to consideration blocking for failover to occur. wrt receiveNoWait.. which previously blocked for failover to complete... not exactly noWait. acknowledge will now TransportConnection - Update to ensure all inVM brokers are correctly killed. FailoverTest - QPID-679 - Finder of all the above problems. ........ r598835 | ritchiem | 2007-11-28 01:01:05 +0000 (Wed, 28 Nov 2007) | 1 line CommitRollbackTest - this one just was never right.. now we have something better. ........ r599375 | ritchiem | 2007-11-29 10:58:08 +0000 (Thu, 29 Nov 2007) | 1 line Update to broker to address fanout python failure. ........ r599531 | ritchiem | 2007-11-29 17:56:12 +0000 (Thu, 29 Nov 2007) | 1 line QPID-92 QPID-564 Forgot to upgrade mina to 1.0.1 ........ r599533 | ritchiem | 2007-11-29 18:25:21 +0000 (Thu, 29 Nov 2007) | 1 line QPID-564 QPID-92 Tidied up a few points and fixed infinite loop in Read IO Thread ........ r599572 | ritchiem | 2007-11-29 20:56:22 +0000 (Thu, 29 Nov 2007) | 2 lines Mina Fix: Vm Pipe Starts Connection session before acceptor session. This results in protocol frames arriving before the protocol decoder has been configured on the InVM Broker. Verification of this could be done by adding a client side filter that delays the first message by a few seconds. ........ r599805 | ritchiem | 2007-11-30 12:47:08 +0000 (Fri, 30 Nov 2007) | 1 line Added new simple Request/Repsonse code as my last commit here seems to have missed the actual code. ........ r602134 | rupertlssmith | 2007-12-07 16:00:14 +0000 (Fri, 07 Dec 2007) | 1 line Added JDNI config for two broker, failover setup for failover tests. Also passed into FT tests config. ........ r604151 | ritchiem | 2007-12-14 10:40:37 +0000 (Fri, 14 Dec 2007) | 2 lines QPID-707 : Added new test to check message count on broker as messages are consumed to ensure that an ack is sent at 5000 mgs. Added acks on message consumer closure. Augmented VMTestCase to have helper methods for accessing broker statistics. ........ r604928 | rupertlssmith | 2007-12-17 17:00:10 +0000 (Mon, 17 Dec 2007) | 1 line DUPS_OK mode set to be same as AUTO_ACK, fixed broken dups ok test. ........ r605536 | rupertlssmith | 2007-12-19 13:40:05 +0000 (Wed, 19 Dec 2007) | 1 line Messages were being sent mandatory by default, set to false. ........ r605542 | rupertlssmith | 2007-12-19 13:53:44 +0000 (Wed, 19 Dec 2007) | 1 line Changed test configs to use colons instead of commas. ........ r606015 | rgodfrey | 2007-12-20 20:08:01 +0000 (Thu, 20 Dec 2007) | 2 lines QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers Ack each individual message on commit, not use multiple acks ........ r606016 | rgodfrey | 2007-12-20 20:12:25 +0000 (Thu, 20 Dec 2007) | 2 lines QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers Ack each individual message on commit, not use multiple acks ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@614906 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/etc/config.xml | 1 + java/broker/pom.xml | 2 +- java/broker/python-test.xml | 4 +- .../java/org/apache/qpid/server/AMQChannel.java | 11 +- .../server/exchange/DefaultExchangeFactory.java | 12 +- .../server/protocol/AMQPFastProtocolHandler.java | 11 +- .../qpid/server/registry/ApplicationRegistry.java | 44 ++-- .../qpid/server/store/MemoryMessageStore.java | 42 ++-- .../server/store/MessageStoreClosedException.java | 36 +++ .../apache/qpid/example/simple/reqresp/Client.java | 3 +- .../java/org/apache/qpid/client/AMQConnection.java | 4 + .../java/org/apache/qpid/client/AMQSession.java | 61 +++-- .../apache/qpid/client/BasicMessageConsumer.java | 115 ++++++--- .../protocol/BlockingMethodFrameListener.java | 14 -- .../listener/SpecificMethodFrameListener.java | 13 - .../qpid/client/transport/TransportConnection.java | 23 +- .../transport/VmPipeTransportConnection.java | 7 +- .../client/SpecificMethodFrameListenerTest.java | 73 ------ .../test/unit/transacted/CommitRollbackTest.java | 36 +-- .../filter/codec/OurCumulativeProtocolDecoder.java | 197 +++++++++++++++ .../socket/nio/MultiThreadSocketConnector.java | 3 +- .../socket/nio/MultiThreadSocketIoProcessor.java | 276 ++++++++++----------- .../mina/transport/vmpipe/QpidVmPipeConnector.java | 151 +++++++++++ java/perftests/pom.xml | 195 ++------------- java/pom.xml | 7 +- .../main/java/org/apache/qpid/test/VMTestCase.java | 21 +- .../org/apache/qpid/test/client/DupsOkTest.java | 140 +++++++++++ .../qpid/test/client/failover/FailoverTest.java | 222 +++++++++++++++++ .../apache/qpid/test/unit/ack/AcknowledgeTest.java | 151 +++++++++++ 29 files changed, 1287 insertions(+), 588 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java delete mode 100644 java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java create mode 100644 java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java create mode 100644 java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java create mode 100644 java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java create mode 100644 java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java create mode 100644 java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (limited to 'java') diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 737c8d22c4..b3b6a2877f 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -175,3 +175,4 @@ + diff --git a/java/broker/pom.xml b/java/broker/pom.xml index e2d7e6b9a3..b27ab657b7 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -248,7 +248,7 @@ + value="python run-tests -v -I java_failing.txt -b localhost:2110"/> diff --git a/java/broker/python-test.xml b/java/broker/python-test.xml index e58ae399b9..5c263e3169 100755 --- a/java/broker/python-test.xml +++ b/java/broker/python-test.xml @@ -43,9 +43,9 @@ under the License. > - + - + diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9fb3a5040b..4696ec4453 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -226,7 +226,7 @@ public class AMQChannel BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties; //fixme: fudge for QPID-677 properties.getHeaders().keySet(); - + properties.setUserId(protocolSession.getAuthorizedID().getName()); } @@ -378,7 +378,14 @@ public class AMQChannel { _txnContext.rollback(); unsubscribeAllConsumers(session); - requeue(); + try + { + requeue(); + } + catch (AMQException e) + { + _log.error("Caught AMQException whilst attempting to reque:" + e); + } setClosing(true); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 8ede553464..1a9dc6673a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -53,7 +53,7 @@ public class DefaultExchangeFactory implements ExchangeFactory { _exchangeClassMap.put(type.getName(), type); } - + public Collection> getRegisteredTypes() { return _exchangeClassMap.values(); @@ -75,6 +75,12 @@ public class DefaultExchangeFactory implements ExchangeFactory public void initialise(Configuration hostConfig) { + + if (hostConfig == null) + { + return; + } + for(Object className : hostConfig.getList("custom-exchanges.class-name")) { try diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index fa9d83cd7e..543e043bed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -165,7 +165,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter //fixme -- this can be null if (amqProtocolSession != null) { - amqProtocolSession.closeSession(); + try + { + amqProtocolSession.closeSession(); + } + catch (AMQException e) + { + _logger.error("Caught AMQException whilst closingSession:" + e); + } } } @@ -199,7 +206,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } else if (throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 22fa0fab23..455983c6d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,15 +20,14 @@ */ package org.apache.qpid.server.registry; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.HashMap; +import java.util.Map; + /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. @@ -59,24 +58,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void run() { _logger.info("Shutting down application registries..."); - try - { - synchronized (ApplicationRegistry.class) - { - Iterator keyIterator = _instanceMap.values().iterator(); - - while (keyIterator.hasNext()) - { - IApplicationRegistry instance = keyIterator.next(); - - instance.close(); - } - } - } - catch (Exception e) - { - _logger.error("Error shutting down message store: " + e, e); - } + removeAll(); } } @@ -116,6 +98,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } catch (Exception e) { + _logger.error("Error shutting down message store: " + e, e); } finally @@ -124,6 +107,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + public static void removeAll() + { + Object[] keys = _instanceMap.keySet().toArray(); + for (Object k : keys) + { + remove((Integer) k); + } + } protected ApplicationRegistry(Configuration configuration) { @@ -154,7 +145,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry catch (Exception e) { _logger.error("Error configuring application: " + e, e); - //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); + //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); throw new RuntimeException("Unable to create Application Registry", e); } } @@ -167,7 +158,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void close() throws Exception { - for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) + for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) { virtualHost.close(); } @@ -204,7 +195,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return instance; } - public static void setDefaultApplicationRegistry(String clazz) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 8ccb0be0a8..7a6e0b011f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,27 +20,26 @@ */ package org.apache.qpid.server.store; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -/** - * A simple message store that stores the messages in a threadsafe structure in memory. - */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -54,6 +53,7 @@ public class MemoryMessageStore implements MessageStore protected ConcurrentMap> _contentBodyMap; private final AtomicLong _messageId = new AtomicLong(1); + private AtomicBoolean _closed = new AtomicBoolean(false); public void configure() { @@ -77,6 +77,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { + _closed.getAndSet(true); if (_metaDataMap != null) { _metaDataMap.clear(); @@ -89,8 +90,9 @@ public class MemoryMessageStore implements MessageStore } } - public void removeMessage(StoreContext context, Long messageId) + public void removeMessage(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); if (_log.isDebugEnabled()) { _log.debug("Removing message with id " + messageId); @@ -172,9 +174,10 @@ public class MemoryMessageStore implements MessageStore public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { + checkNotClosed(); List bodyList = _contentBodyMap.get(messageId); - if(bodyList == null && lastContentBody) + if (bodyList == null && lastContentBody) { _contentBodyMap.put(messageId, Collections.singletonList(contentBody)); } @@ -193,17 +196,28 @@ public class MemoryMessageStore implements MessageStore public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { + checkNotClosed(); _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); return _metaDataMap.get(messageId); } public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { + checkNotClosed(); List bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); } + + private void checkNotClosed() throws MessageStoreClosedException + { + if (_closed.get()) + { + throw new MessageStoreClosedException(); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java new file mode 100644 index 0000000000..3d1538c7eb --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java @@ -0,0 +1,36 @@ +package org.apache.qpid.server.store; + +import org.apache.qpid.AMQException;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * NOTE: this class currently extends AMQException but + * we should be using AMQExceptions internally in the code base for Protocol errors hence + * the message store interface should throw a different super class which this should be + * moved to reflect + */ +public class MessageStoreClosedException extends AMQException +{ + public MessageStoreClosedException() + { + super("Message store closed"); + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java index 02386e84eb..b6badff24d 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java @@ -3,7 +3,7 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at @@ -252,4 +252,3 @@ public class Client implements MessageListener new Client(); } } - diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 38325a1e41..39b3b80e74 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1303,4 +1303,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); } + public boolean isFailingOver() + { + return (_protocolHandler.getFailoverLatch() != null); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 15c113a05d..42f07f97f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -83,6 +83,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -219,6 +220,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private Map _consumers = new ConcurrentHashMap(); + /** + * Contains a list of consumers which have been removed but which might still have + * messages to acknowledge, eg in client ack or transacted modes + */ + private CopyOnWriteArrayList _removedConsumers = new CopyOnWriteArrayList(); + /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap _destinationConsumerCount = new ConcurrentHashMap(); @@ -387,7 +394,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @throws IllegalStateException If the session is closed. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws JMSException { if (isClosed()) { @@ -611,20 +618,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator i = _consumers.values().iterator(); i.hasNext();) { -// i.next().acknowledgeLastDelivered(); -// } - - // get next acknowledgement to server - Long next = i.next().getLastDelivered(); - if (next != null && next > lastTag) - { - lastTag = next; - } + i.next().acknowledgeDelivered(); } - if (lastTag != -1) + if (_transacted) { - acknowledgeMessage(lastTag, true); + // Do the above, but for consumers which have been de-registered since the + // last commit + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).acknowledgeDelivered(); + _removedConsumers.remove(i); + } } // Commits outstanding messages sent and outstanding acknowledgements. @@ -760,7 +766,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -776,7 +782,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -1676,6 +1682,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } + + // Consumers that are closed in a transaction must be stored + // so that messages they have received can be acknowledged on commit + if (_transacted) + { + _removedConsumers.add(consumer); + } } } @@ -2445,6 +2458,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + consumer.failedOver(); registerConsumer(consumer, true); } } @@ -2543,17 +2557,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _messageDeliveryLock; } - /** - * Signifies that the session has pending sends to commit. - */ + /** Signifies that the session has pending sends to commit. */ public void markDirty() { _dirty = true; } - /** - * Signifies that the session has no pending sends to commit. - */ + /** Signifies that the session has no pending sends to commit. */ public void markClean() { _dirty = false; @@ -2562,6 +2572,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Check to see if failover has occured since the last call to markClean(commit or rollback). + * * @return boolean true if failover has occured. */ public boolean hasFailedOver() @@ -2571,6 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Check to see if any message have been sent in this transaction and have not been commited. + * * @return boolean true if a message has been sent but not commited */ public boolean isDirty() @@ -2624,7 +2636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Reject messages on pre-receive queue - consumer.rollback(); + consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); @@ -2668,6 +2680,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).rollback(); + _removedConsumers.remove(i); + } + setConnectionStopped(isStopped); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ae31f5ebdd..610e0109b1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -255,6 +253,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { + case Session.DUPS_OK_ACKNOWLEDGE: + _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag()); + _receivedDeliveryTags.add(msg.getDeliveryTag()); + break; case Session.CLIENT_ACKNOWLEDGE: _unacknowledgedDeliveryTags.add(msg.getDeliveryTag()); @@ -277,8 +279,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session.setInRecovery(false); } - private void acquireReceiving() throws JMSException + /** + * @param immediate if true then return immediately if the connection is failing over + * + * @return boolean if the acquisition was successful + * + * @throws JMSException + * @throws InterruptedException + */ + private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { + if (_connection.isFailingOver()) + { + if (immediate) + { + return false; + } + else + { + _connection.blockUntilNotFailingOver(); + } + } + if (!_receiving.compareAndSet(false, true)) { throw new javax.jms.IllegalStateException("Another thread is already receiving."); @@ -290,6 +312,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } _receivingThread = Thread.currentThread(); + return true; } private void releaseReceiving() @@ -343,7 +366,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer checkPreConditions(); - acquireReceiving(); + try + { + acquireReceiving(false); + } + catch (InterruptedException e) + { + _logger.warn("Interrupted: " + e); + if (isClosed()) + { + return null; + } + } _session.startDistpatcherIfNecessary(); @@ -424,7 +458,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - acquireReceiving(); + try + { + if (!acquireReceiving(true)) + { + //If we couldn't acquire the receiving thread then return null. + // This will occur if failing over. + return null; + } + } + catch (InterruptedException e) + { + /* + * This seems slightly shoddy but should never actually be executed + * since we told acquireReceiving to return immediately and it shouldn't + * block on anything. + */ + + return null; + } _session.startDistpatcherIfNecessary(); @@ -721,12 +773,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer break; case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) + /*( if (++_outstanding >= _prefetchHigh) { _dups_ok_acknowledge_send = true; } - if (_outstanding <= _prefetchLow) + //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur. + if (_outstanding < _prefetchLow) { _dups_ok_acknowledge_send = false; } @@ -736,11 +789,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (!_session.isInRecovery()) { _session.acknowledgeMessage(msg.getDeliveryTag(), true); + _outstanding = 0; } } break; - + */ case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) @@ -777,20 +831,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** Acknowledge up to last message delivered (if any). Used when commiting. */ - void acknowledgeLastDelivered() + void acknowledgeDelivered() { - if (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } @@ -866,11 +911,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() // throws JMSException + public void acknowledge() throws JMSException { - if (!isClosed()) + if (isClosed()) + { + throw new IllegalStateException("Consumer is closed"); + } + else if (_session.hasFailedOver()) + { + throw new JMSException("has failed over"); + } + else { - Iterator tags = _unacknowledgedDeliveryTags.iterator(); while (tags.hasNext()) { @@ -878,10 +930,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer tags.remove(); } } - else - { - throw new IllegalStateException("Consumer is closed"); - } } /** Called on recovery to reset the list of delivery tags */ @@ -951,7 +999,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - // rollback pending messages + rollbackPendingMessages(); + } + + public void rollbackPendingMessages() + { if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) @@ -1016,4 +1068,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _synchronousQueue.clear(); } + + /** to be called when a failover has occured */ + public void failedOver() + { + clearReceiveQueue(); + clearUnackedMessages(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 1badbb601c..2b63475d71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -294,18 +294,4 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } } - public boolean equals(Object o) - { - - if (o instanceof BlockingMethodFrameListener) - { - BlockingMethodFrameListener other = (BlockingMethodFrameListener) o; - - return _channelId == other._channelId; - } - - return false; - } - - } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 4a4f4a0a38..c66603b7a0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -41,17 +41,4 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener return _expectedClass.isInstance(frame); } - public boolean equals(Object o) - { - if (o instanceof SpecificMethodFrameListener) - { - SpecificMethodFrameListener other = (SpecificMethodFrameListener) o; - - // here we need to check if the two classes are the same. - return (_channelId == other._channelId) && (_expectedClass.equals(other._expectedClass)); - } - - return false; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 3257caa796..e8a220f5e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -101,27 +100,21 @@ public class TransportConnection _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") ? "Qpid NIO is new default" : "Sysproperty 'qpidnio' is set")); - - result = new MultiThreadSocketConnector(); } else { _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector } - // Don't have the connector's worker thread wait around for other connections (we only use // one SocketConnector per connection at the moment anyway). This allows short-running // clients (like unit tests) to complete quickly. result.setWorkerTimeout(0); - return result; } }); break; - case VM: { _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); @@ -280,8 +273,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); - amqbce.initCause(e); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); throw amqbce; } @@ -294,14 +286,11 @@ public class TransportConnection _acceptor.unbindAll(); synchronized (_inVmPipeAddress) { - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) - { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); - } - } + _inVmPipeAddress.clear(); + } + _acceptor = null; + _currentInstance = -1; + _currentVMPort = -1; } public static void killVMBroker(int port) diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 25a9e26285..dca6efba67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -22,15 +22,12 @@ package org.apache.qpid.client.transport; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; - import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +46,7 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - final VmPipeConnector ioConnector = new VmPipeConnector(); + final VmPipeConnector ioConnector = new QpidVmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); cfg.setThreadModel(ReadWriteThreadModel.getInstance()); diff --git a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java deleted file mode 100644 index 69684a81ea..0000000000 --- a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.qpid.framing; - -import junit.framework.TestCase; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; - -import org.apache.mina.common.ByteBuffer; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -public class SpecificMethodFrameListenerTest extends TestCase -{ - - SpecificMethodFrameListener close1a = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class); - SpecificMethodFrameListener close1b = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class); - SpecificMethodFrameListener close2 = new SpecificMethodFrameListener(2, ChannelCloseOkBody.class); - SpecificMethodFrameListener open1a = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class); - SpecificMethodFrameListener open1b = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class); - - public void testEquals() - { - //Check that the the same objects are equal - assertEquals("ChannelCloseOKBody a should equal a", close1a, close1a); - assertEquals("ChannelOpenOkBody a should equal a", open1a, open1a); - - //check that the same values in differnt objects are equal - assertEquals("ChannelCloseOKBody b should equal a", close1b, close1a); - assertEquals("ChannelCloseOKBody a should equal b", close1a, close1b); - assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b); - assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b); - - //Chec that different values fail - //Different channels - assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close1a.equals(close2)); - assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close2.equals(close1a)); - - //Different Bodies - assertFalse("ChannelCloseOKBody should not equal ChannelOpenOkBody", close1a.equals(open1a)); - assertFalse("ChannelOpenOkBody should not equal ChannelCloseOKBody", open1a.equals(close1a)); - } - - public void testProcessMethod() throws AMQFrameDecodingException - { - ChannelCloseOkBody ccob = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9).createChannelCloseOkBody(); - ChannelOpenOkBody coob = ((MethodRegistry_0_9)(MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9))).createChannelOpenOkBody(new byte[0]); - - assertTrue("This SpecificMethodFrameListener should process a ChannelCloseOkBody", close1a.processMethod(1, ccob)); - assertFalse("This SpecificMethodFrameListener should NOT process a ChannelOpenOkBody", close1a.processMethod(1, coob)); - - - - - } -} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 224463a446..e45312448c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -418,14 +418,14 @@ public class CommitRollbackTest extends TestCase { _logger.info("Got 2 redelivered, message was prefetched"); _gottwoRedelivered = true; - + } else { - _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); + _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); assertFalse("Already received message two", _gottwo); assertFalse("Already received message redelivered two", _gottwoRedelivered); - + _gottwo = true; } } @@ -437,7 +437,7 @@ public class CommitRollbackTest extends TestCase * This test sends two messages receives on of them but doesn't ack it. * The consumer is then closed * the first message should be returned as redelivered. - * the second message should be delivered normally. + * the second message should be delivered normally. * @throws Exception */ public void testSend2ThenCloseAfter1andTryAgain() throws Exception @@ -458,6 +458,7 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); _logger.info("Closing Consumer"); + _consumer.close(); _logger.info("Creating New consumer"); @@ -465,33 +466,20 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); -// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. -// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. - result = _consumer.receive(5000); - assertNotNull("test message was consumed and rolled back, but is gone", result); -// The first message back will be either 1 or 2 being redelivered - if (result.getJMSRedelivered()) - { - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - } - else // or it will be msg 2 arriving the first time due to latency. - { - _logger.info("Message 2 wasn't prefetched so wasn't rejected"); - assertEquals("2", ((TextMessage) result).getText()); - } + // Message 2 may be marked as redelivered if it was prefetched. + result = _consumer.receive(5000); + assertNotNull("Second message was not consumed, but is gone", result); - Message result2 = _consumer.receive(5000); - assertNotNull("test message was consumed and rolled back, but is gone", result2); + // The first message back will be 2, message 1 has been received but not committed + // Closing the consumer does not commit the session. // if this is message 1 then it should be marked as redelivered - if("1".equals(((TextMessage) result2).getText())) + if("1".equals(((TextMessage) result).getText())) { - assertTrue("Messasge is not marked as redelivered" + result2, result2.getJMSRedelivered()); + fail("First message was recieved again"); } - assertNotSame("Messages should not have the same content",((TextMessage) result2).getText(), ((TextMessage) result).getText() ); - result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java new file mode 100644 index 0000000000..810d12f472 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +/** + * A {@link ProtocolDecoder} that cumulates the content of received + * buffers to a cumulative buffer to help users implement decoders. + *

+ * If the received {@link ByteBuffer} is only a part of a message. + * decoders should cumulate received buffers to make a message complete or + * to postpone decoding until more buffers arrive. + *

+ * Here is an example decoder that decodes CRLF terminated lines into + * Command objects: + *

+ * public class CRLFTerminatedCommandLineDecoder
+ *         extends CumulativeProtocolDecoder {
+ *
+ *     private Command parseCommand(ByteBuffer in) {
+ *         // Convert the bytes in the specified buffer to a
+ *         // Command object.
+ *         ...
+ *     }
+ *
+ *     protected boolean doDecode(IoSession session, ByteBuffer in,
+ *                                ProtocolDecoderOutput out)
+ *             throws Exception {
+ *
+ *         // Remember the initial position.
+ *         int start = in.position();
+ *
+ *         // Now find the first CRLF in the buffer.
+ *         byte previous = 0;
+ *         while (in.hasRemaining()) {
+ *             byte current = in.get();
+ *
+ *             if (previous == '\r' && current == '\n') {
+ *                 // Remember the current position and limit.
+ *                 int position = in.position();
+ *                 int limit = in.limit();
+ *                 try {
+ *                     in.position(start);
+ *                     in.limit(position);
+ *                     // The bytes between in.position() and in.limit()
+ *                     // now contain a full CRLF terminated line.
+ *                     out.write(parseCommand(in.slice()));
+ *                 } finally {
+ *                     // Set the position to point right after the
+ *                     // detected line and set the limit to the old
+ *                     // one.
+ *                     in.position(position);
+ *                     in.limit(limit);
+ *                 }
+ *                 // Decoded one line; CumulativeProtocolDecoder will
+ *                 // call me again until I return false. So just
+ *                 // return true until there are no more lines in the
+ *                 // buffer.
+ *                 return true;
+ *             }
+ *
+ *             previous = current;
+ *         }
+ *
+ *         // Could not find CRLF in the buffer. Reset the initial
+ *         // position to the one we recorded above.
+ *         in.position(start);
+ *
+ *         return false;
+ *     }
+ * }
+ * 
+ * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $ + */ +public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { + + private static final String BUFFER = OurCumulativeProtocolDecoder.class + .getName() + + ".Buffer"; + + /** + * Creates a new instance. + */ + protected OurCumulativeProtocolDecoder() { + } + + /** + * Cumulates content of in into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * doDecode() is invoked repeatedly until it returns false + * and the cumulative buffer is NOT compacted after decoding ends. + * + * @throws IllegalStateException if your doDecode() returned + * true not consuming the cumulative buffer. + */ + public void decode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception { + boolean usingSessionBuffer = true; + ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER); + // If we have a session buffer, append data to that; otherwise + // use the buffer read from the network directly. + if (buf != null) { + buf.put(in); + buf.flip(); + } else { + buf = in; + usingSessionBuffer = false; + } + + for (;;) { + int oldPos = buf.position(); + boolean decoded = doDecode(session, buf, out); + if (decoded) { + if (buf.position() == oldPos) { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed."); + } + + if (!buf.hasRemaining()) { + break; + } + } else { + break; + } + } + + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if (buf.hasRemaining()) { + storeRemainingInSession(buf, session); + } else { + if (usingSessionBuffer) + removeSessionBuffer(session); + } + } + + /** + * Implement this method to consume the specified cumulative buffer and + * decode its content into message(s). + * + * @param in the cumulative buffer + * @return true if and only if there's more to decode in the buffer + * and you want to have doDecode method invoked again. + * Return false if remaining data is not enough to decode, + * then this method will be invoked again when more data is cumulated. + * @throws Exception if cannot decode in. + */ + protected abstract boolean doDecode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception; + + /** + * Releases the cumulative buffer used by the specified session. + * Please don't forget to call super.dispose( session ) when + * you override this method. + */ + public void dispose(IoSession session) throws Exception { + removeSessionBuffer(session); + } + + private void removeSessionBuffer(IoSession session) { + ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER); + if (buf != null) { + buf.release(); + } + } + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) { + ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity()); + remainingBuf.setAutoExpand(true); + remainingBuf.order(buf.order()); + remainingBuf.put(buf); + session.setAttribute(BUFFER, remainingBuf); + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java index 202ac1a530..cb24102edd 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -376,8 +376,7 @@ public class MultiThreadSocketConnector extends SocketConnector // Set the ConnectFuture of the specified session, which will be // removed and notified by AbstractIoFilterChain eventually. -// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); - session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture); + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); // Forward the remaining process to the SocketIoProcessor. session.getIoProcessor().addNew(session); diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java index 11c54bb248..03838ca3f1 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ private volatile Selector selector, writeSelector; private final Queue newSessions = new Queue(); @@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor this.executor = executor; } - void addNew( SocketSessionImpl session ) throws IOException + void addNew(SocketSessionImpl session) throws IOException { - synchronized( newSessions ) + synchronized (newSessions) { - newSessions.push( session ); + newSessions.push(session); } startupWorker(); @@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor writeSelector.wakeup(); } - void remove( SocketSessionImpl session ) throws IOException + void remove(SocketSessionImpl session) throws IOException { - scheduleRemove( session ); + scheduleRemove(session); startupWorker(); selector.wakeup(); } private void startupWorker() throws IOException { - synchronized(readLock) + synchronized (readLock) { if (readWorker == null) { @@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - synchronized(writeLock) + synchronized (writeLock) { if (writeWorker == null) { @@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } - void flush( SocketSessionImpl session ) + void flush(SocketSessionImpl session) { - scheduleFlush( session ); + scheduleFlush(session); Selector selector = this.writeSelector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - void updateTrafficMask( SocketSessionImpl session ) + void updateTrafficMask(SocketSessionImpl session) { - scheduleTrafficControl( session ); + scheduleTrafficControl(session); Selector selector = this.selector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - private void scheduleRemove( SocketSessionImpl session ) + private void scheduleRemove(SocketSessionImpl session) { - synchronized( removingSessions ) + synchronized (removingSessions) { - removingSessions.push( session ); + removingSessions.push(session); } } - private void scheduleFlush( SocketSessionImpl session ) + private void scheduleFlush(SocketSessionImpl session) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { //if flushingSessions grows to contain Integer.MAX_VALUE sessions // then this will fail. @@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - private void scheduleTrafficControl( SocketSessionImpl session ) + private void scheduleTrafficControl(SocketSessionImpl session) { - synchronized( trafficControllingSessions ) + synchronized (trafficControllingSessions) { - trafficControllingSessions.push( session ); + trafficControllingSessions.push(session); } } private void doAddNewReader() throws InterruptedException { - if( newSessions.isEmpty() ) + if (newSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( newSessions ) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } - if( session == null ) + if (session == null) { break; } @@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { - ch.configureBlocking( false ); - session.setSelectionKey( ch.register( selector, - SelectionKey.OP_READ, - session ) ); - + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); //System.out.println("ReadDebug:"+"Awaiting Registration"); session.awaitRegistration(); sessionCreated(session); } - catch( IOException e ) + catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } @@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { MultiThreadSocketSessionImpl session; - synchronized(newSessions) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } @@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { ch.configureBlocking(false); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessionsSet.add(session); } @@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } - private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException { MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - synchronized(newSessions) + synchronized (newSessions) { if (!session.created()) { @@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). - session.getServiceListeners().fireSessionCreated( session ); + session.getServiceListeners().fireSessionCreated(session); session.doneCreation(); } @@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void doRemove() { - if( removingSessions.isEmpty() ) + if (removingSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( removingSessions ) + synchronized (removingSessions) { session = (MultiThreadSocketSessionImpl) removingSessions.pop(); } - if( session == null ) + if (session == null) { break; } @@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // (In case that Session.close() is called before addSession() is processed) if (key == null || writeKey == null) { - scheduleRemove( session ); + scheduleRemove(session); break; } // skip if channel is already closed @@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); - synchronized(readLock) + synchronized (readLock) { key.cancel(); } - synchronized(writeLock) + synchronized (writeLock) { writeKey.cancel(); } ch.close(); } - catch( IOException e ) + catch (IOException e) { - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } finally { - releaseWriteBuffers( session ); - session.getServiceListeners().fireSessionDestroyed( session ); + releaseWriteBuffers(session); + session.getServiceListeners().fireSessionDestroyed(session); } } } @@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { Iterator it = selectedKeys.iterator(); - while( it.hasNext() ) + while (it.hasNext()) { - SelectionKey key = ( SelectionKey ) it.next(); + SelectionKey key = (SelectionKey) it.next(); MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); - synchronized(readLock) + synchronized (readLock) { if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) { - read( session ); + read(session); } } @@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - synchronized(writeLock) + synchronized (writeLock) { if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) { @@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear OP_WRITE key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessions.offer(session); } @@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int totalReadBytes = 0; - for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;) + while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) { ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); SocketChannel ch = session.getChannel(); @@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(session, e); + + //Stop Reading this session. + return; } finally { @@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { lastIdleReadCheckTime = currentTime; Set keys = selector.keys(); - if( keys != null ) + if (keys != null) { - for( Iterator it = keys.iterator(); it.hasNext(); ) + for (Iterator it = keys.iterator(); it.hasNext();) { - SelectionKey key = ( SelectionKey ) it.next(); - SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); notifyReadIdleness(session, currentTime); } } @@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyReadIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), - IdleStatus.BOTH_IDLE, - Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), - IdleStatus.READER_IDLE, - Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyWriteTimeout(session, currentTime, session .getWriteTimeoutInMillis(), session.getLastWriteTime()); @@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, + session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( session, currentTime, - session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), - IdleStatus.WRITER_IDLE, - Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - notifyWriteTimeout( session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime() ); + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); } - private void notifyIdleness0( SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime ) + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) { - if( idleTime > 0 && lastIoTime != 0 - && ( currentTime - lastIoTime ) >= idleTime ) + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) { - session.increaseIdleCount( status ); - session.getFilterChain().fireSessionIdle( session, status ); + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); } } - private void notifyWriteTimeout( SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime ) + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) { MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; SelectionKey key = sesh.getWriteSelectionKey(); - synchronized(writeLock) - { - if( writeTimeout > 0 - && ( currentTime - lastIoTime ) >= writeTimeout - && key != null && key.isValid() - && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) + synchronized (writeLock) { - session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() ); + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); + } } } - } private SocketSessionImpl getNextFlushingSession() { @@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void releaseSession(SocketSessionImpl session) { - synchronized(session.getWriteRequestQueue()) + synchronized (session.getWriteRequestQueue()) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { if (session.getScheduledWriteRequests() > 0) { @@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor WriteRequest req; //Should this be synchronized? - synchronized(writeRequestQueue) + synchronized (writeRequestQueue) { while ((req = (WriteRequest) writeRequestQueue.pop()) != null) { @@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) { - if( !session.isConnected() ) + if (!session.isConnected()) { - releaseWriteBuffers( session ); + releaseWriteBuffers(session); releaseSession(session); continue; } @@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = session.getWriteSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.write() is called before addSession() is processed) - if( key == null ) + if (key == null) { - scheduleFlush( session ); + scheduleFlush(session); releaseSession(session); continue; } // skip if channel is already closed - if( !key.isValid() ) + if (!key.isValid()) { releaseSession(session); continue; @@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor releaseSession(session); } } - catch( IOException e ) + catch (IOException e) { releaseSession(session); - scheduleRemove( session ); - session.getFilterChain().fireExceptionCaught( session, e ); + scheduleRemove(session); + session.getFilterChain().fireExceptionCaught(session, e); } } @@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; // Clear OP_WRITE SelectionKey key = session.getWriteSelectionKey(); - synchronized(writeLock) + synchronized (writeLock) { - key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } SocketChannel ch = session.getChannel(); Queue writeRequestQueue = session.getWriteRequestQueue(); long totalFlushedBytes = 0; - for( ; ; ) + while (true) { WriteRequest req; - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { - req = ( WriteRequest ) writeRequestQueue.first(); + req = (WriteRequest) writeRequestQueue.first(); } - if( req == null ) + if (req == null) { break; } - ByteBuffer buf = ( ByteBuffer ) req.getMessage(); - if( buf.remaining() == 0 ) + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) { - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { writeRequestQueue.pop(); } @@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor session.increaseWrittenMessages(); buf.reset(); - session.getFilterChain().fireMessageSent( session, req ); + session.getFilterChain().fireMessageSent(session, req); continue; } @@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int writtenBytes = 0; // Reported as DIRMINA-362 - //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. -// if (key.isWritable()) + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. + if (key.isWritable()) { - try - { - writtenBytes = ch.write(buf.buf()); - totalFlushedBytes += writtenBytes; - } - catch (IOException ioe) - { - throw ioe; - } + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; } - if( writtenBytes > 0 ) + if (writtenBytes > 0) { - session.increaseWrittenBytes( writtenBytes ); + session.increaseWrittenBytes(writtenBytes); } if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) @@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor if (writeSelector.keys().isEmpty()) { - synchronized(writeLock) + synchronized (writeLock) { if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) @@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); //System.out.println("ReadDebug:"+"Startup"); - for( ; ; ) + for (; ;) { try { @@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doAddNewReader(); doUpdateTrafficMask(); - if( nKeys > 0 ) + if (nKeys > 0) { //System.out.println("ReadDebug:"+nKeys + " keys from selector"); @@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doRemove(); notifyReadIdleness(); - if( selector.keys().isEmpty() ) + if (selector.keys().isEmpty()) { - synchronized(readLock) + synchronized (readLock) { - if( selector.keys().isEmpty() && newSessions.isEmpty() ) + if (selector.keys().isEmpty() && newSessions.isEmpty()) { readWorker = null; try { selector.close(); } - catch( IOException e ) + catch (IOException e) { - ExceptionMonitor.getInstance().exceptionCaught( e ); + ExceptionMonitor.getInstance().exceptionCaught(e); } finally { @@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } } - catch( Throwable t ) + catch (Throwable t) { - ExceptionMonitor.getInstance().exceptionCaught( t ); + ExceptionMonitor.getInstance().exceptionCaught(t); try { - Thread.sleep( 1000 ); + Thread.sleep(1000); } - catch( InterruptedException e1 ) + catch (InterruptedException e1) { - ExceptionMonitor.getInstance().exceptionCaught( e1 ); + ExceptionMonitor.getInstance().exceptionCaught(e1); } } } diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java new file mode 100644 index 0000000000..16e74b17d2 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.vmpipe; + +import java.io.IOException; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.BaseIoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.vmpipe.support.VmPipe; +import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker; +import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl; +import org.apache.mina.util.AnonymousSocketAddress; + +/** + * Connects to {@link IoHandler}s which is bound on the specified + * {@link VmPipeAddress}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class QpidVmPipeConnector extends VmPipeConnector +{ + private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {}; + private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig() + { + public IoSessionConfig getSessionConfig() + { + return CONFIG; + } + }; + + /** + * Creates a new instance. + */ + public QpidVmPipeConnector() + { + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + return connect( address, null, handler, config ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + if( ! ( address instanceof VmPipeAddress ) ) + throw new IllegalArgumentException( + "address must be VmPipeAddress." ); + + if( config == null ) + { + config = getDefaultConfig(); + } + + VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address ); + if( entry == null ) + { + return DefaultConnectFuture.newFailedFuture( + new IOException( "Endpoint unavailable: " + address ) ); + } + + DefaultConnectFuture future = new DefaultConnectFuture(); + VmPipeSessionImpl localSession = + new VmPipeSessionImpl( + this, + config, + getListeners(), + new Object(), // lock + new AnonymousSocketAddress(), + handler, + entry ); + + // initialize acceptor session + VmPipeSessionImpl remoteSession = localSession.getRemoteSession(); + try + { + IoFilterChain filterChain = remoteSession.getFilterChain(); + entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + entry.getListeners().fireSessionCreated( remoteSession ); + VmPipeIdleStatusChecker.getInstance().addSession( remoteSession ); + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + remoteSession.close(); + } + + + // initialize connector session + try + { + IoFilterChain filterChain = localSession.getFilterChain(); + this.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future ); + getListeners().fireSessionCreated( localSession ); + VmPipeIdleStatusChecker.getInstance().addSession( localSession); + } + catch( Throwable t ) + { + future.setException( t ); + } + + + + return future; + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } +} \ No newline at end of file diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 911d488f94..69deaa383e 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -243,10 +243,10 @@ - -n TQCT-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 - -n TQCT-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 - -n TQCL-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 - -n TQCL-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 + -n TQCT-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 + -n TQCT-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 + -n TQCL-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 + -n TQCL-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 @@ -269,10 +269,10 @@ -n TQM-Qpid-02-1M -d10M -s[100] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048576 destinationCount=1 rate=0 maxPending=100000000 - -n TTCT-Qpid-01 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 - -n TTCT-Qpid-02 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 - -n TTCL-Qpid-01 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 - -n TTCL-Qpid-02 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 + -n TTCT-Qpid-01 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 + -n TTCT-Qpid-02 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 + -n TTCL-Qpid-01 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 + -n TTCL-Qpid-02 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 @@ -295,10 +295,10 @@ -n TTM-Qpid-02-1M -d10M -s[4] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048476 destinationCount=1 rate=0 maxPending=20000000 - -n PQCT-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 - -n PQCT-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 - -n PQCL-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 - -n PQCL-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 + -n PQCT-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 + -n PQCT-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 + -n PQCL-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 + -n PQCL-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 @@ -321,10 +321,10 @@ -n PQM-Qpid-02-1M -d10M -s[4] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048576 destinationCount=1 rate=0 maxPending=20000000 - -n PTCT-Qpid-01 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 - -n PTCT-Qpid-02 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 - -n PTCL-Qpid-01 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 - -n PTCL-Qpid-02 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 + -n PTCT-Qpid-01 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 + -n PTCT-Qpid-02 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 + -n PTCL-Qpid-01 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 + -n PTCL-Qpid-02 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 @@ -453,169 +453,6 @@ -n TTBL-NA-Qpid-06 -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=50 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1001 messageSize=5120 destinationCount=1 rate=900 maxPending=2000000 -n TTBL-NA-Qpid-06 -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=50 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1001 messageSize=5120 destinationCount=1 rate=1000 maxPending=2000000 - - - - - - - -n FT-Qpid-01 -s[2500] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true properties=failovertest.properties failBeforeSend=true -o $QPID_WORK/results -n FT-Qpid-02 -s[2500] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true properties=failovertest.properties failAfterSend=true -o $QPID_WORK/results diff --git a/java/pom.xml b/java/pom.xml index c8093965e0..becea34848 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -163,7 +163,6 @@ under the License. management/eclipse-plugin client/example client-java14 - @@ -517,17 +516,17 @@ under the License. org.apache.mina mina-core - 1.0.0 + 1.0.1 org.apache.mina mina-filter-ssl - 1.0.0 + 1.0.1 org.apache.mina mina-java5 - 1.0.0 + 1.0.1 backport-util-concurrent diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java index 9629f87d46..624d9c9f3d 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -20,27 +20,16 @@ */ package org.apache.qpid.test; -import junit.extensions.TestSetup; - -import junit.framework.Test; import junit.framework.TestCase; - import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.server.registry.ApplicationRegistry; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; import javax.naming.Context; import javax.naming.spi.InitialContextFactory; - import java.util.HashMap; import java.util.Hashtable; -import java.util.LinkedList; -import java.util.List; import java.util.Map; public class VMTestCase extends TestCase @@ -89,7 +78,7 @@ public class VMTestCase extends TestCase } env.put("connectionfactory.connection", "amqp://guest:guest@" + _clientID + _virtualhost + "?brokerlist='" - + _brokerlist + "'"); + + _brokerlist + "'"); for (Map.Entry c : _connections.entrySet()) { @@ -121,6 +110,12 @@ public class VMTestCase extends TestCase super.tearDown(); } + public int getMessageCount(String queueName) + { + return ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1)) + .getQueueRegistry().getQueue(new AMQShortString(queueName)).getMessageCount(); + } + public void testDummyinVMTestCase() { // keep maven happy diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java new file mode 100644 index 0000000000..037c8285bc --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java @@ -0,0 +1,140 @@ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.NamingException; +import java.util.concurrent.CountDownLatch;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +public class DupsOkTest extends VMTestCase +{ + + private Queue _queue; + private static final int MSG_COUNT = 9999; + private CountDownLatch _awaitCompletion = new CountDownLatch(1); + + public void setUp() throws Exception + { + super.setUp(); + + _queue = (Queue) _context.lookup("queue"); + + //CreateQueue + ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(_queue); + + for (int count = 1; count <= MSG_COUNT; count++) + { + Message msg = producerSession.createTextMessage("Message " + count); + msg.setIntProperty("count", count); + producer.send(msg); + } + + producerConnection.close(); + } + + public void testDupsOK() throws NamingException, JMSException, InterruptedException + { + //Create Client + Connection clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + clientConnection.start(); + + Session clientSession = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + + MessageConsumer consumer = clientSession.createConsumer(_queue); + + consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + if (message == null) + { + fail("Should not get null messages"); + } + + if (message instanceof TextMessage) + { + try + { + /*if (message.getIntProperty("count") == 5000) + { + assertEquals("The queue should have 4999 msgs left", 4999, getMessageCount(_queue.getQueueName())); + }*/ + + if (message.getIntProperty("count") == 9999) + { + assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName())); + + //This is the last message so release test. + _awaitCompletion.countDown(); + } + + } + catch (JMSException e) + { + fail("Unable to get int property 'count'"); + } + } + else + { + fail(""); + } + } + }); + + try + { + _awaitCompletion.await(); + } + catch (InterruptedException e) + { + fail("Unable to wait for test completion"); + throw e; + } + +// consumer.close(); + + clientConnection.close(); + + assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName())); + } + + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java new file mode 100644 index 0000000000..fffe073362 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -0,0 +1,222 @@ +package org.apache.qpid.test.client.failover; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.log4j.Logger; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; + +public class FailoverTest extends TestCase implements ConnectionListener +{ + private static final Logger _logger = Logger.getLogger(FailoverTest.class); + + private static final int NUM_BROKERS = 2; + private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'"; + private static final String QUEUE = "queue"; + private static final int NUM_MESSAGES = 10; + private Connection con; + private AMQConnectionFactory conFactory; + private Session prodSess; + private AMQQueue q; + private MessageProducer prod; + private Session conSess; + private MessageConsumer consumer; + + private static int usedBrokers = 0; + private CountDownLatch failoverComplete; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + // Create two VM brokers + + for (int i = 0; i < NUM_BROKERS; i++) + { + usedBrokers++; + + TransportConnection.createVMBroker(usedBrokers); + } + //undo last addition + + conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers)); + _logger.info("Connecting on:" + conFactory.getConnectionURL()); + con = conFactory.createConnection(); + ((AMQConnection) con).setConnectionListener(this); + con.start(); + failoverComplete = new CountDownLatch(1); + } + + private void init(boolean transacted, int mode) throws JMSException + { + prodSess = con.createSession(transacted, mode); + q = new AMQQueue("amq.direct", QUEUE); + prod = prodSess.createProducer(q); + conSess = con.createSession(transacted, mode); + consumer = conSess.createConsumer(q); + } + + @Override + protected void tearDown() throws Exception + { + try + { + con.close(); + } + catch (Exception e) + { + + } + + try + { + TransportConnection.killAllVMBrokers(); + ApplicationRegistry.removeAll(); + } + catch (Exception e) + { + fail("Unable to clean up"); + } + super.tearDown(); + } + + private void consumeMessages(int toConsume) throws JMSException + { + Message msg; + for (int i = 0; i < toConsume; i++) + { + msg = consumer.receive(1000); + assertNotNull("Message " + i + " was null!", msg); + assertEquals("message " + i, ((TextMessage) msg).getText()); + } + } + + private void sendMessages(int totalMessages) throws JMSException + { + for (int i = 0; i < totalMessages; i++) + { + prod.send(prodSess.createTextMessage("message " + i)); + } + +// try +// { +// Thread.sleep(100 * totalMessages); +// } +// catch (InterruptedException e) +// { +// //evil ignoring of IE +// } + } + + public void testP2PFailover() throws Exception + { + testP2PFailover(NUM_MESSAGES, true); + } + + public void testP2PFailoverWithMessagesLeft() throws Exception + { + testP2PFailover(NUM_MESSAGES, false); + } + + private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException + { + Message msg = null; + init(false, Session.AUTO_ACKNOWLEDGE); + sendMessages(totalMessages); + + // Consume some messages + int toConsume = totalMessages; + if (!consumeAll) + { + toConsume = totalMessages / 2; + } + + consumeMessages(toConsume); + + _logger.info("Failing over"); + + causeFailure(); + + msg = consumer.receive(500); + //todo: reinstate + assertNull("Should not have received message from new broker!", msg); + // Check that messages still sent / received + sendMessages(totalMessages); + consumeMessages(totalMessages); + } + + private void causeFailure() + { + _logger.info("Failover"); + + TransportConnection.killVMBroker(usedBrokers - 1); + ApplicationRegistry.remove(usedBrokers - 1); + + _logger.info("Awaiting Failover completion"); + try + { + failoverComplete.await(); + } + catch (InterruptedException e) + { + //evil ignore IE. + } + } + + public void testClientAckFailover() throws Exception + { + init(false, Session.CLIENT_ACKNOWLEDGE); + sendMessages(1); + Message msg = consumer.receive(); + assertNotNull("Expected msgs not received", msg); + + + causeFailure(); + + Exception failure = null; + try + { + msg.acknowledge(); + } + catch (Exception e) + { + failure = e; + } + assertNotNull("Exception should be thrown", failure); + } + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + failoverComplete.countDown(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java new file mode 100644 index 0000000000..f83e6e51cb --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -0,0 +1,151 @@ +package org.apache.qpid.test.unit.ack; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.test.VMTestCase; + +public class AcknowledgeTest extends VMTestCase +{ + private static final int NUM_MESSAGES = 50; + private Connection _con; + private Queue _queue; + private MessageProducer _producer; + private Session _producerSession; + private Session _consumerSession; + private MessageConsumer _consumerA; + private MessageConsumer _consumerB; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _queue = (Queue) _context.lookup("queue"); + + //CreateQueue + ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + _con = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + _con.start(); + } + + private void init(boolean transacted, int mode) throws JMSException { + _producerSession = _con.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerSession = _con.createSession(transacted, mode); + _producer = _producerSession.createProducer(_queue); + _consumerA = _consumerSession.createConsumer(_queue); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + try + { + TransportConnection.killAllVMBrokers(); + ApplicationRegistry.removeAll(); + } + catch (Exception e) + { + fail("Unable to clean up"); + } + + } + + private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException + { + Message msg; + for (int i = 0; i < toConsume; i++) + { + msg = consumer.receive(1000); + assertNotNull("Message " + i + " was null!", msg); + assertEquals("message " + i, ((TextMessage) msg).getText()); + } + } + + private void sendMessages(int totalMessages) throws JMSException + { + for (int i = 0; i < totalMessages; i++) + { + _producer.send(_producerSession.createTextMessage("message " + i)); + } + } + + private void testMessageAck(boolean transacted, int mode) throws Exception + { + init(transacted, mode); + sendMessages(NUM_MESSAGES/2); + Thread.sleep(1500); + _consumerB = _consumerSession.createConsumer(_queue); + sendMessages(NUM_MESSAGES/2); + int count = 0; + Message msg = _consumerB.receive(100); + while (msg != null) + { + if (mode == Session.CLIENT_ACKNOWLEDGE) + { + msg.acknowledge(); + } + count++; + msg = _consumerB.receive(1500); + } + if (transacted) + { + _consumerSession.commit(); + } + _consumerA.close(); + _consumerB.close(); + _consumerSession.close(); + assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName())); + } + + public void test2ConsumersAutoAck() throws Exception + { + testMessageAck(false, Session.AUTO_ACKNOWLEDGE); + } + + public void test2ConsumersClientAck() throws Exception + { + testMessageAck(true, Session.CLIENT_ACKNOWLEDGE); + } + + public void test2ConsumersTx() throws Exception + { + testMessageAck(true, Session.AUTO_ACKNOWLEDGE); + } + +} -- cgit v1.2.1