From 8aabbfd6c4c70df5ace15b54dda8475662029e93 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 20 Oct 2012 13:43:04 +0000 Subject: QPID-4383 : Fix receipt of large messages, client ack git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1400447 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/amqp_1_0/transport/Delivery.java | 4 ++++ .../amqp_1_0/transport/ReceivingLinkEndpoint.java | 22 +++++++++++++--------- 2 files changed, 17 insertions(+), 9 deletions(-) (limited to 'java') diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java index 4135199045..aca781afb9 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java @@ -71,6 +71,10 @@ public class Delivery { setComplete(true); } + if(Boolean.TRUE.equals(transfer.getSettled())) + { + setSettled(true); + } } public List getTransfers() diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java index cf86fc2471..f8c2f115c5 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java @@ -113,33 +113,37 @@ public class ReceivingLinkEndpoint extends LinkEndpoint synchronized (getLock()) { TransientState transientState; - boolean existingState = _unsettledMap.containsKey(transfer.getDeliveryTag()); - _unsettledMap.put(transfer.getDeliveryTag(), transfer.getState()); + final Binary deliveryTag = delivery.getDeliveryTag(); + boolean existingState = _unsettledMap.containsKey(deliveryTag); + if(!existingState || transfer.getState() != null) + { + _unsettledMap.put(deliveryTag, transfer.getState()); + } if(!existingState) { transientState = new TransientState(transfer.getDeliveryId()); - if(Boolean.TRUE.equals(transfer.getSettled())) + if(delivery.isSettled()) { transientState.setSettled(true); } - _unsettledIds.put(transfer.getDeliveryTag(), transientState); + _unsettledIds.put(deliveryTag, transientState); setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE)); setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE)); } else { - transientState = _unsettledIds.get(transfer.getDeliveryTag()); + transientState = _unsettledIds.get(deliveryTag); transientState.incrementCredit(); - if(Boolean.TRUE.equals(transfer.getSettled())) + if(delivery.isSettled()) { transientState.setSettled(true); } } - if(transientState.isSettled()) + if(transientState.isSettled() && delivery.isComplete()) { - _unsettledMap.remove(transfer.getDeliveryTag()); + _unsettledMap.remove(deliveryTag); } getLinkEventListener().messageTransfer(transfer); @@ -371,7 +375,7 @@ public class ReceivingLinkEndpoint extends LinkEndpoint tag = iter.next(); tagsToUpdate.add(tag); - deliveryId = _unsettledIds.get(firstTag).getDeliveryId(); + deliveryId = _unsettledIds.get(tag).getDeliveryId(); if(deliveryId.equals(last.add(UnsignedInteger.ONE))) { -- cgit v1.2.1