summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-01 18:09:10 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-01 18:09:10 +0000
commit16a6c8873629972a7324509aac13bb950419176f (patch)
treeeb1b3c5e886b0adbf40005795ab1c9c414e46891 /java/broker/src
parent054546a9f7dda8e6877550c46eb258c623e67de6 (diff)
downloadqpid-python-16a6c8873629972a7324509aac13bb950419176f.tar.gz
QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@820739 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java55
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java106
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java92
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java38
12 files changed, 331 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 180f0a992c..ea48bd7cc3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -27,13 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -42,12 +41,8 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -59,6 +54,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -119,6 +115,11 @@ public class AMQChannel
private final AMQProtocolSession _session;
private boolean _closing;
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+
+
private LogActor _actor;
private LogSubject _logSubject;
@@ -798,6 +799,7 @@ public class AMQChannel
_actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
}
+
// This section takes two different approaches to perform to perform
// the same function. Ensuring that the Subscription has taken note
// of the change in Channel State
@@ -978,4 +980,37 @@ public class AMQChannel
{
return _actor;
}
+
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString()));
+ flow(false);
+ }
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue))
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.CHN_1006());
+
+ flow(true);
+ }
+ }
+ }
+
+ private void flow(boolean flow)
+ {
+ MethodRegistry methodRegistry = _session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
+ _session.writeFrame(responseBody.generateFrame(_channelId));
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 74bb7ee969..5c73e353de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -108,4 +108,14 @@ public class QueueConfiguration
return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
}
+ public long getCapacity()
+ {
+ return _config.getLong("capacity", _vHostConfig.getCapacity());
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index ed9d8acc08..01befbbfe8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -103,6 +103,8 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
+ envVarMap.put("QPID_QUEUECAPACITY", "capacity");
+ envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer");
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
@@ -289,7 +291,6 @@ public class ServerConfiguration implements SignalHandler
return conf;
}
- @Override
public void handle(Signal arg0)
{
try
@@ -507,6 +508,16 @@ public class ServerConfiguration implements SignalHandler
return getConfig().getLong("minimumAlertRepeatGap", 0);
}
+ public long getCapacity()
+ {
+ return getConfig().getLong("capacity", 0L);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return getConfig().getLong("flowResumeCapacity", getCapacity());
+ }
+
public int getProcessors()
{
return getConfig().getInt("connector.processors", 4);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 0273a13262..6c72025ec2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -166,4 +166,15 @@ public class VirtualHostConfiguration
return _config.getLong("queues.minimumAlertRepeatGap", 0);
}
+
+ public long getCapacity()
+ {
+ return _config.getLong("queues.capacity", 0l);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("queues.flowResumeCapacity", getCapacity());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index 5ced7cc0b9..9169a1a651 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -249,12 +249,16 @@ CHN-1003 = Close
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
#Queue
# 0 - owner
# 1 - priority
QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
#Exchange
# 0 - type
@@ -269,4 +273,4 @@ BND-1002 = Deleted
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
-SUB-1003 = State : {0} \ No newline at end of file
+SUB-1003 = State : {0}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 2a692344d0..184504717e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void setMinimumAlertRepeatGap(long value);
+ long getCapacity();
+
+ void setCapacity(long capacity);
+
+
+ long getFlowResumeCapacity();
+
+ void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
long clearQueue(StoreContext storeContext) throws AMQException;
@@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void stop();
+ void checkCapacity(AMQChannel channel);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 7509350e65..267ccf43ea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+import java.util.HashMap;
+
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ private abstract static class QueueProperty
+ {
+
+ private final AMQShortString _argumentName;
+
+
+ public QueueProperty(String argumentName)
+ {
+ _argumentName = new AMQShortString(argumentName);
+ }
+
+ public AMQShortString getArgumentName()
+ {
+ return _argumentName;
+ }
+
+
+ public abstract void setPropertyValue(AMQQueue queue, Object value);
+
+ }
+
+ private abstract static class QueueLongProperty extends QueueProperty
+ {
+
+ public QueueLongProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).longValue());
+ }
+
+ }
+
+ abstract void setPropertyValue(AMQQueue queue, long value);
+
+
+ }
+
+ private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+ new QueueLongProperty("x-qpid-maximum-message-age")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageAge(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-size")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageSize(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-count")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageCount(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMinimumAlertRepeatGap(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setCapacity(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-flow-resume-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setFlowResumeCapacity(value);
+ }
+ }
+
+ };
+
+
+
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
@@ -53,6 +147,18 @@ public class AMQQueueFactory
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+ if(arguments != null)
+ {
+ for(QueueProperty p : DECLAREABLE_PROPERTIES)
+ {
+ if(arguments.containsKey(p.getArgumentName()))
+ {
+ p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+ }
+ }
+ }
+
return q;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 01b249b847..3b58f05f93 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
-
+ private final AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -191,7 +190,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().immediateAndNotDelivered();
}
public void setRedelivered(boolean b)
@@ -393,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 4b9fc9ca55..8b6c15c0c3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1,11 +1,10 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
/*
*
@@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final Executor _asyncDelivery;
private final AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -122,6 +124,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private LogSubject _logSubject;
private LogActor _logActor;
+
+ private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -629,6 +635,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new FailedDequeueException(_name.toString(), e);
}
+ checkCapacity();
+
}
private void decrementQueueSize(final QueueEntry entry)
@@ -1173,6 +1181,58 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public void checkCapacity(AMQChannel channel)
+ {
+ if(_capacity != 0l)
+ {
+ if(_atomicQueueSize.get() > _capacity)
+ {
+ //Overfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+
+ if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+ {
+ channel.block(this);
+ }
+
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+ channel.unblock(this);
+ _blockedChannels.remove(channel);
+
+ }
+
+ }
+
+
+
+ }
+ }
+
+ private void checkCapacity()
+ {
+ if(_capacity != 0L)
+ {
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ //Underfull log message
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+
+ for(AMQChannel c : _blockedChannels.keySet())
+ {
+ c.unblock(this);
+ _blockedChannels.remove(c);
+ }
+ }
+ }
+ }
+
+
public void deliverAsync()
{
Runner runner = new Runner(_stateChangeCount.incrementAndGet());
@@ -1544,6 +1604,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
public void checkMessageStatus() throws AMQException
{
@@ -1651,6 +1712,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public long getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ _capacity = capacity;
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _flowResumeCapacity;
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ _flowResumeCapacity = flowResumeCapacity;
+ }
+
+
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -1720,6 +1802,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(config.getMaximumMessageSize());
setMaximumMessageCount(config.getMaximumMessageCount());
setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 3c71282c57..450852cef7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -97,6 +97,7 @@ public class LocalTransactionalContext implements TransactionalContext
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+ _queue.checkCapacity(_channel);
if(entry.immediateAndNotDelivered())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 28af36e3db..10d6021d27 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -91,6 +91,8 @@ public class NonTransactionalContext implements TransactionalContext
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
QueueEntry entry = queue.enqueue(_storeContext, message);
+ queue.checkCapacity(_channel);
+
//following check implements the functionality
//required by the 'immediate' flag:
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index b16a289f0a..0a5274ab88 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -271,6 +272,15 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean getBlockOnQueueFull()
+ {
+ return false;
+ }
+
+ public void setBlockOnQueueFull(boolean block)
+ {
+ }
+
public long getMinimumAlertRepeatGap()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -285,8 +295,7 @@ public class MockAMQQueue implements AMQQueue
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
-
- @Override
+
public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -317,6 +326,10 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void checkCapacity(AMQChannel channel)
+ {
+ }
+
public ManagedObject getManagedObject()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -327,12 +340,31 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
public void setMinimumAlertRepeatGap(long value)
{
}
+ public long getCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setCapacity(long capacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void configure(QueueConfiguration config)
{