summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java61
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java115
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java73
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java36
10 files changed, 152 insertions, 197 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
index 02386e84eb..b6badff24d 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
@@ -3,7 +3,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -252,4 +252,3 @@ public class Client implements MessageListener
new Client();
}
}
-
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 38325a1e41..39b3b80e74 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1303,4 +1303,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
}
+ public boolean isFailingOver()
+ {
+ return (_protocolHandler.getFailoverLatch() != null);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 15c113a05d..42f07f97f9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -83,6 +83,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -219,6 +220,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private Map<AMQShortString, BasicMessageConsumer> _consumers =
new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ /**
+ * Contains a list of consumers which have been removed but which might still have
+ * messages to acknowledge, eg in client ack or transacted modes
+ */
+ private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -387,7 +394,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @throws IllegalStateException If the session is closed.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws JMSException
{
if (isClosed())
{
@@ -611,20 +618,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
{
-// i.next().acknowledgeLastDelivered();
-// }
-
- // get next acknowledgement to server
- Long next = i.next().getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ i.next().acknowledgeDelivered();
}
- if (lastTag != -1)
+ if (_transacted)
{
- acknowledgeMessage(lastTag, true);
+ // Do the above, but for consumers which have been de-registered since the
+ // last commit
+ for (int i = 0; i < _removedConsumers.size(); i++)
+ {
+ // Sends acknowledgement to server
+ _removedConsumers.get(i).acknowledgeDelivered();
+ _removedConsumers.remove(i);
+ }
}
// Commits outstanding messages sent and outstanding acknowledgements.
@@ -760,7 +766,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -776,7 +782,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1676,6 +1682,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_destinationConsumerCount.remove(dest);
}
}
+
+ // Consumers that are closed in a transaction must be stored
+ // so that messages they have received can be acknowledged on commit
+ if (_transacted)
+ {
+ _removedConsumers.add(consumer);
+ }
}
}
@@ -2445,6 +2458,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator it = consumers.iterator(); it.hasNext();)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ consumer.failedOver();
registerConsumer(consumer, true);
}
}
@@ -2543,17 +2557,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _messageDeliveryLock;
}
- /**
- * Signifies that the session has pending sends to commit.
- */
+ /** Signifies that the session has pending sends to commit. */
public void markDirty()
{
_dirty = true;
}
- /**
- * Signifies that the session has no pending sends to commit.
- */
+ /** Signifies that the session has no pending sends to commit. */
public void markClean()
{
_dirty = false;
@@ -2562,6 +2572,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Check to see if failover has occured since the last call to markClean(commit or rollback).
+ *
* @return boolean true if failover has occured.
*/
public boolean hasFailedOver()
@@ -2571,6 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Check to see if any message have been sent in this transaction and have not been commited.
+ *
* @return boolean true if a message has been sent but not commited
*/
public boolean isDirty()
@@ -2624,7 +2636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// Reject messages on pre-receive queue
- consumer.rollback();
+ consumer.rollbackPendingMessages();
// Reject messages on pre-dispatch queue
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
@@ -2668,6 +2680,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
+ for (int i = 0; i < _removedConsumers.size(); i++)
+ {
+ // Sends acknowledgement to server
+ _removedConsumers.get(i).rollback();
+ _removedConsumers.remove(i);
+ }
+
setConnectionStopped(isStopped);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ae31f5ebdd..610e0109b1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -255,6 +253,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag());
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ break;
case Session.CLIENT_ACKNOWLEDGE:
_unacknowledgedDeliveryTags.add(msg.getDeliveryTag());
@@ -277,8 +279,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session.setInRecovery(false);
}
- private void acquireReceiving() throws JMSException
+ /**
+ * @param immediate if true then return immediately if the connection is failing over
+ *
+ * @return boolean if the acquisition was successful
+ *
+ * @throws JMSException
+ * @throws InterruptedException
+ */
+ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
{
+ if (_connection.isFailingOver())
+ {
+ if (immediate)
+ {
+ return false;
+ }
+ else
+ {
+ _connection.blockUntilNotFailingOver();
+ }
+ }
+
if (!_receiving.compareAndSet(false, true))
{
throw new javax.jms.IllegalStateException("Another thread is already receiving.");
@@ -290,6 +312,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
_receivingThread = Thread.currentThread();
+ return true;
}
private void releaseReceiving()
@@ -343,7 +366,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ acquireReceiving(false);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+ if (isClosed())
+ {
+ return null;
+ }
+ }
_session.startDistpatcherIfNecessary();
@@ -424,7 +458,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ if (!acquireReceiving(true))
+ {
+ //If we couldn't acquire the receiving thread then return null.
+ // This will occur if failing over.
+ return null;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ /*
+ * This seems slightly shoddy but should never actually be executed
+ * since we told acquireReceiving to return immediately and it shouldn't
+ * block on anything.
+ */
+
+ return null;
+ }
_session.startDistpatcherIfNecessary();
@@ -721,12 +773,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
break;
case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
+ /*( if (++_outstanding >= _prefetchHigh)
{
_dups_ok_acknowledge_send = true;
}
- if (_outstanding <= _prefetchLow)
+ //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur.
+ if (_outstanding < _prefetchLow)
{
_dups_ok_acknowledge_send = false;
}
@@ -736,11 +789,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (!_session.isInRecovery())
{
_session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ _outstanding = 0;
}
}
break;
-
+ */
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
if (!_session.isInRecovery())
@@ -777,20 +831,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/** Acknowledge up to last message delivered (if any). Used when commiting. */
- void acknowledgeLastDelivered()
+ void acknowledgeDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- _session.acknowledgeMessage(lastDeliveryTag, true);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
}
}
@@ -866,11 +911,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- public void acknowledge() // throws JMSException
+ public void acknowledge() throws JMSException
{
- if (!isClosed())
+ if (isClosed())
+ {
+ throw new IllegalStateException("Consumer is closed");
+ }
+ else if (_session.hasFailedOver())
+ {
+ throw new JMSException("has failed over");
+ }
+ else
{
-
Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
while (tags.hasNext())
{
@@ -878,10 +930,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
tags.remove();
}
}
- else
- {
- throw new IllegalStateException("Consumer is closed");
- }
}
/** Called on recovery to reset the list of delivery tags */
@@ -951,7 +999,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- // rollback pending messages
+ rollbackPendingMessages();
+ }
+
+ public void rollbackPendingMessages()
+ {
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
@@ -1016,4 +1068,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_synchronousQueue.clear();
}
+
+ /** to be called when a failover has occured */
+ public void failedOver()
+ {
+ clearReceiveQueue();
+ clearUnackedMessages();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 1badbb601c..2b63475d71 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -294,18 +294,4 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
}
- public boolean equals(Object o)
- {
-
- if (o instanceof BlockingMethodFrameListener)
- {
- BlockingMethodFrameListener other = (BlockingMethodFrameListener) o;
-
- return _channelId == other._channelId;
- }
-
- return false;
- }
-
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index 4a4f4a0a38..c66603b7a0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -41,17 +41,4 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener
return _expectedClass.isInstance(frame);
}
- public boolean equals(Object o)
- {
- if (o instanceof SpecificMethodFrameListener)
- {
- SpecificMethodFrameListener other = (SpecificMethodFrameListener) o;
-
- // here we need to check if the two classes are the same.
- return (_channelId == other._channelId) && (_expectedClass.equals(other._expectedClass));
- }
-
- return false;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 3257caa796..e8a220f5e9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -101,27 +100,21 @@ public class TransportConnection
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
? "Qpid NIO is new default"
: "Sysproperty 'qpidnio' is set"));
-
-
result = new MultiThreadSocketConnector();
}
else
{
_logger.info("Using Mina NIO");
-
result = new SocketConnector(); // non-blocking connector
}
-
// Don't have the connector's worker thread wait around for other connections (we only use
// one SocketConnector per connection at the moment anyway). This allows short-running
// clients (like unit tests) to complete quickly.
result.setWorkerTimeout(0);
-
return result;
}
});
break;
-
case VM:
{
_instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
@@ -280,8 +273,7 @@ public class TransportConnection
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
- amqbce.initCause(e);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
throw amqbce;
}
@@ -294,14 +286,11 @@ public class TransportConnection
_acceptor.unbindAll();
synchronized (_inVmPipeAddress)
{
- Iterator keys = _inVmPipeAddress.keySet().iterator();
-
- while (keys.hasNext())
- {
- int id = (Integer) keys.next();
- _inVmPipeAddress.remove(id);
- }
- }
+ _inVmPipeAddress.clear();
+ }
+ _acceptor = null;
+ _currentInstance = -1;
+ _currentVMPort = -1;
}
public static void killVMBroker(int port)
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index 25a9e26285..dca6efba67 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -22,15 +22,12 @@ package org.apache.qpid.client.transport;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
-
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.pool.ReadWriteThreadModel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +46,7 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
- final VmPipeConnector ioConnector = new VmPipeConnector();
+ final VmPipeConnector ioConnector = new QpidVmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
cfg.setThreadModel(ReadWriteThreadModel.getInstance());
diff --git a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java
deleted file mode 100644
index 69684a81ea..0000000000
--- a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.qpid.framing;
-
-import junit.framework.TestCase;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-
-import org.apache.mina.common.ByteBuffer;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-public class SpecificMethodFrameListenerTest extends TestCase
-{
-
- SpecificMethodFrameListener close1a = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class);
- SpecificMethodFrameListener close1b = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class);
- SpecificMethodFrameListener close2 = new SpecificMethodFrameListener(2, ChannelCloseOkBody.class);
- SpecificMethodFrameListener open1a = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class);
- SpecificMethodFrameListener open1b = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class);
-
- public void testEquals()
- {
- //Check that the the same objects are equal
- assertEquals("ChannelCloseOKBody a should equal a", close1a, close1a);
- assertEquals("ChannelOpenOkBody a should equal a", open1a, open1a);
-
- //check that the same values in differnt objects are equal
- assertEquals("ChannelCloseOKBody b should equal a", close1b, close1a);
- assertEquals("ChannelCloseOKBody a should equal b", close1a, close1b);
- assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b);
- assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b);
-
- //Chec that different values fail
- //Different channels
- assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close1a.equals(close2));
- assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close2.equals(close1a));
-
- //Different Bodies
- assertFalse("ChannelCloseOKBody should not equal ChannelOpenOkBody", close1a.equals(open1a));
- assertFalse("ChannelOpenOkBody should not equal ChannelCloseOKBody", open1a.equals(close1a));
- }
-
- public void testProcessMethod() throws AMQFrameDecodingException
- {
- ChannelCloseOkBody ccob = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9).createChannelCloseOkBody();
- ChannelOpenOkBody coob = ((MethodRegistry_0_9)(MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9))).createChannelOpenOkBody(new byte[0]);
-
- assertTrue("This SpecificMethodFrameListener should process a ChannelCloseOkBody", close1a.processMethod(1, ccob));
- assertFalse("This SpecificMethodFrameListener should NOT process a ChannelOpenOkBody", close1a.processMethod(1, coob));
-
-
-
-
- }
-}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 224463a446..e45312448c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -418,14 +418,14 @@ public class CommitRollbackTest extends TestCase
{
_logger.info("Got 2 redelivered, message was prefetched");
_gottwoRedelivered = true;
-
+
}
else
{
- _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
+ _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
assertFalse("Already received message two", _gottwo);
assertFalse("Already received message redelivered two", _gottwoRedelivered);
-
+
_gottwo = true;
}
}
@@ -437,7 +437,7 @@ public class CommitRollbackTest extends TestCase
* This test sends two messages receives on of them but doesn't ack it.
* The consumer is then closed
* the first message should be returned as redelivered.
- * the second message should be delivered normally.
+ * the second message should be delivered normally.
* @throws Exception
*/
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
@@ -458,6 +458,7 @@ public class CommitRollbackTest extends TestCase
assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
_logger.info("Closing Consumer");
+
_consumer.close();
_logger.info("Creating New consumer");
@@ -465,33 +466,20 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
-// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected.
-// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet.
- result = _consumer.receive(5000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
-// The first message back will be either 1 or 2 being redelivered
- if (result.getJMSRedelivered())
- {
- assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
- }
- else // or it will be msg 2 arriving the first time due to latency.
- {
- _logger.info("Message 2 wasn't prefetched so wasn't rejected");
- assertEquals("2", ((TextMessage) result).getText());
- }
+ // Message 2 may be marked as redelivered if it was prefetched.
+ result = _consumer.receive(5000);
+ assertNotNull("Second message was not consumed, but is gone", result);
- Message result2 = _consumer.receive(5000);
- assertNotNull("test message was consumed and rolled back, but is gone", result2);
+ // The first message back will be 2, message 1 has been received but not committed
+ // Closing the consumer does not commit the session.
// if this is message 1 then it should be marked as redelivered
- if("1".equals(((TextMessage) result2).getText()))
+ if("1".equals(((TextMessage) result).getText()))
{
- assertTrue("Messasge is not marked as redelivered" + result2, result2.getJMSRedelivered());
+ fail("First message was recieved again");
}
- assertNotSame("Messages should not have the same content",((TextMessage) result2).getText(), ((TextMessage) result).getText() );
-
result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);