From d054b41aaa1466b65c9dc2acf1b22ca98ec3128c Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 16 Apr 2008 13:32:13 +0000 Subject: QPID-901: updates to the java client to use the 0-10 final spec instead of the 0-10 preview spec; this includes improvements to the codegen process as well as some modifications to the shared code path in the client to not lose per message state when consumers are closed. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648692 13f79535-47bb-0310-9956-ffa450edef68 --- .../example/amqpexample/direct/DeclareQueue.java | 4 +- .../example/amqpexample/direct/DirectProducer.java | 8 +- .../qpid/example/amqpexample/direct/Listener.java | 9 +- .../example/amqpexample/fanout/DeclareQueue.java | 4 +- .../amqpexample/fanout/FannoutProducer.java | 8 +- .../qpid/example/amqpexample/fanout/Listener.java | 11 +- .../example/amqpexample/pubsub/TopicListener.java | 14 +- .../example/amqpexample/pubsub/TopicPublisher.java | 8 +- .../java/org/apache/qpid/client/AMQSession.java | 69 +- .../org/apache/qpid/client/AMQSession_0_10.java | 115 +- .../org/apache/qpid/client/AMQSession_0_8.java | 21 + .../apache/qpid/client/BasicMessageConsumer.java | 100 +- .../qpid/client/BasicMessageConsumer_0_10.java | 57 +- .../qpid/client/BasicMessageProducer_0_10.java | 62 +- .../org/apache/qpid/client/XAResourceImpl.java | 127 +- .../qpid/client/message/AbstractJMSMessage.java | 2 + .../client/message/AbstractJMSMessageFactory.java | 39 +- .../PropertiesFileInitialContextFactory.java | 269 ---- .../java/org/apache/qpidity/nclient/Client.java | 21 +- .../org/apache/qpidity/nclient/DtxSession.java | 29 +- .../java/org/apache/qpidity/nclient/Session.java | 78 +- .../apache/qpidity/nclient/impl/ClientSession.java | 35 +- .../nclient/impl/ClientSessionDelegate.java | 14 +- .../apache/qpidity/nclient/impl/DemoClient.java | 24 +- .../qpidity/nclient/impl/LargeMsgDemoClient.java | 6 +- .../qpidity/nclient/interop/BasicInteropTest.java | 15 +- .../apache/qpidity/njms/ConnectionFactoryImpl.java | 521 -------- .../org/apache/qpidity/njms/ConnectionImpl.java | 503 -------- .../qpidity/njms/ConnectionMetaDataImpl.java | 165 --- .../apache/qpidity/njms/CustomJMSXProperty.java | 47 - .../org/apache/qpidity/njms/DestinationImpl.java | 259 ---- .../org/apache/qpidity/njms/ExceptionHelper.java | 60 - .../java/org/apache/qpidity/njms/MessageActor.java | 176 --- .../apache/qpidity/njms/MessageConsumerImpl.java | 672 ---------- .../apache/qpidity/njms/MessageProducerImpl.java | 384 ------ .../apache/qpidity/njms/QpidBrowserListener.java | 70 -- .../qpidity/njms/QpidExceptionListenerImpl.java | 51 - .../org/apache/qpidity/njms/QueueBrowserImpl.java | 256 ---- .../apache/qpidity/njms/QueueConnectionImpl.java | 36 - .../java/org/apache/qpidity/njms/QueueImpl.java | 136 -- .../org/apache/qpidity/njms/QueueReceiverImpl.java | 55 - .../org/apache/qpidity/njms/QueueSenderImpl.java | 131 -- .../org/apache/qpidity/njms/QueueSessionImpl.java | 154 --- .../java/org/apache/qpidity/njms/SessionImpl.java | 1325 -------------------- .../apache/qpidity/njms/TemporaryDestination.java | 41 - .../apache/qpidity/njms/TemporaryQueueImpl.java | 88 -- .../apache/qpidity/njms/TemporaryTopicImpl.java | 71 -- .../apache/qpidity/njms/TopicConnectionImpl.java | 35 - .../java/org/apache/qpidity/njms/TopicImpl.java | 129 -- .../apache/qpidity/njms/TopicPublisherImpl.java | 128 -- .../org/apache/qpidity/njms/TopicSessionImpl.java | 155 --- .../apache/qpidity/njms/TopicSubscriberImpl.java | 72 -- .../org/apache/qpidity/njms/XAConnectionImpl.java | 71 -- .../apache/qpidity/njms/XAQueueConnectionImpl.java | 72 -- .../apache/qpidity/njms/XAQueueSessionImpl.java | 64 - .../org/apache/qpidity/njms/XAResourceImpl.java | 507 -------- .../org/apache/qpidity/njms/XASessionImpl.java | 126 -- .../apache/qpidity/njms/XATopicConnectionImpl.java | 71 -- .../apache/qpidity/njms/XATopicSessionImpl.java | 63 - .../qpidity/njms/message/BytesMessageImpl.java | 863 ------------- .../qpidity/njms/message/MapMessageImpl.java | 628 ---------- .../qpidity/njms/message/MessageFactory.java | 75 -- .../apache/qpidity/njms/message/MessageHelper.java | 456 ------- .../apache/qpidity/njms/message/MessageImpl.java | 1011 --------------- .../qpidity/njms/message/ObjectMessageImpl.java | 178 --- .../apache/qpidity/njms/message/QpidMessage.java | 445 ------- .../qpidity/njms/message/StreamMessageImpl.java | 1115 ---------------- .../qpidity/njms/message/TextMessageImpl.java | 328 ----- .../org/apache/qpid/test/unit/ack/RecoverTest.java | 2 +- .../test/unit/close/TopicPublisherCloseTest.java | 3 + .../test/unit/transacted/CommitRollbackTest.java | 37 +- .../qpid/test/unit/transacted/TransactedTest.java | 9 +- java/common/Composite.tpl | 173 +++ java/common/Constant.tpl | 14 + java/common/Enum.tpl | 36 + java/common/Invoker.tpl | 41 + java/common/MethodDelegate.tpl | 12 + java/common/Option.tpl | 19 + java/common/StructFactory.tpl | 39 + java/common/Type.tpl | 63 + java/common/build.xml | 7 +- java/common/codegen | 63 + java/common/generate | 567 --------- java/common/genutil.py | 207 +++ .../java/org/apache/qpidity/SecurityHelper.java | 8 +- .../main/java/org/apache/qpidity/ToyBroker.java | 21 +- .../main/java/org/apache/qpidity/ToyClient.java | 12 +- .../main/java/org/apache/qpidity/dtx/XidImpl.java | 26 +- .../java/org/apache/qpidity/transport/Channel.java | 10 +- .../apache/qpidity/transport/ChannelDelegate.java | 9 +- .../apache/qpidity/transport/ClientDelegate.java | 41 + .../qpidity/transport/ConnectionDelegate.java | 52 +- .../java/org/apache/qpidity/transport/Data.java | 2 +- .../java/org/apache/qpidity/transport/Method.java | 23 +- .../apache/qpidity/transport/ProtocolDelegate.java | 4 +- .../org/apache/qpidity/transport/RangeSet.java | 7 + .../java/org/apache/qpidity/transport/Session.java | 130 +- .../apache/qpidity/transport/SessionDelegate.java | 45 +- .../java/org/apache/qpidity/transport/Struct.java | 21 +- .../qpidity/transport/TransportConstants.java | 6 +- .../qpidity/transport/codec/AbstractDecoder.java | 212 ++-- .../qpidity/transport/codec/AbstractEncoder.java | 288 +++-- .../apache/qpidity/transport/codec/Decoder.java | 34 +- .../apache/qpidity/transport/codec/Encoder.java | 34 +- .../org/apache/qpidity/transport/codec/Sizer.java | 38 +- .../qpidity/transport/network/Assembler.java | 26 +- .../qpidity/transport/network/Disassembler.java | 96 +- .../apache/qpidity/transport/network/Frame.java | 15 +- .../qpidity/transport/network/InputHandler.java | 8 +- .../qpidity/transport/network/OutputHandler.java | 6 +- .../transport/network/mina/MinaHandler.java | 2 +- .../apache/qpidity/transport/util/Functions.java | 15 +- .../apache/qpidity/transport/ConnectionTest.java | 4 +- java/common/templating.py | 101 ++ java/cpp.async.testprofile | 4 +- 115 files changed, 1925 insertions(+), 13609 deletions(-) delete mode 100644 java/client/src/main/java/org/apache/qpidity/naming/PropertiesFileInitialContextFactory.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MessageFactory.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MessageHelper.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/ObjectMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/QpidMessage.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/StreamMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/TextMessageImpl.java create mode 100644 java/common/Composite.tpl create mode 100644 java/common/Constant.tpl create mode 100644 java/common/Enum.tpl create mode 100644 java/common/Invoker.tpl create mode 100644 java/common/MethodDelegate.tpl create mode 100644 java/common/Option.tpl create mode 100644 java/common/StructFactory.tpl create mode 100644 java/common/Type.tpl create mode 100755 java/common/codegen delete mode 100755 java/common/generate create mode 100644 java/common/genutil.py create mode 100644 java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java create mode 100644 java/common/templating.py (limited to 'java') diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java index a9257ccf70..8bb27847ce 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java @@ -31,13 +31,13 @@ public class DeclareQueue // declare and bind queue session.queueDeclare("message_queue", null, null); - session.queueBind("message_queue", "amq.direct", "routing_key", null); + session.exchangeBind("message_queue", "amq.direct", "routing_key", null); // confirm completion session.sync(); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { con.close(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java index 7c27051fb2..7329792a2b 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java @@ -8,6 +8,8 @@ import org.apache.qpidity.nclient.Connection; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageAcceptMode; +import org.apache.qpidity.transport.MessageAcquireMode; public class DirectProducer implements MessageListener { @@ -65,13 +67,13 @@ public class DirectProducer implements MessageListener for (int i=0; i<10; i++) { - session.messageTransfer("amq.direct", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED); session.header(deliveryProps); session.data("Message " + i); session.endData(); } - session.messageTransfer("amq.direct", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED); session.header(deliveryProps); session.data("That's all, folks!"); session.endData(); @@ -80,7 +82,7 @@ public class DirectProducer implements MessageListener session.sync(); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { con.close(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java index 244dbdbeae..b199c3a69a 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java @@ -9,6 +9,8 @@ import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.MessageCreditUnit; + /** * This listens to messages on a queue and terminates * when it sees the final message @@ -84,8 +86,9 @@ public class Listener implements MessageListener // issue credits - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11); + // XXX + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); // confirm completion session.sync(); @@ -96,7 +99,7 @@ public class Listener implements MessageListener session.messageCancel("listener_destination"); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java index cb49e9d28f..67f24148d8 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java @@ -31,13 +31,13 @@ public class DeclareQueue // declare and bind queue session.queueDeclare("message_queue", null, null); - session.queueBind("message_queue", "amq.fanout",null, null); + session.exchangeBind("message_queue", "amq.fanout",null, null); // confirm completion session.sync(); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { con.close(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java index 1dfc468cd4..4c647fde36 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java @@ -4,6 +4,8 @@ import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.Connection; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageAcceptMode; +import org.apache.qpidity.transport.MessageAcquireMode; public class FannoutProducer { @@ -32,13 +34,13 @@ public class FannoutProducer for (int i=0; i<10; i++) { - session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED); session.header(deliveryProps); session.data("Message " + i); session.endData(); } - session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED); session.header(deliveryProps); session.data("That's all, folks!"); session.endData(); @@ -47,7 +49,7 @@ public class FannoutProducer session.sync(); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { con.close(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java index cf12e80b5c..1feb5cfe23 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java @@ -9,6 +9,10 @@ import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.MessageAcceptMode; +import org.apache.qpidity.transport.MessageAcquireMode; +import org.apache.qpidity.transport.MessageCreditUnit; + /** * This listens to messages on a queue and terminates * when it sees the final message @@ -83,8 +87,9 @@ public class Listener implements MessageListener // issue credits - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11); + // XXX + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); // confirm completion session.sync(); @@ -95,7 +100,7 @@ public class Listener implements MessageListener session.messageCancel("listener_destination"); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { con.close(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java index e5c560860e..f26e5418b4 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -8,6 +8,7 @@ import org.apache.qpidity.nclient.Connection; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.MessageCreditUnit; import org.apache.qpidity.transport.Option; @@ -44,11 +45,11 @@ public class TopicListener implements MessageListener } } - public void prepareQueue(Session session,String queueName,String routingKey) + public void prepareQueue(Session session,String queueName,String bindingKey) { session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE); - session.queueBind(queueName, "amq.topic", routingKey, null); - session.queueBind(queueName, "amq.topic", "control", null); + session.exchangeBind(queueName, "amq.topic", bindingKey, null); + session.exchangeBind(queueName, "amq.topic", "control", null); session.messageSubscribe(queueName,queueName, Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, @@ -56,8 +57,9 @@ public class TopicListener implements MessageListener new MessagePartListenerAdapter(this), null, Option.NO_OPTION); // issue credits - session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_MESSAGE, 24); + // XXX: need to be able to set to null + session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow(queueName, MessageCreditUnit.MESSAGE, 24); } public void cancelSubscription(Session session,String dest) @@ -106,7 +108,7 @@ public class TopicListener implements MessageListener listener.cancelSubscription(session,"weather"); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { con.close(); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java index b33a6b967b..c960643504 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java @@ -4,6 +4,8 @@ import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.Connection; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.transport.DeliveryProperties; +import org.apache.qpidity.transport.MessageAcceptMode; +import org.apache.qpidity.transport.MessageAcquireMode; public class TopicPublisher { @@ -16,7 +18,7 @@ public class TopicPublisher deliveryProps.setRoutingKey(routing_key); for (int i=0; i<5; i++) { - session.messageTransfer("amq.topic", Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED); session.header(deliveryProps); session.data("Message " + i); session.endData(); @@ -26,7 +28,7 @@ public class TopicPublisher public void noMoreMessages(Session session) { - session.messageTransfer("amq.topic", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED); session.header(new DeliveryProperties().setRoutingKey("control")); session.data("That's all, folks!"); session.endData(); @@ -61,7 +63,7 @@ public class TopicPublisher session.sync(); //cleanup - session.sessionClose(); + session.sessionDetach(session.getName()); try { con.close(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a6c7b70bf6..f79523e546 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -201,8 +202,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + /** + * All the not yet acknowledged message tags + */ + protected ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); + + /** + * All the delivered message tags + */ + protected ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue(); + /** Holds the dispatcher thread for this session. */ - private Dispatcher _dispatcher; + protected Dispatcher _dispatcher; /** Holds the message factory factory for this session. */ protected MessageFactoryRegistry _messageFactoryRegistry; @@ -372,9 +383,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess throw new IllegalStateException("Session is already closed"); } - for (BasicMessageConsumer consumer : _consumers.values()) + while (true) { - consumer.acknowledge(); + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + acknowledgeMessage(tag, false); } } @@ -553,15 +569,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess try { - // Acknowledge up to message last delivered (if any) for each consumer. - // need to send ack for messages delivered to consumers so far - for (Iterator i = _consumers.values().iterator(); i.hasNext();) + // Acknowledge all delivered messages + while (true) { - // Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + acknowledgeMessage(tag, false); } - // Commits outstanding messages sent and outstanding acknowledgements. + // Commits outstanding messages and acknowledgments sendCommit(); } catch (AMQException e) @@ -1136,6 +1156,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return _suspended; } + protected void addUnacknowledgedMessage(long id) + { + _unacknowledgedMessageTags.add(id); + } + + protected void addDeliveredMessage(long id) + { + _deliveredMessageTags.add(id); + } + /** * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto * the queue read by the dispatcher. @@ -1167,7 +1197,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler); + AMQShortString queueName = declareQueue(amqd, protocolHandler, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); } @@ -1215,11 +1245,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess suspendChannel(true); } - for (BasicMessageConsumer consumer : _consumers.values()) - { - consumer.clearUnackedMessages(); - } - if (_dispatcher != null) { _dispatcher.rollback(); @@ -1296,10 +1321,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess suspendChannel(true); } - if (_dispatcher != null) - { - _dispatcher.rollback(); - } + releaseForRollback(); sendRollback(); @@ -1319,6 +1341,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + public abstract void releaseForRollback(); + public abstract void sendRollback() throws AMQException, FailoverException ; @@ -1973,7 +1997,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal) throws AMQException { /*return new FailoverRetrySupport(*/ @@ -2111,7 +2136,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler); + AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal()); // store the consumer queue name consumer.setQueuename(queueName); @@ -2317,7 +2342,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException; /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - private class Dispatcher extends Thread + class Dispatcher extends Thread { /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 16d5a07141..2d60877c5e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -31,9 +31,11 @@ import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; +import org.apache.qpidity.transport.MessageCreditUnit; +import org.apache.qpidity.transport.MessageFlowMode; import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.transport.Option; -import org.apache.qpidity.transport.BindingQueryResult; +import org.apache.qpidity.transport.ExchangeBoundResult; import org.apache.qpidity.transport.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,7 @@ import javax.jms.*; import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.HashMap; import java.util.UUID; import java.util.Map; @@ -68,11 +71,6 @@ public class AMQSession_0_10 extends AMQSession private Object _currentExceptionLock = new Object(); private QpidException _currentException; - /** - * All the not yet acknoledged message tags - */ - private ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); - //--- constructors /** @@ -125,19 +123,6 @@ public class AMQSession_0_10 extends AMQSession defaultPrefetchHigh, defaultPrefetchLow); } - //------- 0-10 specific methods - - /** - * Add a message tag to be acknowledged - * This is used for client ack mode - * - * @param tag The id of the message to be acknowledged - */ - void addMessageTag(long tag) - { - _unacknowledgedMessageTags.add(tag); - } - //------- overwritten methods of class AMQSession public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) @@ -219,7 +204,7 @@ public class AMQSession_0_10 extends AMQSession for (AMQShortString rk: destination.getBindingKeys()) { _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); - getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); + getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); } // We need to sync so that we get notify of an error. getQpidSession().sync(); @@ -237,7 +222,8 @@ public class AMQSession_0_10 extends AMQSession public void sendClose(long timeout) throws AMQException, FailoverException { getQpidSession().sync(); - getQpidSession().sessionClose(); + getQpidSession().sessionRequestTimeout(0); + getQpidSession().sessionDetach(getQpidSession().getName()); getCurrentException(); } @@ -285,18 +271,43 @@ public class AMQSession_0_10 extends AMQSession public void sendRecover() throws AMQException, FailoverException { // release all unack messages - /*RangeSet ranges = new RangeSet(); - for (long messageTag : _unacknowledgedMessageTags) + RangeSet ranges = new RangeSet(); + while (true) { - // release this message - ranges.add(messageTag); - }*/ - getQpidSession().messageRecover(Option.REQUEUE); + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + ranges.add(tag); + } + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); } + public void releaseForRollback() + { + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + RangeSet ranges = new RangeSet(); + while (true) + { + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + ranges.add(tag); + } + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + } + /** * Release (0_8 notion of Reject) an acquired message * @@ -308,7 +319,7 @@ public class AMQSession_0_10 extends AMQSession // The value of requeue is always true RangeSet ranges = new RangeSet(); ranges.add(deliveryTag); - getQpidSession().messageRelease(ranges); + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); //I don't think we need to sync } @@ -357,9 +368,8 @@ public class AMQSession_0_10 extends AMQSession rk = routingKey.toString(); } - Future result = - getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null); - BindingQueryResult bindingQueryResult = result.get(); + ExchangeBoundResult bindingQueryResult = + getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get(); if (rk == null) { @@ -394,25 +404,25 @@ public class AMQSession_0_10 extends AMQSession (Boolean.getBoolean("noAck") ?Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED), preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, - consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + if (ClientProperties.MAX_PREFETCH == 0) { - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); } else { - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - ClientProperties.MAX_PREFETCH); + MessageCreditUnit.MESSAGE, + ClientProperties.MAX_PREFETCH); } getQpidSession().sync(); getCurrentException(); @@ -458,7 +468,8 @@ public class AMQSession_0_10 extends AMQSession /** * Declare a queue with the given queueName */ - public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal) throws AMQException, FailoverException { AMQShortString res; @@ -471,10 +482,16 @@ public class AMQSession_0_10 extends AMQSession { res = amqd.getAMQQueueName(); } - getQpidSession().queueDeclare(res.toString(), null, null, + Map arguments = null; + if (noLocal) + { + arguments = new HashMap(); + arguments.put("no-local", true); + } + getQpidSession().queueDeclare(res.toString(), null, arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION, amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION, - amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); // passive --> false // We need to sync so that we get notify of an error. getQpidSession().sync(); @@ -519,18 +536,17 @@ public class AMQSession_0_10 extends AMQSession if (consumer.getMessageListener() != null) { getQpidSession().messageFlow(consumer.getConsumerTag().toString(), - Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1); } } else { getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, - ClientProperties.MAX_PREFETCH); + .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, + ClientProperties.MAX_PREFETCH); } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, - 0xFFFFFFFF); + .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); } catch (Exception e) { @@ -546,8 +562,8 @@ public class AMQSession_0_10 extends AMQSession public void sendRollback() throws AMQException, FailoverException { - getQpidSession().txRollback(); - // We need to sync so that we get notify of an error. + getQpidSession().txRollback(); + // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); } @@ -622,7 +638,8 @@ public class AMQSession_0_10 extends AMQSession } } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal) throws AMQException { /*return new FailoverRetrySupport(*/ @@ -636,7 +653,7 @@ public class AMQSession_0_10 extends AMQSession { amqd.setQueueName(new AMQShortString("TempQueue" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal); } }, _connection).execute(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index a087f9e02a..dd0fd9c457 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -146,6 +146,8 @@ public class AMQSession_0_8 extends AMQSession public void sendRecover() throws AMQException, FailoverException { + _unacknowledgedMessageTags.clear(); + if (isStrictAMQP()) { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. @@ -162,6 +164,25 @@ public class AMQSession_0_8 extends AMQSession } } + public void releaseForRollback() + { + while (true) + { + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + rejectMessage(tag, true); + } + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + } + public void rejectMessage(long deliveryTag, boolean requeue) { if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0ae282a12e..015a2ccc57 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -131,13 +131,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me */ private boolean _dups_ok_acknowledge_send; - private ConcurrentLinkedQueue _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue(); - - /** - * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. - */ - private ConcurrentLinkedQueue _receivedDeliveryTags = new ConcurrentLinkedQueue(); - /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -276,10 +269,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Me protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); + _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } _session.setInRecovery(false); @@ -744,33 +736,13 @@ public abstract class BasicMessageConsumer extends Closeable implements Me } else { - _receivedDeliveryTags.add(msg.getDeliveryTag()); + _session.addDeliveredMessage(msg.getDeliveryTag()); } break; } } - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ - void acknowledgeLastDelivered() - { - if (!_receivedDeliveryTags.isEmpty()) - { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); - } - } - void notifyError(Throwable cause) { // synchronized (_closed) @@ -841,32 +813,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me } } - public void acknowledge() // throws JMSException - { - if (!isClosed()) - { - - Iterator tags = _unacknowledgedDeliveryTags.iterator(); - while (tags.hasNext()) - { - _session.acknowledgeMessage(tags.next(), false); - tags.remove(); - } - } - else - { - throw new IllegalStateException("Consumer is closed"); - } - } - - /** - * Called on recovery to reset the list of delivery tags - */ - public void clearUnackedMessages() - { - _unacknowledgedDeliveryTags.clear(); - } - public boolean isAutoClose() { return _autoClose; @@ -890,15 +836,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me public void rollback() { - clearUnackedMessages(); - - if (!_receivedDeliveryTags.isEmpty()) - { - _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); - } - - rollbackReceivedMessages(); - // rollback pending messages if (_synchronousQueue.size() > 0) { @@ -944,39 +881,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me } } - protected void rollbackReceivedMessages() - { - // rollback received but not committed messages - while (!_receivedDeliveryTags.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting the messages(" + _receivedDeliveryTags - .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag); - } - - Long tag = _receivedDeliveryTags.poll(); - - if (tag != null) - { - if (_logger.isTraceEnabled()) - { - _logger.trace("Rejecting tag from _receivedDTs:" + tag); - } - - _session.rejectMessage(tag, true); - } - } - - if (!_receivedDeliveryTags.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection"); - } - } - } - public String debugIdentity() { return String.valueOf(_consumerTag); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index c40ec1e5cb..d7cce986aa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -144,7 +144,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer:///[]/[]?