diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-01 18:09:10 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-01 18:09:10 +0000 |
| commit | 16a6c8873629972a7324509aac13bb950419176f (patch) | |
| tree | eb1b3c5e886b0adbf40005795ab1c9c414e46891 /java/broker/src | |
| parent | 054546a9f7dda8e6877550c46eb258c623e67de6 (diff) | |
| download | qpid-python-16a6c8873629972a7324509aac13bb950419176f.tar.gz | |
QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@820739 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
12 files changed, 331 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 180f0a992c..ea48bd7cc3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -27,13 +27,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -42,12 +41,8 @@ import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.UnauthorizedAccessException; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; @@ -59,6 +54,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; @@ -119,6 +115,11 @@ public class AMQChannel private final AMQProtocolSession _session; private boolean _closing; + private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + + private LogActor _actor; private LogSubject _logSubject; @@ -798,6 +799,7 @@ public class AMQChannel _actor.message(_logSubject, ChannelMessages.CHN_1002("Started")); } + // This section takes two different approaches to perform to perform // the same function. Ensuring that the Subscription has taken note // of the change in Channel State @@ -978,4 +980,37 @@ public class AMQChannel { return _actor; } + + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString())); + flow(false); + } + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue)) + { + if(_blocking.compareAndSet(true,false)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1006()); + + flow(true); + } + } + } + + private void flow(boolean flow) + { + MethodRegistry methodRegistry = _session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); + _session.writeFrame(responseBody.generateFrame(_channelId)); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 74bb7ee969..5c73e353de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -108,4 +108,14 @@ public class QueueConfiguration return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap()); } + public long getCapacity() + { + return _config.getLong("capacity", _vHostConfig.getCapacity()); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index ed9d8acc08..01befbbfe8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -103,6 +103,8 @@ public class ServerConfiguration implements SignalHandler envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth"); envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize"); envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap"); + envVarMap.put("QPID_QUEUECAPACITY", "capacity"); + envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity"); envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer"); envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer"); envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay"); @@ -289,7 +291,6 @@ public class ServerConfiguration implements SignalHandler return conf; } - @Override public void handle(Signal arg0) { try @@ -507,6 +508,16 @@ public class ServerConfiguration implements SignalHandler return getConfig().getLong("minimumAlertRepeatGap", 0); } + public long getCapacity() + { + return getConfig().getLong("capacity", 0L); + } + + public long getFlowResumeCapacity() + { + return getConfig().getLong("flowResumeCapacity", getCapacity()); + } + public int getProcessors() { return getConfig().getInt("connector.processors", 4); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 0273a13262..6c72025ec2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -166,4 +166,15 @@ public class VirtualHostConfiguration return _config.getLong("queues.minimumAlertRepeatGap", 0); } + + public long getCapacity() + { + return _config.getLong("queues.capacity", 0l); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("queues.flowResumeCapacity", getCapacity()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index 5ced7cc0b9..9169a1a651 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -249,12 +249,16 @@ CHN-1003 = Close # 0 - bytes allowed in prefetch # 1 - number of messagse. CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number} +CHN-1005 = Flow Control Enforced (Queue {0}) +CHN-1006 = Flow Control Removed #Queue # 0 - owner # 1 - priority QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}] QUE-1002 = Deleted +QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number} +QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number} #Exchange # 0 - type @@ -269,4 +273,4 @@ BND-1002 = Deleted #Subscription SUB-1001 = Create[ : Durable][ : Arguments : {0}] SUB-1002 = Close -SUB-1003 = State : {0}
\ No newline at end of file +SUB-1003 = State : {0} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 2a692344d0..184504717e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void setMinimumAlertRepeatGap(long value); + long getCapacity(); + + void setCapacity(long capacity); + + + long getFlowResumeCapacity(); + + void setFlowResumeCapacity(long flowResumeCapacity); + + + void deleteMessageFromTop(StoreContext storeContext) throws AMQException; long clearQueue(StoreContext storeContext) throws AMQException; @@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void stop(); + void checkCapacity(AMQChannel channel); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 7509350e65..267ccf43ea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Map; +import java.util.HashMap; + public class AMQQueueFactory { public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + private abstract static class QueueProperty + { + + private final AMQShortString _argumentName; + + + public QueueProperty(String argumentName) + { + _argumentName = new AMQShortString(argumentName); + } + + public AMQShortString getArgumentName() + { + return _argumentName; + } + + + public abstract void setPropertyValue(AMQQueue queue, Object value); + + } + + private abstract static class QueueLongProperty extends QueueProperty + { + + public QueueLongProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).longValue()); + } + + } + + abstract void setPropertyValue(AMQQueue queue, long value); + + + } + + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { + new QueueLongProperty("x-qpid-maximum-message-age") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageAge(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-size") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageSize(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-count") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageCount(value); + } + }, + new QueueLongProperty("x-qpid-minimum-alert-repeat-gap") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMinimumAlertRepeatGap(value); + } + }, + new QueueLongProperty("x-qpid-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setCapacity(value); + } + }, + new QueueLongProperty("x-qpid-flow-resume-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setFlowResumeCapacity(value); + } + } + + }; + + + public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, @@ -53,6 +147,18 @@ public class AMQQueueFactory //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString())); + + if(arguments != null) + { + for(QueueProperty p : DECLAREABLE_PROPERTIES) + { + if(arguments.containsKey(p.getArgumentName())) + { + p.setPropertyValue(q, arguments.get(p.getArgumentName())); + } + } + } + return q; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 01b249b847..3b58f05f93 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; - + private final AMQMessage _message; private Set<Subscription> _rejectedBy = null; @@ -191,7 +190,7 @@ public class QueueEntryImpl implements QueueEntry public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return getMessage().immediateAndNotDelivered(); } public void setRedelivered(boolean b) @@ -393,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry { return _queueEntryList; } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 4b9fc9ca55..8b6c15c0c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,11 +1,10 @@ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.AMQChannel; /* * @@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Executor _asyncDelivery; private final AtomicLong _totalMessagesReceived = new AtomicLong(); + private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + /** max allowed size(KB) of a single message */ public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); @@ -122,6 +124,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private LogSubject _logSubject; private LogActor _logActor; + + private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -629,6 +635,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new FailedDequeueException(_name.toString(), e); } + checkCapacity(); + } private void decrementQueueSize(final QueueEntry entry) @@ -1173,6 +1181,58 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public void checkCapacity(AMQChannel channel) + { + if(_capacity != 0l) + { + if(_atomicQueueSize.get() > _capacity) + { + //Overfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); + + if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) + { + channel.block(this); + } + + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + channel.unblock(this); + _blockedChannels.remove(channel); + + } + + } + + + + } + } + + private void checkCapacity() + { + if(_capacity != 0L) + { + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + + for(AMQChannel c : _blockedChannels.keySet()) + { + c.unblock(this); + _blockedChannels.remove(c); + } + } + } + } + + public void deliverAsync() { Runner runner = new Runner(_stateChangeCount.incrementAndGet()); @@ -1544,6 +1604,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public void checkMessageStatus() throws AMQException { @@ -1651,6 +1712,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public long getCapacity() + { + return _capacity; + } + + public void setCapacity(long capacity) + { + _capacity = capacity; + } + + public long getFlowResumeCapacity() + { + return _flowResumeCapacity; + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + _flowResumeCapacity = flowResumeCapacity; + } + + public Set<NotificationCheck> getNotificationChecks() { return _notificationChecks; @@ -1720,6 +1802,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(config.getMaximumMessageSize()); setMaximumMessageCount(config.getMaximumMessageCount()); setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + _capacity = config.getCapacity(); + _flowResumeCapacity = config.getFlowResumeCapacity(); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 3c71282c57..450852cef7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -97,6 +97,7 @@ public class LocalTransactionalContext implements TransactionalContext try { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); + _queue.checkCapacity(_channel); if(entry.immediateAndNotDelivered()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 28af36e3db..10d6021d27 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -91,6 +91,8 @@ public class NonTransactionalContext implements TransactionalContext public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException { QueueEntry entry = queue.enqueue(_storeContext, message); + queue.checkCapacity(_channel); + //following check implements the functionality //required by the 'immediate' flag: diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index b16a289f0a..0a5274ab88 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; @@ -271,6 +272,15 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public boolean getBlockOnQueueFull() + { + return false; + } + + public void setBlockOnQueueFull(boolean block) + { + } + public long getMinimumAlertRepeatGap() { return 0; //To change body of implemented methods use File | Settings | File Templates. @@ -285,8 +295,7 @@ public class MockAMQQueue implements AMQQueue { return 0; //To change body of implemented methods use File | Settings | File Templates. } - - @Override + public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. @@ -317,6 +326,10 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public void checkCapacity(AMQChannel channel) + { + } + public ManagedObject getManagedObject() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -327,12 +340,31 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override public void setMinimumAlertRepeatGap(long value) { } + public long getCapacity() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setCapacity(long capacity) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getFlowResumeCapacity() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void configure(QueueConfiguration config) { |
