diff options
2 files changed, 42 insertions, 1 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index f2ab154b32..d4d8c9aaef 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -131,7 +131,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR } else { - channel.deadLetter(body.getDeliveryTag()); + channel.requeue(deliveryTag); } } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 610628a02d..14914664d6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.test.unit.transacted; +import org.apache.qpid.client.RejectBehaviour; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -638,4 +639,44 @@ public class CommitRollbackTest extends QpidBrokerTestCase } } } + + + public void testResendUnseenMessagesAfterRollback() throws Exception + { + resendAfterRollback(); + } + + public void testResendUnseenMessagesAfterRollbackWithServerReject() throws Exception + { + setTestSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString()); + resendAfterRollback(); + } + + private void resendAfterRollback() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "message text"; + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + assertNotNull("two messages were sent, but none has been received", _consumer.receive(1000)); + + _session.rollback(); + + _logger.info("receiving result"); + + assertNotNull("two messages were sent, but none has been received", _consumer.receive(1000)); + assertNotNull("two messages were sent, but only one has been received", _consumer.receive(1000)); + assertNull("Only two messages were sent, but more have been received", _consumer.receive(100)); + + _session.commit(); + } } |
