From d054b41aaa1466b65c9dc2acf1b22ca98ec3128c Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 16 Apr 2008 13:32:13 +0000 Subject: QPID-901: updates to the java client to use the 0-10 final spec instead of the 0-10 preview spec; this includes improvements to the codegen process as well as some modifications to the shared code path in the client to not lose per message state when consumers are closed. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648692 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 69 +- .../org/apache/qpid/client/AMQSession_0_10.java | 115 +- .../org/apache/qpid/client/AMQSession_0_8.java | 21 + .../apache/qpid/client/BasicMessageConsumer.java | 100 +- .../qpid/client/BasicMessageConsumer_0_10.java | 57 +- .../qpid/client/BasicMessageProducer_0_10.java | 62 +- .../org/apache/qpid/client/XAResourceImpl.java | 127 +- .../qpid/client/message/AbstractJMSMessage.java | 2 + .../client/message/AbstractJMSMessageFactory.java | 39 +- .../PropertiesFileInitialContextFactory.java | 269 ---- .../java/org/apache/qpidity/nclient/Client.java | 21 +- .../org/apache/qpidity/nclient/DtxSession.java | 29 +- .../java/org/apache/qpidity/nclient/Session.java | 78 +- .../apache/qpidity/nclient/impl/ClientSession.java | 35 +- .../nclient/impl/ClientSessionDelegate.java | 14 +- .../apache/qpidity/nclient/impl/DemoClient.java | 24 +- .../qpidity/nclient/impl/LargeMsgDemoClient.java | 6 +- .../qpidity/nclient/interop/BasicInteropTest.java | 15 +- .../apache/qpidity/njms/ConnectionFactoryImpl.java | 521 -------- .../org/apache/qpidity/njms/ConnectionImpl.java | 503 -------- .../qpidity/njms/ConnectionMetaDataImpl.java | 165 --- .../apache/qpidity/njms/CustomJMSXProperty.java | 47 - .../org/apache/qpidity/njms/DestinationImpl.java | 259 ---- .../org/apache/qpidity/njms/ExceptionHelper.java | 60 - .../java/org/apache/qpidity/njms/MessageActor.java | 176 --- .../apache/qpidity/njms/MessageConsumerImpl.java | 672 ---------- .../apache/qpidity/njms/MessageProducerImpl.java | 384 ------ .../apache/qpidity/njms/QpidBrowserListener.java | 70 -- .../qpidity/njms/QpidExceptionListenerImpl.java | 51 - .../org/apache/qpidity/njms/QueueBrowserImpl.java | 256 ---- .../apache/qpidity/njms/QueueConnectionImpl.java | 36 - .../java/org/apache/qpidity/njms/QueueImpl.java | 136 -- .../org/apache/qpidity/njms/QueueReceiverImpl.java | 55 - .../org/apache/qpidity/njms/QueueSenderImpl.java | 131 -- .../org/apache/qpidity/njms/QueueSessionImpl.java | 154 --- .../java/org/apache/qpidity/njms/SessionImpl.java | 1325 -------------------- .../apache/qpidity/njms/TemporaryDestination.java | 41 - .../apache/qpidity/njms/TemporaryQueueImpl.java | 88 -- .../apache/qpidity/njms/TemporaryTopicImpl.java | 71 -- .../apache/qpidity/njms/TopicConnectionImpl.java | 35 - .../java/org/apache/qpidity/njms/TopicImpl.java | 129 -- .../apache/qpidity/njms/TopicPublisherImpl.java | 128 -- .../org/apache/qpidity/njms/TopicSessionImpl.java | 155 --- .../apache/qpidity/njms/TopicSubscriberImpl.java | 72 -- .../org/apache/qpidity/njms/XAConnectionImpl.java | 71 -- .../apache/qpidity/njms/XAQueueConnectionImpl.java | 72 -- .../apache/qpidity/njms/XAQueueSessionImpl.java | 64 - .../org/apache/qpidity/njms/XAResourceImpl.java | 507 -------- .../org/apache/qpidity/njms/XASessionImpl.java | 126 -- .../apache/qpidity/njms/XATopicConnectionImpl.java | 71 -- .../apache/qpidity/njms/XATopicSessionImpl.java | 63 - .../qpidity/njms/message/BytesMessageImpl.java | 863 ------------- .../qpidity/njms/message/MapMessageImpl.java | 628 ---------- .../qpidity/njms/message/MessageFactory.java | 75 -- .../apache/qpidity/njms/message/MessageHelper.java | 456 ------- .../apache/qpidity/njms/message/MessageImpl.java | 1011 --------------- .../qpidity/njms/message/ObjectMessageImpl.java | 178 --- .../apache/qpidity/njms/message/QpidMessage.java | 445 ------- .../qpidity/njms/message/StreamMessageImpl.java | 1115 ---------------- .../qpidity/njms/message/TextMessageImpl.java | 328 ----- .../org/apache/qpid/test/unit/ack/RecoverTest.java | 2 +- .../test/unit/close/TopicPublisherCloseTest.java | 3 + .../test/unit/transacted/CommitRollbackTest.java | 37 +- .../qpid/test/unit/transacted/TransactedTest.java | 9 +- 64 files changed, 389 insertions(+), 12538 deletions(-) delete mode 100644 java/client/src/main/java/org/apache/qpidity/naming/PropertiesFileInitialContextFactory.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MessageFactory.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MessageHelper.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/MessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/ObjectMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/QpidMessage.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/StreamMessageImpl.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/njms/message/TextMessageImpl.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a6c7b70bf6..f79523e546 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -201,8 +202,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + /** + * All the not yet acknowledged message tags + */ + protected ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); + + /** + * All the delivered message tags + */ + protected ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue(); + /** Holds the dispatcher thread for this session. */ - private Dispatcher _dispatcher; + protected Dispatcher _dispatcher; /** Holds the message factory factory for this session. */ protected MessageFactoryRegistry _messageFactoryRegistry; @@ -372,9 +383,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess throw new IllegalStateException("Session is already closed"); } - for (BasicMessageConsumer consumer : _consumers.values()) + while (true) { - consumer.acknowledge(); + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + acknowledgeMessage(tag, false); } } @@ -553,15 +569,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess try { - // Acknowledge up to message last delivered (if any) for each consumer. - // need to send ack for messages delivered to consumers so far - for (Iterator i = _consumers.values().iterator(); i.hasNext();) + // Acknowledge all delivered messages + while (true) { - // Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + acknowledgeMessage(tag, false); } - // Commits outstanding messages sent and outstanding acknowledgements. + // Commits outstanding messages and acknowledgments sendCommit(); } catch (AMQException e) @@ -1136,6 +1156,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return _suspended; } + protected void addUnacknowledgedMessage(long id) + { + _unacknowledgedMessageTags.add(id); + } + + protected void addDeliveredMessage(long id) + { + _deliveredMessageTags.add(id); + } + /** * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto * the queue read by the dispatcher. @@ -1167,7 +1197,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler); + AMQShortString queueName = declareQueue(amqd, protocolHandler, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); } @@ -1215,11 +1245,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess suspendChannel(true); } - for (BasicMessageConsumer consumer : _consumers.values()) - { - consumer.clearUnackedMessages(); - } - if (_dispatcher != null) { _dispatcher.rollback(); @@ -1296,10 +1321,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess suspendChannel(true); } - if (_dispatcher != null) - { - _dispatcher.rollback(); - } + releaseForRollback(); sendRollback(); @@ -1319,6 +1341,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + public abstract void releaseForRollback(); + public abstract void sendRollback() throws AMQException, FailoverException ; @@ -1973,7 +1997,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal) throws AMQException { /*return new FailoverRetrySupport(*/ @@ -2111,7 +2136,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler); + AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal()); // store the consumer queue name consumer.setQueuename(queueName); @@ -2317,7 +2342,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException; /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - private class Dispatcher extends Thread + class Dispatcher extends Thread { /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 16d5a07141..2d60877c5e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -31,9 +31,11 @@ import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; +import org.apache.qpidity.transport.MessageCreditUnit; +import org.apache.qpidity.transport.MessageFlowMode; import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.transport.Option; -import org.apache.qpidity.transport.BindingQueryResult; +import org.apache.qpidity.transport.ExchangeBoundResult; import org.apache.qpidity.transport.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,7 @@ import javax.jms.*; import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.HashMap; import java.util.UUID; import java.util.Map; @@ -68,11 +71,6 @@ public class AMQSession_0_10 extends AMQSession private Object _currentExceptionLock = new Object(); private QpidException _currentException; - /** - * All the not yet acknoledged message tags - */ - private ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); - //--- constructors /** @@ -125,19 +123,6 @@ public class AMQSession_0_10 extends AMQSession defaultPrefetchHigh, defaultPrefetchLow); } - //------- 0-10 specific methods - - /** - * Add a message tag to be acknowledged - * This is used for client ack mode - * - * @param tag The id of the message to be acknowledged - */ - void addMessageTag(long tag) - { - _unacknowledgedMessageTags.add(tag); - } - //------- overwritten methods of class AMQSession public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) @@ -219,7 +204,7 @@ public class AMQSession_0_10 extends AMQSession for (AMQShortString rk: destination.getBindingKeys()) { _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); - getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); + getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); } // We need to sync so that we get notify of an error. getQpidSession().sync(); @@ -237,7 +222,8 @@ public class AMQSession_0_10 extends AMQSession public void sendClose(long timeout) throws AMQException, FailoverException { getQpidSession().sync(); - getQpidSession().sessionClose(); + getQpidSession().sessionRequestTimeout(0); + getQpidSession().sessionDetach(getQpidSession().getName()); getCurrentException(); } @@ -285,18 +271,43 @@ public class AMQSession_0_10 extends AMQSession public void sendRecover() throws AMQException, FailoverException { // release all unack messages - /*RangeSet ranges = new RangeSet(); - for (long messageTag : _unacknowledgedMessageTags) + RangeSet ranges = new RangeSet(); + while (true) { - // release this message - ranges.add(messageTag); - }*/ - getQpidSession().messageRecover(Option.REQUEUE); + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + ranges.add(tag); + } + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); } + public void releaseForRollback() + { + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + RangeSet ranges = new RangeSet(); + while (true) + { + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + ranges.add(tag); + } + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + } + /** * Release (0_8 notion of Reject) an acquired message * @@ -308,7 +319,7 @@ public class AMQSession_0_10 extends AMQSession // The value of requeue is always true RangeSet ranges = new RangeSet(); ranges.add(deliveryTag); - getQpidSession().messageRelease(ranges); + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); //I don't think we need to sync } @@ -357,9 +368,8 @@ public class AMQSession_0_10 extends AMQSession rk = routingKey.toString(); } - Future result = - getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null); - BindingQueryResult bindingQueryResult = result.get(); + ExchangeBoundResult bindingQueryResult = + getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get(); if (rk == null) { @@ -394,25 +404,25 @@ public class AMQSession_0_10 extends AMQSession (Boolean.getBoolean("noAck") ?Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED), preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, - consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + if (ClientProperties.MAX_PREFETCH == 0) { - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); } else { - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - ClientProperties.MAX_PREFETCH); + MessageCreditUnit.MESSAGE, + ClientProperties.MAX_PREFETCH); } getQpidSession().sync(); getCurrentException(); @@ -458,7 +468,8 @@ public class AMQSession_0_10 extends AMQSession /** * Declare a queue with the given queueName */ - public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal) throws AMQException, FailoverException { AMQShortString res; @@ -471,10 +482,16 @@ public class AMQSession_0_10 extends AMQSession { res = amqd.getAMQQueueName(); } - getQpidSession().queueDeclare(res.toString(), null, null, + Map arguments = null; + if (noLocal) + { + arguments = new HashMap(); + arguments.put("no-local", true); + } + getQpidSession().queueDeclare(res.toString(), null, arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION, amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION, - amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); // passive --> false // We need to sync so that we get notify of an error. getQpidSession().sync(); @@ -519,18 +536,17 @@ public class AMQSession_0_10 extends AMQSession if (consumer.getMessageListener() != null) { getQpidSession().messageFlow(consumer.getConsumerTag().toString(), - Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1); } } else { getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, - ClientProperties.MAX_PREFETCH); + .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, + ClientProperties.MAX_PREFETCH); } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, - 0xFFFFFFFF); + .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); } catch (Exception e) { @@ -546,8 +562,8 @@ public class AMQSession_0_10 extends AMQSession public void sendRollback() throws AMQException, FailoverException { - getQpidSession().txRollback(); - // We need to sync so that we get notify of an error. + getQpidSession().txRollback(); + // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); } @@ -622,7 +638,8 @@ public class AMQSession_0_10 extends AMQSession } } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal) throws AMQException { /*return new FailoverRetrySupport(*/ @@ -636,7 +653,7 @@ public class AMQSession_0_10 extends AMQSession { amqd.setQueueName(new AMQShortString("TempQueue" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal); } }, _connection).execute(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index a087f9e02a..dd0fd9c457 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -146,6 +146,8 @@ public class AMQSession_0_8 extends AMQSession public void sendRecover() throws AMQException, FailoverException { + _unacknowledgedMessageTags.clear(); + if (isStrictAMQP()) { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. @@ -162,6 +164,25 @@ public class AMQSession_0_8 extends AMQSession } } + public void releaseForRollback() + { + while (true) + { + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + rejectMessage(tag, true); + } + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + } + public void rejectMessage(long deliveryTag, boolean requeue) { if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0ae282a12e..015a2ccc57 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -131,13 +131,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me */ private boolean _dups_ok_acknowledge_send; - private ConcurrentLinkedQueue _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue(); - - /** - * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. - */ - private ConcurrentLinkedQueue _receivedDeliveryTags = new ConcurrentLinkedQueue(); - /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -276,10 +269,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Me protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); + _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } _session.setInRecovery(false); @@ -744,33 +736,13 @@ public abstract class BasicMessageConsumer extends Closeable implements Me } else { - _receivedDeliveryTags.add(msg.getDeliveryTag()); + _session.addDeliveredMessage(msg.getDeliveryTag()); } break; } } - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ - void acknowledgeLastDelivered() - { - if (!_receivedDeliveryTags.isEmpty()) - { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); - } - } - void notifyError(Throwable cause) { // synchronized (_closed) @@ -841,32 +813,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me } } - public void acknowledge() // throws JMSException - { - if (!isClosed()) - { - - Iterator tags = _unacknowledgedDeliveryTags.iterator(); - while (tags.hasNext()) - { - _session.acknowledgeMessage(tags.next(), false); - tags.remove(); - } - } - else - { - throw new IllegalStateException("Consumer is closed"); - } - } - - /** - * Called on recovery to reset the list of delivery tags - */ - public void clearUnackedMessages() - { - _unacknowledgedDeliveryTags.clear(); - } - public boolean isAutoClose() { return _autoClose; @@ -890,15 +836,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me public void rollback() { - clearUnackedMessages(); - - if (!_receivedDeliveryTags.isEmpty()) - { - _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); - } - - rollbackReceivedMessages(); - // rollback pending messages if (_synchronousQueue.size() > 0) { @@ -944,39 +881,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Me } } - protected void rollbackReceivedMessages() - { - // rollback received but not committed messages - while (!_receivedDeliveryTags.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting the messages(" + _receivedDeliveryTags - .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag); - } - - Long tag = _receivedDeliveryTags.poll(); - - if (tag != null) - { - if (_logger.isTraceEnabled()) - { - _logger.trace("Rejecting tag from _receivedDTs:" + tag); - } - - _session.rejectMessage(tag, true); - } - } - - if (!_receivedDeliveryTags.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection"); - } - } - } - public String debugIdentity() { return String.valueOf(_consumerTag); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index c40ec1e5cb..d7cce986aa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -144,7 +144,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer:///[]/[]?