diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-01-16 12:13:19 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-16 12:13:19 +0000 |
| commit | a6b4188b807dce93ee1a54958bd92de4142a6d48 (patch) | |
| tree | f9cce3fad26832dfb5f308933baf553c99fd3b6c /qpid/java/client | |
| parent | fd0be1dce887dd4b7057ddbe13bfb627f4152390 (diff) | |
| download | qpid-python-a6b4188b807dce93ee1a54958bd92de4142a6d48.tar.gz | |
Merged Trunk Changes to version 489140.
The point where we achieved 100% JMS compilance.
Revision: 489140
Author: ritchiem
Date: 17:04:33, 20 December 2006
Message:
QPID-225
Applied Patch for queue browsing with client acknowledgement
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Revision: 489113
Author: bhupendrab
Date: 15:25:45, 20 December 2006
Message:
renamed the jar
----
Modified : /incubator/qpid/trunk/qpid/java/distribution/src/main/assembly/management-eclipse-plugin.xml
Revision: 489106
Author: ritchiem
Date: 14:54:01, 20 December 2006
Message:
QPID-101
Initial Implementation of Queue Browsing by Robert Godfrey and Martin Ritchie
AMQChannel.java - record messages browsed so not to discard them on ack.
FilterManagerFactory.java - Added a NoConsumerFilter
ConcurrentSelectorDeliveryManager.java - Update to send browsers messages without taking the message from other consumers
Subscription.java - Added autoClose and isBrowser methods
SubscriptionTestHelper.java / RemoteSubscriptionImpl.java / SubscriptionImpl.java - implemented new interface methods
Added NoConsumerFilter.java
Patches from Rob Godfrey for client implmentation
AMQSession.java - Added AUTO_CLOSE and NO_CONSUME properties to arguments FieldTable for consume method.
BasicMessageConsumer.java - updates to correctly close consumer when an BasicCancel is received from the broker.
AMQProtocolSession.java - method to allow cancellation of the client
AMQStateManager.java - added handler for BasicCancelOkMethodHandler.java
Added new AMQQueueBrowser.java BasicCancelOkMethodHandler.java
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Revision: 489083
Author: ritchiem
Date: 13:26:12, 20 December 2006
Message:
Updated FilterTypes to be more accurate NO_CONSUME and AUTO_CLOSE
----
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
Revision: 489082
Author: ritchiem
Date: 13:22:27, 20 December 2006
Message:
QPID-233
Applied patch from Rupert Smith
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
Revision: 489078
Author: ritchiem
Date: 12:57:27, 20 December 2006
Message:
Added new enum for AMQP Filter types
----
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
Revision: 489070
Author: ritchiem
Date: 12:46:20, 20 December 2006
Message:
QPID-21 outstanding issues:
Fixed an issue where a consumer with no_local set would not have its filters applied to messages.
Fixed problem where new consumers would start with an empty PDQ rather than checking the existing queue of messages for messages of interest.
AMQQueue.java - Added code check exisiting queue data for messages for the new subscriber with a filter.
DeliveryManager.java - added populatePreDeliveryQueue
SynchronizedDeliveryManager.java/ConcurrentDeliveryManager.java - implemented new DeliveryManager.java interface
SubscriptionImpl.java - fixed issue with no_local subscribers had their filters ignored.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
Revision: 488726
Author: ritchiem
Date: 17:02:19, 19 December 2006
Message:
QPID-222
ensured that the TXBuffer of a message is set to null when re queuing.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Revision: 488715
Author: ritchiem
Date: 16:14:28, 19 December 2006
Message:
Maven output clean up.
Mainly removed exception stack traces from expected exceptions.
----
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Revision: 488713
Author: ritchiem
Date: 16:09:39, 19 December 2006
Message:
Maven output clean up.
Mainly removed exception stack traces from expected exceptions.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
Revision: 488712
Author: ritchiem
Date: 16:07:12, 19 December 2006
Message:
QPID-216
BasicConsumeMethodHandler.java - Pulled the nolocal param from the method body and passed down channel to subscription.
SubscriptionFactory.java / AMQQueue.java/AMQChannel.java - passed the nolocal parameter through to the Subscription
ConnectionStartOkMethodHandler.java - Saved the client properties so the client identifier can be used in comparison with the publisher id to implement no_local
AMQMinaProtocolSession.java - added _clientProperties to store the sent client properties.
AMQProtocolSession.java - interface changes to get/set ClientProperties
ConcurrentSelectorDeliveryManager.java - only need to do hasInterset as this will take care of the hasFilters optimisation check.
SubscriptionImpl.java - Added code to do comparison of client ids to determin insterest in a given message.
SubscriptionSet.java - tidied up code to use hasInterest as this is where the nolocal is implemented.
ConnectionStartMethodHandler.java - Moved literal values to a ClientProperties.java enumeration and a QpidProperties.java values.
QpidConnectionMetaData.java - updated to get values from QpidProperties.java
MockProtocolSession.java - null implementation of new get/set methods
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
Revision: 488711
Author: bhupendrab
Date: 16:00:13, 19 December 2006
Message:
QPID-188
Unit test for Exchange MBeans
----
Added : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
Revision: 488705
Author: bhupendrab
Date: 15:17:25, 19 December 2006
Message:
QPID-188
Adding unit tests for Java broker JMX functionality
----
Added : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
Revision: 488701
Author: bhupendrab
Date: 15:09:50, 19 December 2006
Message:
QPID-188
Adding unit tests for Java broker JMX functionality
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
Revision: 488624
Author: ritchiem
Date: 10:51:39, 19 December 2006
Message:
QPID-21
Added:
SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java
server/filter - Selector Filtering code from ActiveMQ project adjusted to suite our class and package structure.
server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage
ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors
AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid.
Common: log4j.properties to remove error log4j warnings on Common tests.
Modified:
broker/pom.xml - to generate SelectorParser.java
AMQChannel.java - Addition of argument fieldtable for filter setup.
BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception.
AMQMessage.java - Added decorator to get access to the enclosed JMSMessage
AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager.
Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message.
SubscriptionFactory.java - Added method to allow passing of filter arguments.
SubscriptionImpl.java - Implemented new Subscription.java methods.
SubscriptionManager.java - Added ability to get a list of current subscribers.
SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exert the new hasInterest feature.
SynchronizedDeliveryManager.java - fixed Logging class
AMQSession - Added filter extraction from consume call and pass it on to the registration.
ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception
AbstractJMSMessage.java - Expanded imports
BlockingMethodFrameListener.java - added extra info to a debug output line.
SocketTransportConnection.java - made output an info not a warn.
PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values.
ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java
NestedSubscriptionManager.java - Implementation of SubscriptionManager.java
RemoteSubscriptionImpl.java - Implementation Subscription.java
AMQConstant.java - Added '322' "Invalid Selector"
SubscriptionTestHelper.java - Implementation of Subscription.java
Edited specs/amqp-8.0.xml to add field table to consume method.
Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/pom.xml
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/grammar(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/grammar/SelectorParser.jj, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms, Revision, 488302
Replacing : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
Modified : /incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/log4j.properties
Added : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java(Copy from path: /incubator/qpid/branches/jmsselectors/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java, Revision, 488302
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
Modified : /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified : /incubator/qpid/trunk/qpid/specs/amqp-8.0.xml
Revision: 488596
Author: rgreig
Date: 09:29:19, 19 December 2006
Message:
QPID-215 : Patch supplied by Rob Godfrey - Implement custom JMSX properties
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
Revision: 488594
Author: bhupendrab
Date: 09:13:29, 19 December 2006
Message:
Name corrected
----
Added : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java(Copy from path: /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQueueMBeanTest.java, Revision, 488281
Deleted : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQueueMBeanTest.java
Revision: 488450
Author: vinoski
Date: 23:09:14, 18 December 2006
Message:
clean up warnings about unused variables
Remove all warnings in common, broker, client, and systests regarding
unused variables, as indicated by Eclipse builds.
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanIntrospector.java
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/plain/PlainSaslServer.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
Modified : /incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@496666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
31 files changed, 834 insertions, 172 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 638aa555a2..820b8c3f83 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -139,6 +139,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private AMQException _lastAMQException = null; + + /* + * The connection meta data + */ + private QpidConnectionMetaData _connectionMetaData; + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { @@ -281,6 +287,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw e; } + _connectionMetaData = new QpidConnectionMetaData(this); } protected boolean checkException(Throwable thrown) @@ -550,7 +557,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); - return QpidConnectionMetaData.instance(); + return _connectionMetaData; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java new file mode 100644 index 0000000000..5c753946a6 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.log4j.Logger; + +import java.util.Enumeration; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.*; +import javax.jms.IllegalStateException; + +public class AMQQueueBrowser implements QueueBrowser +{ + private static final Logger _logger = Logger.getLogger(AMQQueueBrowser.class); + + + private AtomicBoolean _isClosed = new AtomicBoolean(); + private final AMQSession _session; + private final AMQQueue _queue; + private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>(); + private final String _messageSelector; + + + AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException + { + _session = session; + _queue = queue; + _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector; + BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + consumer.close(); + } + + public Queue getQueue() throws JMSException + { + checkState(); + return _queue; + } + + private void checkState() throws JMSException + { + if (_isClosed.get()) + { + throw new IllegalStateException("Queue Browser"); + } + if (_session.isClosed()) + { + throw new IllegalStateException("Session is closed"); + } + + } + + public String getMessageSelector() throws JMSException + { + + checkState(); + return _messageSelector; + } + + public Enumeration getEnumeration() throws JMSException + { + checkState(); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + _consumers.add(consumer); + + return new Enumeration() + { + + + Message _nextMessage = consumer.receive(); + + + public boolean hasMoreElements() + { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + return (_nextMessage != null); + } + + public Object nextElement() + { + Message msg = _nextMessage; + try + { + _logger.info("QB:nextElement about to receive"); + + _nextMessage = consumer.receive(); + _logger.info("QB:nextElement received:" + _nextMessage); + } + catch (JMSException e) + { + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; + } + + return msg; + } + }; + } + + public void close() throws JMSException + { + for (BasicMessageConsumer consumer : _consumers) + { + consumer.close(); + } + _consumers.clear(); + } + + +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c25eb1f2c3..2136d565f1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -23,13 +23,15 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -69,15 +71,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled - * correctly. Note this only keeps a record of subscriptions which have been created - * in the current instance. It does not remember subscriptions between executions of the - * client + * Used to reference durable subscribers so they requests for unsubscribe can be handled + * correctly. Note this only keeps a record of subscriptions which have been created + * in the current instance. It does not remember subscriptions between executions of the + * client */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + new ConcurrentHashMap<BasicMessageConsumer, String>(); /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait @@ -143,6 +145,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _inRecovery; + /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -176,7 +179,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (message.deliverBody != null) { - final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag); if (consumer == null) { @@ -210,17 +213,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); } + else if (errorCode == AMQConstant.NO_ROUTE.getCode()) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } else { - if (errorCode == AMQConstant.NO_ROUTE.getCode()) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } + _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); } + } catch (Exception e) { @@ -318,7 +319,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -334,7 +335,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -350,7 +351,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public javax.jms.Message createMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -366,7 +367,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -382,7 +383,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -400,7 +401,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); @@ -417,7 +418,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); @@ -434,7 +435,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage(String text) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -504,7 +505,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) @@ -569,7 +570,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -721,11 +722,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void acknowledge() throws JMSException { - if(isClosed()) + if (isClosed()) { throw new IllegalStateException("Session is already closed"); } - for(BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer consumer : _consumers.values()) { consumer.acknowledge(); } @@ -734,7 +735,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public MessageListener getMessageListener() throws JMSException { checkNotClosed(); @@ -843,7 +843,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, null, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -855,7 +857,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false, messageSelector, - null); + null, + false, + false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -868,7 +872,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi noLocal, false, messageSelector, - null); + null, + false, + false); + } + + public MessageConsumer createBrowserConsumer(Destination destination, + String messageSelector, + boolean noLocal) + throws JMSException + { + checkValidDestination(destination); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + noLocal, + false, + messageSelector, + null, + true, + true); } public MessageConsumer createConsumer(Destination destination, @@ -878,7 +901,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); } @@ -890,7 +913,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -902,7 +925,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } public MessageConsumer createConsumer(Destination destination, @@ -915,7 +938,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector); + selector, rawSelector, false, false); } protected MessageConsumer createConsumerImpl(final Destination destination, @@ -924,7 +947,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final boolean noLocal, final boolean exclusive, final String selector, - final FieldTable rawSelector) throws JMSException + final FieldTable rawSelector, + final boolean noConsume, + final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -948,12 +973,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode); + _acknowledgeMode, noConsume, autoClose); try { registerConsumer(consumer, false); } + catch (AMQInvalidSelectorException ise) + { + JMSException ex = new InvalidSelectorException(ise.getMessage()); + ex.setLinkedException(ise); + throw ex; + } catch (AMQException e) { JMSException ex = new JMSException("Error registering consumer: " + e); @@ -963,7 +994,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized(destination) { - _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger()); + _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); _destinationConsumerCount.get(destination).incrementAndGet(); } @@ -975,16 +1006,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void checkTemporaryDestination(Destination destination) throws JMSException { - if((destination instanceof TemporaryDestination)) + if ((destination instanceof TemporaryDestination)) { _logger.debug("destination is temporary"); final TemporaryDestination tempDest = (TemporaryDestination) destination; - if(tempDest.getSession() != this) + if (tempDest.getSession() != this) { _logger.debug("destination is on different session"); throw new JMSException("Cannot consume from a temporary destination created onanother session"); } - if(tempDest.isDeleted()) + if (tempDest.isDeleted()) { _logger.debug("destination is deleted"); throw new JMSException("Cannot consume from a deleted destination"); @@ -1065,12 +1096,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return the consumer tag generated by the broker */ private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler, - boolean nowait) throws AMQException + boolean nowait, String messageSelector) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); + FieldTable arguments = FieldTableFactory.newFieldTable(); + if (messageSelector != null && !messageSelector.equals("")) + { + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); + } + if(consumer.isAutoClose()) + { + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + } + if(consumer.isNoConsume()) + { + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + } + consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening _consumers.put(tag, consumer); @@ -1080,7 +1125,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, queueName, tag, consumer.isNoLocal(), consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, - consumer.isExclusive(), nowait); + consumer.isExclusive(), nowait, arguments); if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1220,7 +1265,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); TopicSubscriberAdaptor subscriber = _subscriptions.get(name); if (subscriber != null) { @@ -1247,8 +1292,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - _subscriptions.put(name,subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); return subscriber; } @@ -1278,8 +1323,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name,subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); return subscriber; } @@ -1291,16 +1336,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueBrowser createBrowser(Queue queue) throws JMSException { - checkNotClosed(); - checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return createBrowser(queue, null); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); checkValidQueue(queue); - throw new UnsupportedOperationException("Queue browsing not supported"); + return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1476,7 +1519,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - consumeFromQueue(consumer, queueName, protocolHandler, nowait); + try + { + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); + } + catch (JMSException e) //thrown by getMessageSelector + { + throw new AMQException(e.getMessage(), e); + } } /** @@ -1489,7 +1539,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _consumers.remove(consumer.getConsumerTag()); String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if(subscriptionName != null) + if (subscriptionName != null) { _subscriptions.remove(subscriptionName); } @@ -1497,7 +1547,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi Destination dest = consumer.getDestination(); synchronized(dest) { - if(_destinationConsumerCount.get(dest).decrementAndGet() == 0) + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) { _destinationConsumerCount.remove(dest); } @@ -1567,6 +1617,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(channelFlowFrame); } + public void confirmConsumerCancelled(String consumerTag) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + if((consumer != null) && (consumer.isAutoClose())) + { + consumer.closeWhenNoMessages(true); + } + } + + /* * I could have combined the last 3 methods, but this way it improves readability */ @@ -1576,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { throw new javax.jms.InvalidDestinationException("Invalid Topic"); } - if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this) + if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) { throw new JMSException("Cannot create a subscription on a temporary topic created in another session"); } @@ -1597,4 +1657,5 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new javax.jms.InvalidDestinationException("Invalid Queue"); } } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index d3d9db3806..cefaca8d52 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -22,6 +22,8 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; @@ -39,6 +41,7 @@ import javax.jms.MessageListener; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import javax.jms.Destination; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -142,10 +145,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private Thread _receivingThread; + /** + * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive + * on the queue. This is used for queue browsing. + */ + private boolean _autoClose; + private boolean _closeWhenNoMessages; + + private boolean _noConsume; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, - boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, - int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) + boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, + int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -161,6 +173,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _exclusive = exclusive; _acknowledgeMode = acknowledgeMode; _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); + _autoClose = autoClose; + _noConsume = noConsume; } public AMQDestination getDestination() @@ -241,6 +255,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); + String url = jmsMsg.getStringProperty(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString()); + try + { + Destination dest = AMQDestination.createDestination(new AMQBindingURL(url)); + jmsMsg.setJMSDestination(dest); + } + catch (URLSyntaxException e) + { + _logger.warn("Unable to parse the supplied destination header: " + url); + } + } _session.setInRecovery(false); } @@ -307,6 +332,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = null; if (l > 0) { @@ -336,6 +365,19 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + private boolean closeOnAutoClose() throws JMSException + { + if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) + { + close(false); + return true; + } + else + { + return false; + } + } + public Message receiveNoWait() throws JMSException { checkPreConditions(); @@ -344,6 +386,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + if(closeOnAutoClose()) + { + return null; + } Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -388,22 +434,31 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + public void close() throws JMSException { + close(true); + } + + public void close(boolean sendClose) throws JMSException + { synchronized(_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); - - try + if(sendClose) { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); - } - catch (AMQException e) - { - _logger.error("Error closing consumer: " + e, e); - throw new JMSException("Error closing consumer: " + e); + final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false); + + try + { + _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + } + catch (AMQException e) + { + _logger.error("Error closing consumer: " + e, e); + throw new JMSException("Error closing consumer: " + e); + } } deregisterConsumer(); @@ -499,6 +554,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer msg.setJMSDestination(_destination); switch (_acknowledgeMode) { + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + break; case Session.DUPS_OK_ACKNOWLEDGE: if (++_outstanding >= _prefetchHigh) { @@ -525,7 +586,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } break; case Session.SESSION_TRANSACTED: - _lastDeliveryTag = msg.getDeliveryTag(); + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _lastDeliveryTag = msg.getDeliveryTag(); + } break; } } @@ -616,4 +684,29 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.clear(); } + + public boolean isAutoClose() + { + return _autoClose; + } + + + public boolean isNoConsume() + { + return _noConsume; + } + + public void closeWhenNoMessages(boolean b) + { + _closeWhenNoMessages = b; + + if(_closeWhenNoMessages + && _synchronousQueue.isEmpty() + && _receiving.get() + && _messageListener != null) + { + _receivingThread.interrupt(); + } + + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index e11d70cf41..7a5fcbccf9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -507,8 +507,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { checkTemporaryDestination(destination); + origMessage.setJMSDestination(destination); + AbstractJMSMessage message = convertToNativeMessage(origMessage); + message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java new file mode 100644 index 0000000000..3a7b7a7b3d --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java @@ -0,0 +1,47 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.*;
+
+public enum CustomJMXProperty
+{
+ JMSX_QPID_JMSDESTINATIONURL,
+ JMSXGroupID,
+ JMSXGroupSeq;
+
+ private static Enumeration _names;
+
+ public static synchronized Enumeration asEnumeration()
+ {
+ if(_names == null)
+ {
+ CustomJMXProperty[] properties = values();
+ ArrayList<String> nameList = new ArrayList<String>(properties.length);
+ for(CustomJMXProperty property : properties)
+ {
+ nameList.add(property.toString());
+ }
+ _names = Collections.enumeration(nameList);
+ }
+ return _names;
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java index 10a65c2ad8..d9e946c397 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -1,50 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.client; +import org.apache.qpid.common.QpidProperties; + import java.util.Enumeration; import javax.jms.ConnectionMetaData; import javax.jms.JMSException; -public class QpidConnectionMetaData implements ConnectionMetaData { - - private static QpidConnectionMetaData _instance = new QpidConnectionMetaData(); - - private QpidConnectionMetaData(){ - } - - public static QpidConnectionMetaData instance(){ - return _instance; - } - - public int getJMSMajorVersion() throws JMSException { - return 1; - } - - public int getJMSMinorVersion() throws JMSException { - return 1; - } - - public String getJMSProviderName() throws JMSException { - return "Apache Qpid"; - } - - public String getJMSVersion() throws JMSException { - return "1.1"; - } - - public Enumeration getJMSXPropertyNames() throws JMSException { - return null; - } - - public int getProviderMajorVersion() throws JMSException { - return 0; - } - - public int getProviderMinorVersion() throws JMSException { - return 9; - } - - public String getProviderVersion() throws JMSException { - return "Incubating-M1"; - } +public class QpidConnectionMetaData implements ConnectionMetaData +{ + + + QpidConnectionMetaData(AMQConnection conn) + { + } + + public int getJMSMajorVersion() throws JMSException + { + return 1; + } + + public int getJMSMinorVersion() throws JMSException + { + return 1; + } + + public String getJMSProviderName() throws JMSException + { + return "Apache " + QpidProperties.getProductName(); + } + + public String getJMSVersion() throws JMSException + { + return "1.1"; + } + + public Enumeration getJMSXPropertyNames() throws JMSException + { + return CustomJMXProperty.asEnumeration(); + } + + public int getProviderMajorVersion() throws JMSException + { + return 0; + } + + public int getProviderMinorVersion() throws JMSException + { + return 8; + } + + public String getProviderVersion() throws JMSException + { + return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ " + + getProtocolVersion() + "] )"; + } + + private String getProtocolVersion() + { + // TODO - Implement based on connection negotiated protocol + return "0.8"; + } + + public String getBrokerVersion() + { + // TODO - get broker version + return "<unkown>"; + } + + public String getClientVersion() + { + return QpidProperties.getBuildVerision(); + } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java new file mode 100644 index 0000000000..d855e97204 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -0,0 +1,35 @@ +package org.apache.qpid.client.handler; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.log4j.Logger; + +/** + * @author Apache Software Foundation + */ +public class BasicCancelOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("New BasicCancelOk method received"); + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); + evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 2bd93f1508..fd2968cdfd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.protocol.AMQConstant; @@ -46,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { - _logger.debug("ChannelClose method received"); + _logger.debug("ChannelClose method received"); ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); int errorCode = method.replyCode; @@ -65,17 +66,21 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener { throw new AMQNoConsumersException("Error: " + reason, null); } + else if (errorCode == AMQConstant.NO_ROUTE.getCode()) + { + throw new AMQNoRouteException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode()) + { + _logger.info("Broker responded with Invalid Selector."); + + throw new AMQInvalidSelectorException(reason); + } else { - if (errorCode == AMQConstant.NO_ROUTE.getCode()) - { - throw new AMQNoRouteException("Error: " + reason, null); - } - else - { - throw new AMQChannelClosedException(errorCode, "Error: " + reason); - } + throw new AMQChannelClosedException(errorCode, "Error: " + reason); } + } evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index 8785e7d44e..b5001a6e64 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -20,20 +20,14 @@ */ package org.apache.qpid.client.handler; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQMethodEvent; -import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.ConnectionOpenOkBody; public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener { - - private static final Logger _logger = Logger.getLogger(ConnectionOpenOkMethodHandler.class); - private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler(); public static ConnectionOpenOkMethodHandler getInstance() @@ -47,8 +41,6 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { - AMQProtocolSession session = evt.getProtocolSession(); - ConnectionOpenOkBody method = (ConnectionOpenOkBody) evt.getMethod(); stateManager.changeState(AMQState.CONNECTION_OPEN); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 9333df3fe4..f7b0cb5331 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -22,6 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.client.protocol.AMQMethodEvent; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.security.AMQCallbackHandler; @@ -119,10 +121,11 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put("instance", ps.getClientID()); - clientProperties.put("product", "Qpid"); - clientProperties.put("version", "1.0"); - clientProperties.put("platform", getFullSystemInfo()); + + clientProperties.put(ClientProperties.instance.toString(), ps.getClientID()); + clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName()); + clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVerision()); + clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo()); ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism, saslResponse, selectedLocale)); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 23d6c0151e..40d8b28411 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -26,7 +26,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.*; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.BasicMessageConsumer; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -46,7 +47,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected ByteBuffer _data; private boolean _readableProperties = false; - private boolean _readableMessage = false; + protected boolean _readableMessage = false; + protected boolean _changedData; private Destination _destination; private BasicMessageConsumer _consumer; @@ -60,6 +62,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } _readableProperties = false; _readableMessage = (data != null); + _changedData = (data == null); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException @@ -172,13 +175,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public Destination getJMSDestination() throws JMSException { - // TODO: implement this once we have sorted out how to figure out the exchange class - return _destination; + return _destination; } public void setJMSDestination(Destination destination) throws JMSException { - _destination = destination; + _destination = destination; } public int getJMSDeliveryMode() throws JMSException @@ -522,16 +524,16 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach return !_readableMessage; } - public void reset() + public void reset() { - if (_readableMessage) + if (!_changedData) { _data.rewind(); } else { _data.flip(); - _readableMessage = true; + _changedData = false; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index f5c9f7111a..d769300c69 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -59,6 +59,12 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag super(messageNbr, contentHeader, data); } + public void reset() + { + super.reset(); + _readableMessage = true; + } + public String getMimeType() { return MIME_TYPE; @@ -226,48 +232,56 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag public void writeBoolean(boolean b) throws JMSException { checkWritable(); + _changedData = true; _data.put(b ? (byte) 1 : (byte) 0); } public void writeByte(byte b) throws JMSException { checkWritable(); + _changedData = true; _data.put(b); } public void writeShort(short i) throws JMSException { checkWritable(); + _changedData = true; _data.putShort(i); } public void writeChar(char c) throws JMSException { checkWritable(); + _changedData = true; _data.putChar(c); } public void writeInt(int i) throws JMSException { checkWritable(); + _changedData = true; _data.putInt(i); } public void writeLong(long l) throws JMSException { checkWritable(); + _changedData = true; _data.putLong(l); } public void writeFloat(float v) throws JMSException { checkWritable(); + _changedData = true; _data.putFloat(v); } public void writeDouble(double v) throws JMSException { checkWritable(); + _changedData = true; _data.putDouble(v); } @@ -281,7 +295,7 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag _data.putShort((short)encodedString.limit()); _data.put(encodedString); - + _changedData = true; //_data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must add the null terminator manually //_data.put((byte)0); @@ -298,12 +312,14 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag { checkWritable(); _data.put(bytes); + _changedData = true; } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { checkWritable(); _data.put(bytes, offset, length); + _changedData = true; } public void writeObject(Object object) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 4fb070d2ff..35c5377f14 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -112,7 +112,7 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag } } - + public Serializable getObject() throws JMSException { ObjectInputStream in = null; @@ -123,18 +123,18 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag try { - _data.rewind(); + _data.rewind(); in = new ObjectInputStream(_data.asInputStream()); return (Serializable) in.readObject(); } catch (IOException e) - { - e.printStackTrace(); - throw new MessageFormatException("Could not deserialize message: " + e); + { + e.printStackTrace(); + throw new MessageFormatException("Could not deserialize message: " + e); } catch (ClassNotFoundException e) { - e.printStackTrace(); + e.printStackTrace(); throw new MessageFormatException("Could not deserialize message: " + e); } finally diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index c2dfdc1b65..6709ff802d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -86,6 +86,12 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess super(messageNbr, contentHeader, data); } + public void reset() + { + super.reset(); + _readableMessage = true; + } + public String getMimeType() { return MIME_TYPE; @@ -103,6 +109,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { checkWritable(); _data.put(type); + _changedData = true; } public boolean readBoolean() throws JMSException @@ -693,7 +700,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { _data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must write the null terminator ourselves - _data.put((byte)0); + _data.put((byte) 0); } catch (CharacterCodingException e) { @@ -706,7 +713,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeBytes(byte[] bytes) throws JMSException { - writeBytes(bytes, 0, bytes == null?0:bytes.length); + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index 76f8a1c32f..d8394b0489 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -117,6 +117,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding())); } + _changedData=true; } _decodedValue = text; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index a4ed89719b..6a40fd3133 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -406,4 +406,12 @@ public class AMQProtocolSession implements ProtocolVersionList HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } + + public void confirmConsumerCancelled(int channelId, String consumerTag) + { + final Integer chId = channelId; + final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + + session.confirmConsumerCancelled(consumerTag); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 492571b6af..21ae3fc71f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -110,7 +110,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } else { - throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught. + throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught. } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 887850c06e..50bd1667f9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -103,6 +103,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance()); frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance()); + frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance()); frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance()); frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance()); frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 0de2850080..d6364f45b0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -59,7 +59,7 @@ public class SocketTransportConnection implements ITransportConnection // once more testing of the performance of the simple allocator has been done if (!Boolean.getBoolean("amqj.enablePooledAllocator")) { - _logger.warn("Using SimpleByteBufferAllocator"); + _logger.info("Using SimpleByteBufferAllocator"); ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 9f31f7f010..d12ab01bdc 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -127,7 +127,7 @@ public class RecoverTest extends TestCase _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); - TextMessage tm2 = (TextMessage) consumer.receive(); + consumer.receive(); tm.acknowledge(); _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index c88024f39f..1e9de221d4 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -31,7 +31,7 @@ import javax.jms.*; public class MultipleConnectionTest extends TestCase { public static final String _defaultBroker = "vm://:1"; - public static String _connectionString = _defaultBroker; + public String _connectionString = _defaultBroker; private static class Receiver { @@ -176,9 +176,6 @@ public class MultipleConnectionTest extends TestCase { String broker = argv.length > 0 ? argv[0] : _defaultBroker; - int connections = 7; - int sessions = 2; - MultipleConnectionTest test = new MultipleConnectionTest(); test._connectionString = broker; test.test(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 7f76baa157..17679788bd 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -116,7 +116,9 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setIntProperty("Int", (int) Integer.MAX_VALUE); m.setJMSCorrelationID("Correlation"); - m.setJMSPriority(100); + //fixme the m.setJMSMessage has no effect + producer.setPriority(8); + m.setJMSPriority(3); // Queue Queue q; @@ -182,10 +184,8 @@ public class PropertyValueTest extends TestCase implements MessageListener (int) Integer.MAX_VALUE, m.getIntProperty("Int")); Assert.assertEquals("Check CorrelationID properties are correctly transported", "Correlation", m.getJMSCorrelationID()); - - _logger.warn("getJMSPriority not being verified."); -// Assert.assertEquals("Check Priority properties are correctly transported", -// 100, m.getJMSPriority()); + Assert.assertEquals("Check Priority properties are correctly transported", + 8, m.getJMSPriority()); // Queue Assert.assertEquals("Check ReplyTo properties are correctly transported", diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java new file mode 100644 index 0000000000..27a2ccb32e --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.client.transport.TransportConnection; + +import org.apache.log4j.Logger; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.DeliveryMode; + +import junit.framework.TestCase; + +public class SelectorTest extends TestCase implements MessageListener +{ + + private final static Logger _logger = org.apache.log4j.Logger.getLogger(SelectorTest.class); + + private AMQConnection _connection; + private AMQDestination _destination; + private AMQSession _session; + private int count; + public String _connectionString = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + private void init(AMQConnection connection) throws Exception + { + init(connection, new AMQQueue(randomize("SessionStartTest"), true)); + } + + private void init(AMQConnection connection, AMQDestination destination) throws Exception + { + _connection = connection; + _destination = destination; + connection.start(); + + + String selector = null; +// selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; +// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; + + _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + //_session.createConsumer(destination).setMessageListener(this); + _session.createConsumer(destination, selector).setMessageListener(this); + } + + public synchronized void test() throws JMSException, InterruptedException + { + try + { + Message msg = _session.createTextMessage("Message"); + msg.setJMSPriority(1); + msg.setIntProperty("Cost", 2); + msg.setJMSType("Special"); + + _logger.info("Sending Message:" + msg); + + ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT); + System.out.println("Message sent, waiting for response..."); + wait(1000); + + if (count > 0) + { + _logger.info("Got message"); + } + + if (count == 0) + { + fail("Did not get message!"); + //throw new RuntimeException("Did not get message!"); + } + } + finally + { + _session.close(); + _connection.close(); + } + } + + public synchronized void onMessage(Message message) + { + count++; + _logger.info("Got Message:" + message); + notify(); + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } + + public static void main(String[] argv) throws Exception + { + SelectorTest test = new SelectorTest(); + test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0]; + test.setUp(); + test.test(); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(SelectorTest.class); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index 7d4c2d06d3..ac789eb915 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -121,7 +121,7 @@ public class ChannelCloseOkTest extends TestCase { if (_connection != null) { - System.out.println(">>>>>>>>>>>>>>.. closing"); + _log.info(">>>>>>>>>>>>>>.. closing"); _connection.close(); } } @@ -137,7 +137,7 @@ public class ChannelCloseOkTest extends TestCase { public void onException(JMSException jmsException) { - _log.error("onException - ", jmsException); + _log.warn("onException - "+jmsException.getMessage()); } }); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index de517459df..0b3ed931f8 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -25,7 +25,6 @@ import org.apache.qpid.client.transport.TransportConnection; import javax.jms.Connection; import javax.jms.Session; import javax.jms.MessageConsumer; -import javax.jms.Message; /** * @author Apache Software Foundation @@ -72,7 +71,7 @@ public class CloseWithBlockingReceiveTest extends TestCase }; long startTime = System.currentTimeMillis(); new Thread(r).start(); - Message m = consumer.receive(10000); + consumer.receive(10000); assertTrue(System.currentTimeMillis() - startTime < 10000); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 0da4147351..c5aa62032c 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -54,8 +54,7 @@ public class ConnectionTest extends TestCase { try { - Connection connection = new AMQConnection(_broker, "guest", "guest", - "fred", "/test"); + new AMQConnection(_broker, "guest", "guest", "fred", "/test"); } catch (Exception e) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java index bd4b3b3987..cddd73898d 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java @@ -108,7 +108,7 @@ public class MapMessageTest extends TestCase JMSMapMessage mm = TestMessageHelper.newJMSMapMessage(); mm.setString("value", null); - char c = mm.getChar("value"); + mm.getChar("value"); fail("Expected NullPointerException"); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java new file mode 100644 index 0000000000..27736ac473 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -0,0 +1,70 @@ +package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSDestinationTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(JMSDestinationTest.class);
+
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ public void testJMSDestination() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ TextMessage sentMsg = producerSession.createTextMessage("hello");
+ assertNull(sentMsg.getJMSDestination());
+
+ producer.send(sentMsg);
+
+ assertEquals(sentMsg.getJMSDestination(), queue);
+
+ con2.close();
+
+ con.start();
+
+ TextMessage rm = (TextMessage) consumer.receive();
+ assertNotNull(rm);
+
+ assertEquals(rm.getJMSDestination(),queue);
+ con.close();
+ }
+
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 315ba6ae4c..83969822c4 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -89,10 +89,10 @@ public class StreamMessageTest extends TestCase StreamMessage msg2 = (StreamMessage) consumer.receive(); - byte b1 = msg2.readByte(); + msg2.readByte(); try { - byte b2 = msg2.readByte(); + msg2.readByte(); } catch (Exception e) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 14ceaa75f1..794316d2f5 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -260,7 +260,7 @@ public class TopicSessionTest extends TestCase TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); try { - MessageConsumer consumer2 = session2.createConsumer(topic); + session2.createConsumer(topic); fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session"); } catch (JMSException je) |
