diff options
Diffstat (limited to 'java/client')
10 files changed, 152 insertions, 197 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java index 02386e84eb..b6badff24d 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java @@ -3,7 +3,7 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at @@ -252,4 +252,3 @@ public class Client implements MessageListener new Client(); } } - diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 38325a1e41..39b3b80e74 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1303,4 +1303,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); } + public boolean isFailingOver() + { + return (_protocolHandler.getFailoverLatch() != null); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 15c113a05d..42f07f97f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -83,6 +83,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -219,6 +220,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + /** + * Contains a list of consumers which have been removed but which might still have + * messages to acknowledge, eg in client ack or transacted modes + */ + private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>(); + /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap<Destination, AtomicInteger>(); @@ -387,7 +394,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @throws IllegalStateException If the session is closed. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws JMSException { if (isClosed()) { @@ -611,20 +618,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) { -// i.next().acknowledgeLastDelivered(); -// } - - // get next acknowledgement to server - Long next = i.next().getLastDelivered(); - if (next != null && next > lastTag) - { - lastTag = next; - } + i.next().acknowledgeDelivered(); } - if (lastTag != -1) + if (_transacted) { - acknowledgeMessage(lastTag, true); + // Do the above, but for consumers which have been de-registered since the + // last commit + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).acknowledgeDelivered(); + _removedConsumers.remove(i); + } } // Commits outstanding messages sent and outstanding acknowledgements. @@ -760,7 +766,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -776,7 +782,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -1676,6 +1682,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } + + // Consumers that are closed in a transaction must be stored + // so that messages they have received can be acknowledged on commit + if (_transacted) + { + _removedConsumers.add(consumer); + } } } @@ -2445,6 +2458,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + consumer.failedOver(); registerConsumer(consumer, true); } } @@ -2543,17 +2557,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _messageDeliveryLock; } - /** - * Signifies that the session has pending sends to commit. - */ + /** Signifies that the session has pending sends to commit. */ public void markDirty() { _dirty = true; } - /** - * Signifies that the session has no pending sends to commit. - */ + /** Signifies that the session has no pending sends to commit. */ public void markClean() { _dirty = false; @@ -2562,6 +2572,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Check to see if failover has occured since the last call to markClean(commit or rollback). + * * @return boolean true if failover has occured. */ public boolean hasFailedOver() @@ -2571,6 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Check to see if any message have been sent in this transaction and have not been commited. + * * @return boolean true if a message has been sent but not commited */ public boolean isDirty() @@ -2624,7 +2636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Reject messages on pre-receive queue - consumer.rollback(); + consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); @@ -2668,6 +2680,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).rollback(); + _removedConsumers.remove(i); + } + setConnectionStopped(isStopped); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ae31f5ebdd..610e0109b1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -255,6 +253,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { + case Session.DUPS_OK_ACKNOWLEDGE: + _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag()); + _receivedDeliveryTags.add(msg.getDeliveryTag()); + break; case Session.CLIENT_ACKNOWLEDGE: _unacknowledgedDeliveryTags.add(msg.getDeliveryTag()); @@ -277,8 +279,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session.setInRecovery(false); } - private void acquireReceiving() throws JMSException + /** + * @param immediate if true then return immediately if the connection is failing over + * + * @return boolean if the acquisition was successful + * + * @throws JMSException + * @throws InterruptedException + */ + private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { + if (_connection.isFailingOver()) + { + if (immediate) + { + return false; + } + else + { + _connection.blockUntilNotFailingOver(); + } + } + if (!_receiving.compareAndSet(false, true)) { throw new javax.jms.IllegalStateException("Another thread is already receiving."); @@ -290,6 +312,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } _receivingThread = Thread.currentThread(); + return true; } private void releaseReceiving() @@ -343,7 +366,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer checkPreConditions(); - acquireReceiving(); + try + { + acquireReceiving(false); + } + catch (InterruptedException e) + { + _logger.warn("Interrupted: " + e); + if (isClosed()) + { + return null; + } + } _session.startDistpatcherIfNecessary(); @@ -424,7 +458,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - acquireReceiving(); + try + { + if (!acquireReceiving(true)) + { + //If we couldn't acquire the receiving thread then return null. + // This will occur if failing over. + return null; + } + } + catch (InterruptedException e) + { + /* + * This seems slightly shoddy but should never actually be executed + * since we told acquireReceiving to return immediately and it shouldn't + * block on anything. + */ + + return null; + } _session.startDistpatcherIfNecessary(); @@ -721,12 +773,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer break; case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) + /*( if (++_outstanding >= _prefetchHigh) { _dups_ok_acknowledge_send = true; } - if (_outstanding <= _prefetchLow) + //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur. + if (_outstanding < _prefetchLow) { _dups_ok_acknowledge_send = false; } @@ -736,11 +789,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (!_session.isInRecovery()) { _session.acknowledgeMessage(msg.getDeliveryTag(), true); + _outstanding = 0; } } break; - + */ case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) @@ -777,20 +831,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** Acknowledge up to last message delivered (if any). Used when commiting. */ - void acknowledgeLastDelivered() + void acknowledgeDelivered() { - if (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } @@ -866,11 +911,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() // throws JMSException + public void acknowledge() throws JMSException { - if (!isClosed()) + if (isClosed()) + { + throw new IllegalStateException("Consumer is closed"); + } + else if (_session.hasFailedOver()) + { + throw new JMSException("has failed over"); + } + else { - Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); while (tags.hasNext()) { @@ -878,10 +930,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer tags.remove(); } } - else - { - throw new IllegalStateException("Consumer is closed"); - } } /** Called on recovery to reset the list of delivery tags */ @@ -951,7 +999,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - // rollback pending messages + rollbackPendingMessages(); + } + + public void rollbackPendingMessages() + { if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) @@ -1016,4 +1068,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _synchronousQueue.clear(); } + + /** to be called when a failover has occured */ + public void failedOver() + { + clearReceiveQueue(); + clearUnackedMessages(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 1badbb601c..2b63475d71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -294,18 +294,4 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } } - public boolean equals(Object o) - { - - if (o instanceof BlockingMethodFrameListener) - { - BlockingMethodFrameListener other = (BlockingMethodFrameListener) o; - - return _channelId == other._channelId; - } - - return false; - } - - } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 4a4f4a0a38..c66603b7a0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -41,17 +41,4 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener return _expectedClass.isInstance(frame); } - public boolean equals(Object o) - { - if (o instanceof SpecificMethodFrameListener) - { - SpecificMethodFrameListener other = (SpecificMethodFrameListener) o; - - // here we need to check if the two classes are the same. - return (_channelId == other._channelId) && (_expectedClass.equals(other._expectedClass)); - } - - return false; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 3257caa796..e8a220f5e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -101,27 +100,21 @@ public class TransportConnection _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") ? "Qpid NIO is new default" : "Sysproperty 'qpidnio' is set")); - - result = new MultiThreadSocketConnector(); } else { _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector } - // Don't have the connector's worker thread wait around for other connections (we only use // one SocketConnector per connection at the moment anyway). This allows short-running // clients (like unit tests) to complete quickly. result.setWorkerTimeout(0); - return result; } }); break; - case VM: { _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); @@ -280,8 +273,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); - amqbce.initCause(e); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); throw amqbce; } @@ -294,14 +286,11 @@ public class TransportConnection _acceptor.unbindAll(); synchronized (_inVmPipeAddress) { - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) - { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); - } - } + _inVmPipeAddress.clear(); + } + _acceptor = null; + _currentInstance = -1; + _currentVMPort = -1; } public static void killVMBroker(int port) diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 25a9e26285..dca6efba67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -22,15 +22,12 @@ package org.apache.qpid.client.transport; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; - import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +46,7 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - final VmPipeConnector ioConnector = new VmPipeConnector(); + final VmPipeConnector ioConnector = new QpidVmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); cfg.setThreadModel(ReadWriteThreadModel.getInstance()); diff --git a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java deleted file mode 100644 index 69684a81ea..0000000000 --- a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.qpid.framing; - -import junit.framework.TestCase; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; - -import org.apache.mina.common.ByteBuffer; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -public class SpecificMethodFrameListenerTest extends TestCase -{ - - SpecificMethodFrameListener close1a = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class); - SpecificMethodFrameListener close1b = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class); - SpecificMethodFrameListener close2 = new SpecificMethodFrameListener(2, ChannelCloseOkBody.class); - SpecificMethodFrameListener open1a = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class); - SpecificMethodFrameListener open1b = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class); - - public void testEquals() - { - //Check that the the same objects are equal - assertEquals("ChannelCloseOKBody a should equal a", close1a, close1a); - assertEquals("ChannelOpenOkBody a should equal a", open1a, open1a); - - //check that the same values in differnt objects are equal - assertEquals("ChannelCloseOKBody b should equal a", close1b, close1a); - assertEquals("ChannelCloseOKBody a should equal b", close1a, close1b); - assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b); - assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b); - - //Chec that different values fail - //Different channels - assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close1a.equals(close2)); - assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close2.equals(close1a)); - - //Different Bodies - assertFalse("ChannelCloseOKBody should not equal ChannelOpenOkBody", close1a.equals(open1a)); - assertFalse("ChannelOpenOkBody should not equal ChannelCloseOKBody", open1a.equals(close1a)); - } - - public void testProcessMethod() throws AMQFrameDecodingException - { - ChannelCloseOkBody ccob = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9).createChannelCloseOkBody(); - ChannelOpenOkBody coob = ((MethodRegistry_0_9)(MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9))).createChannelOpenOkBody(new byte[0]); - - assertTrue("This SpecificMethodFrameListener should process a ChannelCloseOkBody", close1a.processMethod(1, ccob)); - assertFalse("This SpecificMethodFrameListener should NOT process a ChannelOpenOkBody", close1a.processMethod(1, coob)); - - - - - } -} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 224463a446..e45312448c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -418,14 +418,14 @@ public class CommitRollbackTest extends TestCase { _logger.info("Got 2 redelivered, message was prefetched"); _gottwoRedelivered = true; - + } else { - _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); + _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); assertFalse("Already received message two", _gottwo); assertFalse("Already received message redelivered two", _gottwoRedelivered); - + _gottwo = true; } } @@ -437,7 +437,7 @@ public class CommitRollbackTest extends TestCase * This test sends two messages receives on of them but doesn't ack it. * The consumer is then closed * the first message should be returned as redelivered. - * the second message should be delivered normally. + * the second message should be delivered normally. * @throws Exception */ public void testSend2ThenCloseAfter1andTryAgain() throws Exception @@ -458,6 +458,7 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); _logger.info("Closing Consumer"); + _consumer.close(); _logger.info("Creating New consumer"); @@ -465,33 +466,20 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); -// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. -// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. - result = _consumer.receive(5000); - assertNotNull("test message was consumed and rolled back, but is gone", result); -// The first message back will be either 1 or 2 being redelivered - if (result.getJMSRedelivered()) - { - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - } - else // or it will be msg 2 arriving the first time due to latency. - { - _logger.info("Message 2 wasn't prefetched so wasn't rejected"); - assertEquals("2", ((TextMessage) result).getText()); - } + // Message 2 may be marked as redelivered if it was prefetched. + result = _consumer.receive(5000); + assertNotNull("Second message was not consumed, but is gone", result); - Message result2 = _consumer.receive(5000); - assertNotNull("test message was consumed and rolled back, but is gone", result2); + // The first message back will be 2, message 1 has been received but not committed + // Closing the consumer does not commit the session. // if this is message 1 then it should be marked as redelivered - if("1".equals(((TextMessage) result2).getText())) + if("1".equals(((TextMessage) result).getText())) { - assertTrue("Messasge is not marked as redelivered" + result2, result2.getJMSRedelivered()); + fail("First message was recieved again"); } - assertNotSame("Messages should not have the same content",((TextMessage) result2).getText(), ((TextMessage) result).getText() ); - result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); |
