summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-02 15:28:08 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-02 15:28:08 +0000
commit7e9021835265bf6afaeb203b90e8af678593e6b3 (patch)
tree0880854c41e7290f6c58718c66601dd6ec449c60 /qpid/java/client/src
parenta09a5215aecde0d2080db436fc657e84145fa661 (diff)
downloadqpid-python-7e9021835265bf6afaeb203b90e8af678593e6b3.tar.gz
Handpatched branch
Revision: 502576 Author: ritchiem Date: 11:13:13, 02 February 2007 Message: QPID-343 Performance test suite doesn't output missing message count on failure. Updated PingAsyncTestPerf to output missing messsage count. Updated PingPongProducer so it doesn't use AMQShortStringx. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Revision: 502271 Author: ritchiem Date: 16:36:54, 01 February 2007 Message: QPID-341 When using Queues and Topics defined via JNDI settings are not preserved. Removed extraction of destination/queue name and used BindingURL directly to create Destination. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Revision: 502268 Author: ritchiem Date: 16:32:56, 01 February 2007 Message: Increased logging on a failure to attain state ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Revision: 502261 Author: ritchiem Date: 16:25:57, 01 February 2007 Message: QPID-339 DispatcherTest.java was broker now it actually tests correctly. Added test to Check changing message listeners ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java Revision: 502253 Author: ritchiem Date: 16:01:14, 01 February 2007 Message: QPID-339 Java client hangs when starting up (intermittently) Patched the problem where the dispatcher would hang. The previous logic was flawed. Patch worked on by Robert Godfrey and Martin Ritchie. Added test to ensure that the connection is not automatically started. (Only added the test last time by mistake. This is the actual fix) With a test for the DispatcherTest ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java Revision: 502249 Author: ritchiem Date: 15:50:52, 01 February 2007 Message: QPID-330 Clients occasionally fail to notice connect The AMQConnection.java constructor now deals with the full connection process. The failover thread should not be started. This allows the connection method to be simplified and not Thread.sleep waiting for the connection. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Revision: 502248 Author: ritchiem Date: 15:47:17, 01 February 2007 Message: QPID-339 Java client hangs when starting up (intermittently) Patched the problem where the dispatcher would hang. The previous logic was flawed. Patch worked on by Robert Godfrey and Martin Ritchie. Added test to ensure that the connection is not automatically started. ---- Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java Revision: 502182 Author: rgreig Date: 10:18:36, 01 February 2007 Message: (Submitted by Rupert Smith) Added comments as a reminder of improvements to be made to the tests. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Revision: 502179 Author: rgreig Date: 10:13:21, 01 February 2007 Message: (Submitted by Rupert Smith) Added comments as a reminder of improvements to be made to the tests. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Revision: 502172 Author: ritchiem Date: 09:37:39, 01 February 2007 Message: QPID-333 Committed test class rename to stop it being picked up by Surefire AMQTopic.java - whitespace ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java(Copy from path: /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java, Revision, 501823 Deleted : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java Revision: 501920 Author: ritchiem Date: 17:43:45, 31 January 2007 Message: Unused so removing ---- Deleted : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent Revision: 501917 Author: ritchiem Date: 17:31:04, 31 January 2007 Message: QPID-333 Message Properties on non Qpid Messages are not preserved Updated MessageConverter to have a constructor that takes a Message type. Updated MessageConverterTest to use the new NonQpidMessage to test it out. JMSHeaderAdapter.java - whitespace changes and comment noting that null return is required. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java Revision: 501804 Author: rgreig Date: 11:29:33, 31 January 2007 Message: (Patch submitted by Rupert Smith) Added a ping latency test. Uploaded new junit-toolkit snapshot for self timed tests. ---- Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.md5 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.sha1 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.md5 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Added : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java Revision: 501465 Author: rgreig Date: 16:53:41, 30 January 2007 Message: (Submitted by Rupert Smith) Updated the README.txt to give a fuller explanation for the creation of the temporary local maven repository. ---- Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/README.txt Revision: 501457 Author: rgreig Date: 16:42:37, 30 January 2007 Message: (Submitted by Rupert Smith) Added PingClient.java which was forgotten from last commit. ---- Added : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Revision: 501455 Author: rgreig Date: 16:40:20, 30 January 2007 Message: (Submitted by Rupert Smith) Ping tests refactored. Unused ping test classes removed. JUnit-toolkit 0.5-SNAPSHOT added to the build. ---- Added : /incubator/qpid/trunk/qpid/java/mvn-repo Added : /incubator/qpid/trunk/qpid/java/mvn-repo/README.txt Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.jar.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070130.111852-1.pom.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 Deleted : /incubator/qpid/trunk/qpid/java/perftests/bin Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Deleted : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/pom.xml Revision: 501096 Author: rgreig Date: 16:37:13, 29 January 2007 Message: QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job ---- Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Modified : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java Modified : /incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java Modified : /incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Modified : /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Revision: 501011 Author: rgreig Date: 11:13:23, 29 January 2007 Message: QPID-313 : Patch supplied by Rob Godfrey - Call to attainState in makeBrokerConnection can miss the notification of state change. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Revision: 501010 Author: rgreig Date: 11:11:29, 29 January 2007 Message: QPID-322 : Patch supplied by Rob Godfrey - Message reference count not being incremented when message added to UnacknowledgeMap ---- Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Revision: 501004 Author: rgreig Date: 11:02:57, 29 January 2007 Message: QPID-320 : Patch supplied by Rob Godfrey - Simplify logic to deal with setting MessageListener only after connection start has been called ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Revision: 500284 Author: ritchiem Date: 17:02:58, 26 January 2007 Message: Updated script details and added guard for trafficlight being null. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@502624 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java409
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java105
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java63
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java258
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java (renamed from qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java)4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java173
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java5
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java92
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java410
17 files changed, 1212 insertions, 413 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 7903ef4d10..3379b092ae 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -209,12 +209,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Exception lastException = new Exception();
lastException.initCause(new ConnectException());
- while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed())
+ while (!_connected && _failoverPolicy.failoverAllowed())
{
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
lastException = null;
+ _connected = true;
}
catch (Exception e)
{
@@ -226,35 +227,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.debug("Are we connected:" + _connected);
- // Then the Failover Thread will handle conneciton
- if (_failoverPolicy.failoverAllowed())
- {
- //TODO this needs to be redone so that we are not spinning.
- // A suitable object should be set that is then waited on
- // and only notified when a connection is made or when
- // the AMQConnection gets closed.
- while (!_connected && !_closed.get())
- {
- try
- {
- _logger.debug("Sleeping.");
- Thread.sleep(100);
- }
- catch (InterruptedException ie)
- {
- _logger.debug("Woken up.");
- }
- }
- if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null)
- {
- if (_lastAMQException != null)
- {
- throw _lastAMQException;
- }
- }
- }
- else
+ if (!_connected)
{
+
String message = null;
if (lastException != null)
@@ -275,7 +250,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
AMQException e = new AMQConnectionFailureException(message);
-
+
if (lastException != null)
{
if (lastException instanceof UnresolvedAddressException)
@@ -792,6 +767,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _protocolHandler;
}
+
+ public boolean started()
+ {
+ return _started;
+ }
public void bytesSent(long writtenBytes)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
index b2d2d2bec3..e9791d7126 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
@@ -334,7 +334,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
if (addr != null)
{
- return new AMQQueue(new AMQBindingURL((String) addr.getContent()).getQueueName());
+ return new AMQQueue(new AMQBindingURL((String) addr.getContent()));
}
}
@@ -344,7 +344,7 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
if (addr != null)
{
- return new AMQTopic(new AMQBindingURL((String) addr.getContent()).getDestinationName());
+ return new AMQTopic(new AMQBindingURL((String) addr.getContent()));
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 394f356b77..ff7bf4fd26 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -70,6 +70,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
+
/**
* Used to reference durable subscribers so they requests for unsubscribe can be handled
* correctly. Note this only keeps a record of subscriptions which have been created
@@ -92,8 +94,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
- private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
-
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -136,37 +136,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private long _nextProducerId;
/**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
- private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausing = new AtomicBoolean(false);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _paused = new AtomicBoolean(false);
-
- /**
* Set when recover is called. This is to handle the case where recover() is called by application code
* during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
- public void doDispatcherTask(DispatcherCallback dispatcherCallback)
- {
- synchronized (this)
- {
- _dispatcher.pause();
+ private boolean _connectionStopped;
- dispatcherCallback.whilePaused(_reprocessQueue);
-
- _dispatcher.reprocess();
- }
- }
+ private boolean _hasMessageListeners;
/**
@@ -174,7 +151,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private class Dispatcher extends Thread
{
- private final Logger _logger = Logger.getLogger(Dispatcher.class);
+ /**
+ * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
+ */
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private final Object _lock = new Object();
public Dispatcher()
{
@@ -183,105 +165,54 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
- _stopped.set(false);
- while (!_stopped.get())
+ UnprocessedMessage message;
+
+ try
{
- if (_pausing.get())
+ while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
- try
+ synchronized (_lock)
{
- //Wait for unpausing
- synchronized (_pausing)
+
+ while (connectionStopped())
{
- synchronized (_paused)
- {
- _paused.notify();
- }
-
- _logger.info("dispatcher paused");
-
- _pausing.wait();
- _logger.info("dispatcher notified");
+ _lock.wait();
}
- }
- catch (InterruptedException e)
- {
- //do nothing... occurs when a pause request occurs will already
- // be here if another pause event is pending
- _logger.info("dispacher interrupted");
- }
-
- doReDispatch();
+ dispatchMessage(message);
- }
- else
- {
- doNormalDispatch();
- }
- }
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
- _logger.info("Dispatcher thread terminating for channel " + _channelId);
- }
+ }
- private void doNormalDispatch()
- {
- UnprocessedMessage message;
- try
- {
- while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null)
- {
- dispatchMessage(message);
}
}
catch (InterruptedException e)
{
- _logger.info("dispatcher normal dispatch interrupted");
+ ;
}
-
+ _logger.info("Dispatcher thread terminating for channel " + _channelId);
}
- private void doReDispatch()
+ // only call while holding lock
+ final boolean connectionStopped()
{
- _logger.info("doRedispatching");
-
- MessageConsumerPair messageConsumerPair;
-
- if (_reprocessQueue != null)
- {
- _logger.info("Reprocess Queue has size:" + _reprocessQueue.size());
- while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null))
- {
- reDispatchMessage(messageConsumerPair);
- }
- }
-
- if (_reprocessQueue == null || _reprocessQueue.isEmpty())
- {
- _logger.info("Reprocess Queue emptied");
- _pausing.set(false);
- }
- else
- {
- _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size());
- }
-
+ return _connectionStopped;
}
- private void reDispatchMessage(MessageConsumerPair consumerPair)
+ void setConnectionStopped(boolean connectionStopped)
{
- if (consumerPair.getItem() instanceof AbstractJMSMessage)
+ synchronized (_lock)
{
- _logger.info("do renotify:" + consumerPair.getItem());
- consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId);
+ _connectionStopped = connectionStopped;
+ _lock.notify();
}
-
- // BasicMessageConsumer.notifyError(Throwable cause)
- // will put the cause in to the list which could come out here... need to watch this.
}
-
private void dispatchMessage(UnprocessedMessage message)
{
if (message.deliverBody != null)
@@ -337,40 +268,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public void stopDispatcher()
+ public void close()
{
- _stopped.set(true);
- interrupt();
- }
-
- public void pause()
- {
- _logger.info("pausing");
- _pausing.set(true);
-
-
+ _closed.set(true);
interrupt();
- synchronized (_paused)
- {
- try
- {
- _paused.wait();
- }
- catch (InterruptedException e)
- {
- //do nothing
- }
- }
- }
+ //fixme awaitTermination
- public void reprocess()
- {
- synchronized (_pausing)
- {
- _logger.info("reprocessing");
- _pausing.notify();
- }
}
}
@@ -404,8 +308,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
- _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
-
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -458,7 +360,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -474,7 +376,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -490,7 +392,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public javax.jms.Message createMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -506,7 +408,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -522,7 +424,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -540,7 +442,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -557,7 +459,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -574,7 +476,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -619,7 +521,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
+ _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -638,7 +540,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
+ TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -650,7 +552,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -661,15 +563,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
_connection.getProtocolHandler().closeSession(this);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "JMS client closing channel"); // replyText
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client closing channel"); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -722,7 +624,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e)
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -803,7 +705,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -829,7 +732,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -863,8 +767,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false)); // requeue
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -1035,8 +939,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
public MessageConsumer createBrowserConsumer(Destination destination,
- String messageSelector,
- boolean noLocal)
+ String messageSelector,
+ boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -1149,7 +1053,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw ex;
}
- synchronized(destination)
+ synchronized (destination)
{
_destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
@@ -1200,16 +1104,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- 0, // ticket
- type); // type
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ false, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
_connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1224,16 +1128,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- true, // nowait
- false, // passive
- 0, // ticket
- type); // type
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1259,15 +1163,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- true, // nowait
- false, // passive
- amqd.getQueueName(), // queue
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ true, // nowait
+ false, // passive
+ amqd.getQueueName(), // queue
+ 0); // ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getQueueName();
@@ -1279,13 +1183,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- ft, // arguments
- amqd.getExchangeName(), // exchange
- true, // nowait
- queueName, // queue
- amqd.getRoutingKey(), // routingKey
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ ft, // arguments
+ amqd.getExchangeName(), // exchange
+ true, // nowait
+ queueName, // queue
+ amqd.getRoutingKey(), // routingKey
+ 0); // ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1308,11 +1212,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
}
- if(consumer.isAutoClose())
+ if (consumer.isAutoClose())
{
arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
}
- if(consumer.isNoConsume())
+ if (consumer.isNoConsume())
{
arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
@@ -1327,15 +1231,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ 0); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1516,12 +1420,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- 0); // ticket
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ 0); // ticket
_connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1561,7 +1465,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkValidQueue(queue);
- return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
+ return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1611,10 +1515,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
- queueName, // queue
- routingKey); // routingKey
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ queueName, // queue
+ routingKey); // routingKey
AMQMethodEvent response = null;
try
{
@@ -1675,9 +1579,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- deliveryTag, // deliveryTag
- multiple); // multiple
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ deliveryTag, // deliveryTag
+ multiple); // multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1707,28 +1611,55 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void start()
{
- if (_dispatcher != null)
+ //fixme This should be controlled by _stopped as it pairs with the stop method
+ //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
+ //will result in sending Flow messages for each subsequent call to flow.. only need to do this
+ // if we have called stop.
+ if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
}
- _dispatcher = new Dispatcher();
- _dispatcher.setDaemon(true);
- _dispatcher.start();
+
+ if (hasMessageListeners())
+ {
+ startDistpatcherIfNecessary();
+ }
}
- void stop()
+ private boolean hasMessageListeners()
{
- //stop the server delivering messages to this session
- suspendChannel();
+ return _hasMessageListeners;
+ }
- //stop the dispatcher thread
- _stopped.set(true);
+ void setHasMessageListeners()
+ {
+ _hasMessageListeners = true;
}
- boolean isStopped()
+ synchronized void startDistpatcherIfNecessary()
{
- return _stopped.get();
+ if (_dispatcher == null)
+ {
+ _dispatcher = new Dispatcher();
+ _dispatcher.setDaemon(true);
+ _dispatcher.start();
+ }
+ else
+ {
+ _dispatcher.setConnectionStopped(false);
+ }
+ }
+
+ void stop()
+ {
+ //stop the server delivering messages to this session
+ suspendChannel();
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.setConnectionStopped(true);
+ }
}
/**
@@ -1775,7 +1706,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
Destination dest = consumer.getDestination();
- synchronized(dest)
+ synchronized (dest)
{
if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
{
@@ -1840,8 +1771,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false); // active
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
@@ -1852,15 +1783,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- true); // active
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ true); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
public void confirmConsumerCancelled(String consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
- if((consumer != null) && (consumer.isAutoClose()))
+ if ((consumer != null) && (consumer.isAutoClose()))
{
consumer.closeWhenNoMessages(true);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index bdf26e6a5e..c5e57d2d1c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -189,7 +189,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public String getMessageSelector() throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
return _messageSelector;
}
@@ -211,22 +211,27 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
//if the current listener is non-null and the session is not stopped, then
//it is an error to call this method.
//i.e. it is only valid to call this method if
//
- // (a) the session is stopped, in which case the dispatcher is not running
+ // (a) the connection is stopped, in which case the dispatcher is not running
// OR
// (b) the listener is null AND we are not receiving synchronously at present
//
- if (_session.isStopped())
+ if (!_session.getAMQConnection().started())
{
_messageListener.set(messageListener);
- _logger.debug("Session stopped : Message listener set for destination " + _destination);
+ _session.setHasMessageListeners();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+ }
}
else
{
@@ -247,25 +252,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
synchronized (_session)
{
- //Pause Dispatcher
- _session.doDispatcherTask(new DispatcherCallback(this)
- {
- public void whilePaused(Queue<MessageConsumerPair> reprocessQueue)
- {
- // Prepend messages in _synchronousQueue to dispatcher queue
- _logger.debug("ReprocessQueue current size:" + reprocessQueue.size());
- for (Object item : _synchronousQueue)
- {
- reprocessQueue.offer(new MessageConsumerPair(_consumer, item));
- }
- _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size());
-
- // Set Message Listener
- _logger.debug("Set Message Listener");
- _messageListener.set(messageListener);
- }
- }
- );
+ _messageListener.set(messageListener);
+ _session.setHasMessageListeners();
+ _session.startDistpatcherIfNecessary();
}
}
}
@@ -273,7 +262,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
- if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString());
@@ -286,7 +275,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.warn("Unable to parse the supplied destination header: " + url);
}
-
+
}
_session.setInRecovery(false);
}
@@ -347,13 +336,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
- if(closeOnAutoClose())
+ if (closeOnAutoClose())
{
return null;
}
@@ -388,7 +379,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private boolean closeOnAutoClose() throws JMSException
{
- if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+ if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
{
close(false);
return true;
@@ -401,13 +392,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
- checkPreConditions();
+ checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
- if(closeOnAutoClose())
+ if (closeOnAutoClose())
{
return null;
}
@@ -463,19 +456,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
- if(sendClose)
+ if (sendClose)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- _consumerTag, // consumerTag
- false); // nowait
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ _consumerTag, // consumerTag
+ false); // nowait
try
{
@@ -572,8 +565,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
postDeliver(jmsMessage);
}
else
- {
- //This shouldn't be possible.
+ {
_synchronousQueue.put(jmsMessage);
}
}
@@ -607,7 +599,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private void postDeliver(AbstractJMSMessage msg) throws JMSException
{
- msg.setJMSDestination(_destination);
+ msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
case Session.CLIENT_ACKNOWLEDGE:
@@ -691,7 +683,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(this);
+ _session.deregisterConsumer(this);
}
public String getConsumerTag()
@@ -704,26 +696,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_consumerTag = consumerTag;
}
- public AMQSession getSession() {
- return _session;
- }
+ public AMQSession getSession()
+ {
+ return _session;
+ }
- private void checkPreConditions() throws JMSException{
+ private void checkPreConditions() throws JMSException
+ {
- this.checkNotClosed();
+ this.checkNotClosed();
- if(_session == null || _session.isClosed()){
- throw new javax.jms.IllegalStateException("Invalid Session");
- }
- }
+ if (_session == null || _session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
public void acknowledge() throws JMSException
{
- if(!isClosed())
+ if (!isClosed())
{
Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
- while(tags.hasNext())
+ while (tags.hasNext())
{
_session.acknowledgeMessage(tags.next(), false);
tags.remove();
@@ -758,10 +753,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closeWhenNoMessages = b;
- if(_closeWhenNoMessages
- && _synchronousQueue.isEmpty()
- && _receiving.get()
- && _messageListener != null)
+ if (_closeWhenNoMessages
+ && _synchronousQueue.isEmpty()
+ && _receiving.get()
+ && _messageListener != null)
{
_receivingThread.interrupt();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 74b91c3c30..a567427b26 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -389,9 +389,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
else
{
- //TODO; Do we really want to create an empty message here ?
- newMessage = (AbstractJMSMessage) _session.createMessage();
- return new MessageConverter(newMessage).getConvertedMessage();
+ newMessage = new MessageConverter(message).getConvertedMessage();
}
if (newMessage != null)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 8640bbb999..ac63345473 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -121,20 +121,27 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
-
- clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
- clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
- clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
- clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
+
+ try
+ {
+ clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+ clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
+ clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- clientProperties, // clientProperties
- selectedLocale, // locale
- mechanism, // mechanism
- saslResponse)); // response
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ clientProperties, // clientProperties
+ selectedLocale, // locale
+ mechanism, // mechanism
+ saslResponse)); // response
}
catch (UnsupportedEncodingException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
index 58089f595b..7dcee74a1d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
@@ -25,7 +25,8 @@ import org.apache.log4j.Logger;
import javax.jms.*;
import java.util.Enumeration;
-public class MessageConverter {
+public class MessageConverter
+{
/**
* Log4J logger
@@ -117,6 +118,16 @@ public class MessageConverter {
_newMessage = (AbstractJMSMessage) nativeMessage;
setMessageProperties(message);
}
+
+ public MessageConverter(Message message) throws JMSException
+ {
+ //Send a message with just properties.
+ // Throwing away content
+ BytesMessage nativeMessage = new JMSBytesMessage();
+
+ _newMessage = (AbstractJMSMessage) nativeMessage;
+ setMessageProperties(message);
+ }
public AbstractJMSMessage getConvertedMessage()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index fbf5451b3b..09a6f3be38 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -29,6 +29,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.failover.FailoverHandler;
@@ -147,6 +148,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
+ try
+ {
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+
+
_protocolSession = new AMQProtocolSession(this, session, _connection);
_protocolSession.init();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 50bd1667f9..56b49230ff 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -53,6 +53,8 @@ public class AMQStateManager implements AMQMethodListener
private final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+ private final Object _stateLock = new Object();
+ private static final long MAXIMUM_STATE_WAIT_TIME = 30000l;
public AMQStateManager()
{
@@ -62,7 +64,7 @@ public class AMQStateManager implements AMQMethodListener
protected AMQStateManager(AMQState state, boolean register)
{
_currentState = state;
- if(register)
+ if (register)
{
registerListeners();
}
@@ -118,17 +120,10 @@ public class AMQStateManager implements AMQMethodListener
public void changeState(AMQState newState) throws AMQException
{
_logger.debug("State changing to " + newState + " from old state " + _currentState);
- final AMQState oldState = _currentState;
- _currentState = newState;
-
- synchronized (_stateListeners)
+ synchronized (_stateLock)
{
- final Iterator it = _stateListeners.iterator();
- while (it.hasNext())
- {
- final StateListener l = (StateListener) it.next();
- l.stateChanged(oldState, newState);
- }
+ _currentState = newState;
+ _stateLock.notifyAll();
}
}
@@ -195,36 +190,34 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public void addStateListener(StateListener listener)
- {
- _logger.debug("Adding state listener");
- _stateListeners.add(listener);
- }
-
- public void removeStateListener(StateListener listener)
+ public void attainState(final AMQState s) throws AMQException
{
- _stateListeners.remove(listener);
- }
-
- public void attainState(AMQState s) throws AMQException
- {
- boolean needToWait = false;
- StateWaiter sw = null;
- synchronized (_stateListeners)
+ synchronized (_stateLock)
{
+ final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+ long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+ while (_currentState != s && waitTime > 0)
+ {
+ try
+ {
+ _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Thread interrupted");
+ }
+ if (_currentState != s)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
if (_currentState != s)
{
- _logger.debug("Adding state wait to reach state " + s);
- sw = new StateWaiter(s);
- addStateListener(sw);
- // we use a boolean since we must release the lock before starting to wait
- needToWait = true;
+ _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
+ throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
}
}
- if (needToWait)
- {
- sw.waituntilStateHasChanged();
- }
// at this point the state will have changed.
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index d6364f45b0..5e6244d7cc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -71,7 +71,7 @@ public class SocketTransportConnection implements ITransportConnection
boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
if (readWriteThreading)
{
- cfg.setThreadModel(new ReadWriteThreadModel());
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
}
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index e9e23aefdb..99a4c5f30d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -70,7 +70,7 @@ public class TransportConnection
IoServiceConfig config = _acceptor.getDefaultConfig();
- config.setThreadModel(new ReadWriteThreadModel());
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
}
public static ITransportConnection getInstance() throws AMQTransportConnectionException
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
new file mode 100644
index 0000000000..ecc8f1d1e9
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class DispatcherTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(DispatcherTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int _receivedCount = 0;
+ private int _receivedCountWhileStopped = 0;
+ private Connection _clientConnection, _producerConnection;
+ private MessageConsumer _consumer;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+
+ private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); //all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); //all messages Sent Lock
+
+ private volatile boolean _connectionStopped = false;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer = _clientSession.createConsumer(queue);
+
+ //Create Producer
+ _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+
+ _clientConnection.close();
+
+ _producerConnection.close();
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testAsynchronousRecieve()
+ {
+
+ _logger.info("Test Start");
+
+
+ assertTrue(!((AMQConnection) _clientConnection).started());
+
+ //Set default Message Listener
+ try
+ {
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message);
+
+ _receivedCount++;
+
+ if (_receivedCount == MSG_COUNT)
+ {
+ _allFirstMessagesSent.countDown();
+ }
+
+ if (_connectionStopped)
+ {
+ _logger.info("Running with Message:" + _receivedCount);
+ }
+
+ if (_connectionStopped && _allFirstMessagesSent.getCount() == 0)
+ {
+ _receivedCountWhileStopped++;
+ }
+
+ if (_allFirstMessagesSent.getCount() == 0)
+ {
+ if (_receivedCount == MSG_COUNT * 2)
+ {
+ _allSecondMessagesSent.countDown();
+ }
+ }
+ }
+ });
+
+ assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started());
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Default ML on consumer1");
+ }
+
+
+ try
+ {
+ _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ try
+ {
+ assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started());
+ _clientConnection.stop();
+ _connectionStopped = true;
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error stopping connection");
+ }
+
+
+ try
+ {
+ _logger.error("Send additional messages");
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(_producerSession.createTextMessage("Message " + msg));
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Unable to send additional messages", e);
+ }
+
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+
+ try
+ {
+ _logger.info("Restarting connection");
+
+ _connectionStopped = false;
+ _clientConnection.start();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error Setting Better ML on consumer1", e);
+ }
+
+
+ _logger.info("Waiting upto 2 seconds for messages");
+
+ try
+ {
+ _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount());
+ assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount());
+ assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount);
+ assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped);
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(DispatcherTest.class);
+ }
+} \ No newline at end of file
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
index f7bea1b36a..c3434164d8 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
@@ -24,7 +24,7 @@ import javax.jms.*;
import java.util.Enumeration;
import java.io.Serializable;
-public class TestNonQpidTextMessage implements ObjectMessage {
+public class NonQpidObjectMessage implements ObjectMessage {
private JMSObjectMessage _realMessage;
private String _contentString;
@@ -34,7 +34,7 @@ public class TestNonQpidTextMessage implements ObjectMessage {
* does not inherit from the Qpid message superclasses
* and expand our unit testing of MessageConverter et al
*/
- public TestNonQpidTextMessage()
+ public NonQpidObjectMessage()
{
_realMessage = new JMSObjectMessage();
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
new file mode 100644
index 0000000000..48208193ce
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQConnectionFailureException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class ConnectionStartTest extends TestCase
+{
+
+ String _broker = "vm://:1";
+
+ AMQConnection _connection;
+ private Session _consumerSess;
+ private MessageConsumer _consumer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ try
+ {
+ AMQQueue queue = new AMQQueue("ConnectionStartTest");
+
+ AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "/test");
+
+ Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ MessageProducer pub = pubSess.createProducer(queue);
+
+ pub.send(pubSess.createTextMessage("Initial Message"));
+
+ _connection = new AMQConnection(_broker, "guest", "guest", "fred", "/" +
+ "" +
+ "" +
+ "" +
+ "" +
+ "" +
+ "" +
+ "" +
+ "test");
+
+ _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ _consumer = _consumerSess.createConsumer(queue);
+
+ pubCon.close();
+
+ }
+ catch (Exception e)
+ {
+ fail("Connection to " + _broker + " should succeed. Reason: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _connection.close();
+ TransportConnection.killVMBroker(1);
+ }
+
+ public void testSimpleReceiveConnection()
+ {
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ //Note that this next line will start the dispatcher in the session
+ // should really not be called before _connection start
+ assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+ _connection.start();
+ assertTrue("There should be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+ assertTrue("Connection should be started", _connection.started());
+
+ }
+ catch (JMSException e)
+ {
+ fail("An error occured during test because:" + e);
+ }
+
+ }
+
+ public void testMessageListenerConnection()
+ {
+ final CountDownLatch _gotMessage = new CountDownLatch(1);
+
+ try
+ {
+ assertTrue("Connection should not be started", !_connection.started());
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ assertTrue("Connection should be started", _connection.started());
+ assertEquals("Mesage Received", "Initial Message", ((TextMessage) message).getText());
+ _gotMessage.countDown();
+ }
+ catch (JMSException e)
+ {
+ fail("Couldn't get message text because:" + e.getCause());
+ }
+ }
+ });
+
+ assertTrue("Connection should not be started", !_connection.started());
+ _connection.start();
+ assertTrue("Connection should be started", _connection.started());
+
+ try
+ {
+ _gotMessage.await(1000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ fail("Timed out awaiting message via onMessage");
+ }
+
+ }
+ catch (JMSException e)
+ {
+ fail("Failed because:" + e.getCause());
+ }
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ConnectionStartTest.class);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index 456748e0d2..b0d06ae148 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -26,7 +26,8 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.message.TestNonQpidTextMessage;
+import org.apache.qpid.client.message.NonQpidObjectMessage;
+
import javax.jms.*;
@@ -70,7 +71,7 @@ public class JMSPropertiesTest extends TestCase
MessageProducer producer = producerSession.createProducer(queue);
//create a test message to send
- ObjectMessage sentMsg = new TestNonQpidTextMessage();
+ ObjectMessage sentMsg = new NonQpidObjectMessage();
sentMsg.setJMSCorrelationID(JMS_CORR_ID);
sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE);
sentMsg.setJMSType(JMS_TYPE);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
index 6a335b8627..a8a5c7d8b2 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -31,10 +31,12 @@ import javax.jms.Message;
import javax.jms.Destination;
import javax.jms.TextMessage;
import javax.jms.MapMessage;
+import javax.jms.JMSException;
import java.util.HashMap;
-public class MessageConverterTest extends TestCase {
+public class MessageConverterTest extends TestCase
+{
public static final String JMS_CORR_ID = "QPIDID_01";
public static final int JMS_DELIV_MODE = 1;
@@ -50,53 +52,79 @@ public class MessageConverterTest extends TestCase {
super.setUp();
testTextMessage = new JMSTextMessage();
- //Add JMSProperties
- testTextMessage.setJMSCorrelationID(JMS_CORR_ID);
- testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE);
- testTextMessage.setJMSType(JMS_TYPE);
- testTextMessage.setJMSReplyTo(JMS_REPLY_TO);
+ //Set Message Text
testTextMessage.setText("testTextMessage text");
-
- //Add non-JMS properties
- testTextMessage.setStringProperty("testProp1","testValue1");
- testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE);
+ setMessageProperties(testTextMessage);
testMapMessage = new JMSMapMessage();
- testMapMessage.setString("testMapString","testMapStringValue");
- testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE);
+ testMapMessage.setString("testMapString", "testMapStringValue");
+ testMapMessage.setDouble("testMapDouble", Double.MAX_VALUE);
}
public void testSetProperties() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
-
- //check JMS prop values on newMessage match
- assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID());
- assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode());
- assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType());
- assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo());
-
- //check non-JMS standard props ok too
- assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"),
- newMessage.getStringProperty("testProp1"));
- assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"),
- newMessage.getDoubleProperty("testProp2"));
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+ mesagePropertiesTest(testTextMessage, newMessage);
}
public void testJMSTextMessageConversion() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
- assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText());
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+ assertEquals("Converted message text mismatch", ((JMSTextMessage) newMessage).getText(), testTextMessage.getText());
}
public void testJMSMapMessageConversion() throws Exception
{
- AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage();
- assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"),
- testMapMessage.getString("testMapString"));
- assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"),
- testMapMessage.getDouble("testMapDouble"));
+ AbstractJMSMessage newMessage = new MessageConverter((MapMessage) testMapMessage).getConvertedMessage();
+ assertEquals("Converted map message String mismatch", ((JMSMapMessage) newMessage).getString("testMapString"),
+ testMapMessage.getString("testMapString"));
+ assertEquals("Converted map message Double mismatch", ((JMSMapMessage) newMessage).getDouble("testMapDouble"),
+ testMapMessage.getDouble("testMapDouble"));
+
+ }
+
+ public void testMessageConversion() throws Exception
+ {
+ Message newMessage = new NonQpidMessage();
+ setMessageProperties(newMessage);
+ mesagePropertiesTest(testTextMessage, newMessage);
+ }
+
+ private void setMessageProperties(Message message) throws JMSException
+ {
+ message.setJMSCorrelationID(JMS_CORR_ID);
+ message.setJMSDeliveryMode(JMS_DELIV_MODE);
+ message.setJMSType(JMS_TYPE);
+ message.setJMSReplyTo(JMS_REPLY_TO);
+ //Add non-JMS properties
+ message.setStringProperty("testProp1", "testValue1");
+ message.setDoubleProperty("testProp2", Double.MIN_VALUE);
+ }
+
+
+ private void mesagePropertiesTest(Message expectedMessage, Message actualMessage)
+ {
+ try
+ {
+ //check JMS prop values on newMessage match
+ assertEquals("JMS Correlation ID mismatch", expectedMessage.getJMSCorrelationID(), actualMessage.getJMSCorrelationID());
+ assertEquals("JMS Delivery mode mismatch", expectedMessage.getJMSDeliveryMode(), actualMessage.getJMSDeliveryMode());
+ assertEquals("JMS Type mismatch", expectedMessage.getJMSType(), actualMessage.getJMSType());
+ assertEquals("JMS Reply To mismatch", expectedMessage.getJMSReplyTo(), actualMessage.getJMSReplyTo());
+
+ //check non-JMS standard props ok too
+ assertEquals("Test String prop value mismatch", expectedMessage.getStringProperty("testProp1"),
+ actualMessage.getStringProperty("testProp1"));
+
+ assertEquals("Test Double prop value mismatch", expectedMessage.getDoubleProperty("testProp2"),
+ actualMessage.getDoubleProperty("testProp2"));
+ }
+ catch (JMSException e)
+ {
+ fail("An error occured testing the property values" + e.getCause());
+ e.printStackTrace();
+ }
}
protected void tearDown() throws Exception
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java
new file mode 100644
index 0000000000..e992290513
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/NonQpidMessage.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.message;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import java.util.Enumeration;
+import java.util.Hashtable;
+
+public class NonQpidMessage implements Message
+{
+ private String _JMSMessageID;
+ private long _JMSTimestamp;
+ private byte[] _JMSCorrelationIDAsBytes;
+ private String _JMSCorrelationID;
+ private Destination _JMSReplyTo;
+ private Destination _JMSDestination;
+ private int _JMSDeliveryMode;
+ private boolean _JMSRedelivered;
+ private String _JMSType;
+ private long _JMSExpiration;
+ private int _JMSPriority;
+ private Hashtable _properties;
+
+ public NonQpidMessage()
+ {
+ _properties = new Hashtable();
+ _JMSPriority = javax.jms.Message.DEFAULT_PRIORITY;
+ _JMSDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
+ }
+
+ public String getJMSMessageID() throws JMSException
+ {
+ return _JMSMessageID;
+ }
+
+ public void setJMSMessageID(String string) throws JMSException
+ {
+ _JMSMessageID = string;
+ }
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ return _JMSTimestamp;
+ }
+
+ public void setJMSTimestamp(long l) throws JMSException
+ {
+ _JMSTimestamp = l;
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return _JMSCorrelationIDAsBytes;
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ _JMSCorrelationIDAsBytes = bytes;
+ }
+
+ public void setJMSCorrelationID(String string) throws JMSException
+ {
+ _JMSCorrelationID = string;
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+ return _JMSCorrelationID;
+ }
+
+ public Destination getJMSReplyTo() throws JMSException
+ {
+ return _JMSReplyTo;
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException
+ {
+ _JMSReplyTo = destination;
+ }
+
+ public Destination getJMSDestination() throws JMSException
+ {
+ return _JMSDestination;
+ }
+
+ public void setJMSDestination(Destination destination) throws JMSException
+ {
+ _JMSDestination = destination;
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ return _JMSDeliveryMode;
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException
+ {
+ _JMSDeliveryMode = i;
+ }
+
+ public boolean getJMSRedelivered() throws JMSException
+ {
+ return _JMSRedelivered;
+ }
+
+ public void setJMSRedelivered(boolean b) throws JMSException
+ {
+ _JMSRedelivered = b;
+ }
+
+ public String getJMSType() throws JMSException
+ {
+ return _JMSType;
+ }
+
+ public void setJMSType(String string) throws JMSException
+ {
+ _JMSType = string;
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ return _JMSExpiration;
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ _JMSExpiration = l;
+ }
+
+ public int getJMSPriority() throws JMSException
+ {
+ return _JMSPriority;
+ }
+
+ public void setJMSPriority(int i) throws JMSException
+ {
+ _JMSPriority = i;
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ _properties.clear();
+ }
+
+ public boolean propertyExists(String string) throws JMSException
+ {
+ return _properties.containsKey(string);
+ }
+
+ public boolean getBooleanProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Boolean)
+ {
+ return (Boolean) o;
+ }
+ else
+ {
+ return Boolean.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public byte getByteProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Byte)
+ {
+ return (Byte) o;
+ }
+ else
+ {
+ return Byte.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public short getShortProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Short)
+ {
+ return (Short) o;
+ }
+ else
+ {
+ return Short.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public int getIntProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Integer)
+ {
+ return (Integer) o;
+ }
+ else
+ {
+ return Integer.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public long getLongProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Long)
+ {
+ return (Long) o;
+ }
+ else
+ {
+ return Long.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public float getFloatProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Float)
+ {
+ return (Float) o;
+ }
+ else
+ {
+ return Float.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public double getDoubleProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Double)
+ {
+ return (Double) o;
+ }
+ else
+ {
+ return Double.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public String getStringProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof String)
+ {
+ return (String) o;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public Object getObjectProperty(String string) throws JMSException
+ {
+ if (propertyExists(string))
+ {
+ Object o = _properties.get(string);
+ if (o instanceof Boolean)
+ {
+ return (Boolean) o;
+ }
+ else
+ {
+ return Boolean.valueOf(null);
+ }
+ }
+ else
+ {
+ throw new JMSException("property does not exist: " + string);
+ }
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return _properties.keys();
+ }
+
+ public void setBooleanProperty(String string, boolean b) throws JMSException
+ {
+ _properties.put(string, b);
+ }
+
+ public void setByteProperty(String string, byte b) throws JMSException
+ {
+ _properties.put(string, b);
+ }
+
+ public void setShortProperty(String string, short i) throws JMSException
+ {
+ _properties.put(string, i);
+ }
+
+ public void setIntProperty(String string, int i) throws JMSException
+ {
+ _properties.put(string, i);
+ }
+
+ public void setLongProperty(String string, long l) throws JMSException
+ {
+ _properties.put(string, l);
+ }
+
+ public void setFloatProperty(String string, float v) throws JMSException
+ {
+ _properties.put(string, v);
+ }
+
+ public void setDoubleProperty(String string, double v) throws JMSException
+ {
+ _properties.put(string, v);
+ }
+
+ public void setStringProperty(String string, String string1) throws JMSException
+ {
+ _properties.put(string, string1);
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException
+ {
+ _properties.put(string, object);
+ }
+
+ public void acknowledge() throws JMSException
+ {
+
+ }
+
+ public void clearBody() throws JMSException
+ {
+
+ }
+}