From 5cdfd4cb313381431ab66376758b10c662f5b374 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 7 Mar 2015 12:40:26 +0000 Subject: QPID-6437 : ensure session/link flow notifies occur git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1664839 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/transport/SendingLinkEndpoint.java | 9 +++- .../qpid/amqp_1_0/transport/SessionEndpoint.java | 57 +++++++++------------- 2 files changed, 32 insertions(+), 34 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java index 5dba729b5b..b52e37a18d 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java @@ -166,7 +166,14 @@ public class SendingLinkEndpoint extends LinkEndpoint setLinkCredit(limit.subtract(getDeliveryCount())); } } - + getSession().getConnection().addPostLockAction(new Runnable() + { + @Override + public void run() + { + flowStateChanged(); + } + }); } @Override diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 28b84681d1..af263d0f45 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -458,49 +458,40 @@ public class SessionEndpoint { synchronized (getLock()) { - synchronized (getLock()) - { - UnsignedInteger handle = flow.getHandle(); - final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); + UnsignedInteger handle = flow.getHandle(); + final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); - final UnsignedInteger nextOutgoingId = - flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); - int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); - _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); + final UnsignedInteger nextOutgoingId = + flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); + int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); + _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); - if (endpoint != null) - { - getConnection().addPostLockAction(new Runnable() - { - @Override - public void run() - { - endpoint.receiveFlow(flow); - } - }); - } - else + if (endpoint != null) + { + endpoint.receiveFlow(flow); + } + else + { + final Collection allLinkEndpoints = _remoteLinkEndpoints.values(); + getConnection().addPostLockAction(new Runnable() { - final Collection allLinkEndpoints = _remoteLinkEndpoints.values(); - getConnection().addPostLockAction(new Runnable() + @Override + public void run() { - @Override - public void run() - { - for(LinkEndpoint le : allLinkEndpoints) - { - le.flowStateChanged(); - } + for(LinkEndpoint le : allLinkEndpoints) + { + le.flowStateChanged(); } - }); - } - - getLock().notifyAll(); + } + }); } + + getLock().notifyAll(); } + } public void receiveDisposition(final Disposition disposition) -- cgit v1.2.1