From 2c10159e28ff85e52840d5c6964123e4c410458d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 2 Jan 2012 10:01:21 +0000 Subject: QPID-3713 : Implement producer side flow control for 0-10 in Java Broker git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1226382 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/logging/subjects/ChannelLogSubject.java | 30 ++++++++- .../qpid/server/protocol/AMQSessionModel.java | 6 ++ .../org/apache/qpid/server/queue/AMQQueue.java | 2 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 13 ++-- .../qpid/server/transport/ServerSession.java | 77 ++++++++++++++++++++-- .../org/apache/qpid/server/queue/MockAMQQueue.java | 3 +- .../main/java/org/apache/qpid/transport/Range.java | 8 ++- .../java/org/apache/qpid/transport/Session.java | 22 +++++-- .../qpid/server/queue/ProducerFlowControlTest.java | 23 +++++-- java/test-profiles/Java010Excludes | 3 - 10 files changed, 157 insertions(+), 30 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index 9b357403a8..8266c1e79f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.server.transport.ServerSession; + import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; public class ChannelLogSubject extends AbstractLogSubject @@ -52,5 +55,30 @@ public class ChannelLogSubject extends AbstractLogSubject session.getVirtualHost().getName(), channel.getChannelId()); } - + + public ChannelLogSubject(ServerSession session) + { + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ChannelLogSubject.CHANNEL_FORMAT : + * con:{0}({1}@{2}/{3})/ch:{4} + * + * Uses a MessageFormat call to insert the required values according to + * these indices: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + * 4 - Channel ID + */ + ServerConnection connection = (ServerConnection) session.getConnection(); + setLogStringWithFormat(CHANNEL_FORMAT, + connection == null ? -1L : connection.getConnectionId(), + session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(), + (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(), + session.getVirtualHost().getName(), + session.getChannel()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index bc63403a86..c55fe321fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; public interface AMQSessionModel { @@ -51,4 +53,8 @@ public interface AMQSessionModel * @param idleClose time in milliseconds before closing connection with idle transaction */ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException; + + void block(AMQQueue queue); + + void unblock(AMQQueue queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 6dfdc5e8b4..a1f1c037ec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -217,7 +217,7 @@ public interface AMQQueue extends Managable, Comparable, ExchangeRefer Map getArguments(); - void checkCapacity(AMQChannel channel); + void checkCapacity(AMQSessionModel channel); /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 25fc91b998..ebed781a1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -164,7 +164,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); - private final ConcurrentMap _blockedChannels = new ConcurrentHashMap(); + private final ConcurrentMap _blockedChannels = new ConcurrentHashMap(); private final AtomicBoolean _deleted = new AtomicBoolean(false); private final List _deleteTaskList = new CopyOnWriteArrayList(); @@ -1528,7 +1528,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void checkCapacity(AMQChannel channel) + public void checkCapacity(AMQSessionModel channel) { if(_capacity != 0l) { @@ -1538,10 +1538,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //Overfull log message _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) - { - channel.block(this); - } + _blockedChannels.putIfAbsent(channel, Boolean.TRUE); + + channel.block(this); if(_atomicQueueSize.get() <= _flowResumeCapacity) { @@ -1573,7 +1572,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - for(AMQChannel c : _blockedChannels.keySet()) + for(AMQSessionModel c : _blockedChannels.keySet()) { c.unblock(this); _blockedChannels.remove(c); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 8e6d33d3bc..7526b19058 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -25,7 +25,6 @@ import static org.apache.qpid.util.Serial.gt; import java.security.Principal; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -33,8 +32,11 @@ import java.util.Map; import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.AMQException; @@ -45,11 +47,13 @@ import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SessionConfigType; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -66,6 +70,11 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlow; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageSetFlowMode; +import org.apache.qpid.transport.MessageStop; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Range; @@ -81,6 +90,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); + private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30; private final UUID _id; private ConnectionConfig _connectionConfig; @@ -88,6 +98,16 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private LogActor _actor = GenericActor.getInstance(this); private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction(); + private final ConcurrentMap _blockingQueues = new ConcurrentHashMap(); + + private final ConcurrentMap _blockingExchanges = new ConcurrentHashMap(); + + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + private ChannelLogSubject _logSubject; + private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE); + + public static interface MessageDispositionChangeListener { public void onAccept(); @@ -132,7 +152,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi super(connection, delegate, name, expiry); _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); - + _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -161,6 +181,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void enqueue(final ServerMessage message, final List queues) { + if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD) + { + invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD)); + } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); PostEnqueueAction postTransactionAction; if(isTransactional()) @@ -661,6 +685,43 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); + invoke(new MessageStop("")); + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString())); + } + + + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) + { + if(_blocking.compareAndSet(true,false)) + { + + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _oustandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + + + } + } + } + + public String toLogString() { return "[" + @@ -701,7 +762,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - private static class PostEnqueueAction implements ServerTransaction.Action + private class PostEnqueueAction implements ServerTransaction.Action { private List _queues; @@ -732,7 +793,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { try { - _queues.get(i).enqueue(_message, _transactional, null); + BaseQueue queue = _queues.get(i); + queue.enqueue(_message, _transactional, null); + if(queue instanceof AMQQueue) + { + ((AMQQueue)queue).checkCapacity(ServerSession.this); + } + } catch (AMQException e) { @@ -756,6 +823,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public boolean getBlocking() { - return false; //TODO: Blocking not implemented on 0-10 yet. + return _blocking.get(); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index f43af447ff..8b029f9a51 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.txn.ServerTransaction; @@ -477,7 +476,7 @@ public class MockAMQQueue implements AMQQueue return null; } - public void checkCapacity(AMQChannel channel) + public void checkCapacity(AMQSessionModel channel) { } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Range.java b/java/common/src/main/java/org/apache/qpid/transport/Range.java index f976337788..c47171dc4b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Range.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Range.java @@ -185,6 +185,12 @@ public abstract class Range implements RangeSet } } + public String toString() + { + return "[" + point + ", " + point + "]"; + } + + } private static class RangeImpl extends Range @@ -283,7 +289,7 @@ public abstract class Range implements RangeSet return range; } - @Override + public void remove() { throw new UnsupportedOperationException(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 3e823ba6fe..d391181217 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -61,7 +61,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class Session extends SessionInvoker { private static final Logger log = Logger.get(Session.class); - + public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED } static class DefaultSessionListener implements SessionListener @@ -96,6 +96,9 @@ public class Session extends SessionInvoker private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); + private final long blockedSendTimeout = Long.getLong("qpid.flow_control_wait_failure", timeout); + private long blockedSendReportingPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private boolean autoSync = false; private boolean incomingInit; @@ -228,10 +231,21 @@ public class Session extends SessionInvoker { try { - if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS)) + long wait = blockedSendTimeout > blockedSendReportingPeriod ? blockedSendReportingPeriod : + blockedSendTimeout; + long totalWait = 1L; + while(totalWait <= blockedSendTimeout && !credit.tryAcquire(wait, TimeUnit.MILLISECONDS)) + { + totalWait+=wait; + log.warn("Message send delayed by " + (totalWait)/1000 + "s due to broker enforced flow control"); + + + } + if(totalWait > blockedSendTimeout) { + log.error("Message send failed due to timeout waiting on broker enforced flow control"); throw new SessionException - ("timed out waiting for message credit"); + ("timed out waiting for message credit"); } } catch (InterruptedException e) @@ -815,7 +829,7 @@ public class Session extends SessionInvoker while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { checkFailoverRequired("Session sync was interrupted by failover."); - log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); + log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, Arrays.asList(commands)); w.await(); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index 775d2c3eb0..47f334adf6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -154,8 +154,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); - Thread.sleep(5000); - List results = waitAndFindMatches("QUE-1003"); + List results = waitAndFindMatches("QUE-1003", 7000); assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); @@ -199,11 +198,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging // try to send 5 messages (should block after 4) MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); - Thread.sleep(TIMEOUT); List results = waitAndFindMatches("Message send delayed by", TIMEOUT); assertTrue("No delay messages logged by client",results.size()!=0); - results = findMatches("Message send failed due to timeout waiting on broker enforced flow control"); - assertEquals("Incorrect number of send failure messages logged by client",1,results.size()); + + List failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced" + + " flow control", TIMEOUT); + assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay " + + "messages)",1,failedMessages.size()); @@ -325,8 +326,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging // try to send 5 messages (should block after 4) - MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 100L); + Thread.sleep(10000); Exception e = sender.getException(); @@ -440,6 +442,15 @@ public class ProducerFlowControlTest extends AbstractTestLogging e.printStackTrace(); throw new RuntimeException(e); } + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } } diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes index 59cb5066f1..0de4c6bae5 100755 --- a/java/test-profiles/Java010Excludes +++ b/java/test-profiles/Java010Excludes @@ -45,9 +45,6 @@ org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend // 0-10 is not supported by the MethodRegistry org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* -//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) -org.apache.qpid.server.queue.ProducerFlowControlTest#* - //QPID-1864: rollback with subscriptions does not work in 0-10 yet org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage -- cgit v1.2.1