From 0f81a00b5cb061780b1aac0d2ff216e2c16de0ec Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Jan 2012 09:52:16 +0000 Subject: QPID-3717 - Fixes based on review by Robbie Gemmell git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1229943 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/transport/ServerSession.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7a2c07b9c8..7d06dd2c22 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -47,7 +47,6 @@ import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SessionConfigType; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -90,7 +89,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); - private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30; + private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; private final UUID _id; private ConnectionConfig _connectionConfig; @@ -100,12 +99,9 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private final ConcurrentMap _blockingQueues = new ConcurrentHashMap(); - private final ConcurrentMap _blockingExchanges = new ConcurrentHashMap(); - - private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; - private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE); + private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); public static interface MessageDispositionChangeListener @@ -181,9 +177,11 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void enqueue(final ServerMessage message, final List queues) { - if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD) + if(_outstandingCredit.get() != UNLIMITED_CREDIT + && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) { - invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD)); + _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); + invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); PostEnqueueAction postTransactionAction; @@ -712,7 +710,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi MessageFlow mf = new MessageFlow(); mf.setUnit(MessageCreditUnit.MESSAGE); mf.setDestination(""); - _oustandingCredit.set(Integer.MAX_VALUE); + _outstandingCredit.set(Integer.MAX_VALUE); mf.setValue(Integer.MAX_VALUE); invoke(mf); -- cgit v1.2.1