summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-17 15:53:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-17 15:53:42 +0000
commit9bd52fa485d73b3eb5c68d698e63243052a1db9c (patch)
treefea4a994556644221dbcf41e74f0c43a79cfd752 /qpid/java/broker-plugins
parent95fc93485ab66966713611a4e1429d917dabde64 (diff)
downloadqpid-python-9bd52fa485d73b3eb5c68d698e63243052a1db9c.tar.gz
QPID-6163 : [Java Broker] Disconnect clients which do not obey flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632618 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java22
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java141
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java70
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java2
6 files changed, 161 insertions, 89 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 60bb5c6112..9bccfa53a3 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -65,7 +65,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
LogSubject, AuthorizationHolder
{
- private final Broker _broker;
+ private final Broker<?> _broker;
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -106,6 +106,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
return _reference;
}
+ public Broker<?> getBroker()
+ {
+ return _broker;
+ }
+
@Override
protected void invoke(Method method)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index b1c22fe823..dc5635654e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -62,6 +62,7 @@ import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -131,6 +132,8 @@ public class ServerSession extends Session
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private org.apache.qpid.server.model.Session<?> _modelObject;
+ private long _blockTime;
+ private long _blockingTimeout;
public static interface MessageDispositionChangeListener
@@ -182,6 +185,9 @@ public class ServerSession extends Session
getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
}
}, getVirtualHost());
+
+ _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class,
+ Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
}
protected void setState(final State state)
@@ -774,6 +780,7 @@ public class ServerSession extends Session
{
invokeBlock();
}
+ _blockTime = System.currentTimeMillis();
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -798,7 +805,7 @@ public class ServerSession extends Session
{
if(_blocking.compareAndSet(true,false) && !isClosing())
{
-
+ _blockTime = 0l;
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
MessageFlow mf = new MessageFlow();
mf.setUnit(MessageCreditUnit.MESSAGE);
@@ -812,6 +819,17 @@ public class ServerSession extends Session
}
}
+ boolean blockingTimeoutExceeded()
+ {
+ long blockTime = _blockTime;
+ boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
+ if(b)
+ {
+ System.err.println(_blockingTimeout);
+ }
+ return b;
+ }
+
@Override
public Object getConnectionReference()
{
@@ -1065,7 +1083,7 @@ public class ServerSession extends Session
}
@Override
- public int compareTo(ServerSession o)
+ public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 14082091f9..77dba71fac 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -34,11 +34,13 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
@@ -331,84 +333,103 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
-
- final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
- if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(((ServerSession)ssn).blockingTimeoutExceeded())
{
- delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
- }
-
- final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+ getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
- final VirtualHostImpl virtualHost = getVirtualHost(ssn);
- try
- {
- virtualHost.getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName(), virtualHost.getName());
+ ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE,
+ "Session flow control was requested, but not enforced by sender");
}
- catch (AccessControlException e)
+ else
{
- ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
- exception(ssn, xfr, errorCode, e.getMessage());
+ final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
- return;
- }
+ final DeliveryProperties delvProps =
+ xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
+ if (delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ {
+ delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
+ }
- final MessageStore store = virtualHost.getMessageStore();
- final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- final ServerSession serverSession = (ServerSession) ssn;
- final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
- MessageReference<MessageTransferMessage> reference = message.newReference();
+ final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
- final InstanceProperties instanceProperties = new InstanceProperties()
- {
- @Override
- public Object getProperty(final Property prop)
+ final VirtualHostImpl virtualHost = getVirtualHost(ssn);
+ try
+ {
+ virtualHost.getSecurityManager()
+ .authorisePublish(messageMetaData.isImmediate(),
+ messageMetaData.getRoutingKey(),
+ exchange.getName(),
+ virtualHost.getName());
+ }
+ catch (AccessControlException e)
{
- switch(prop)
+ ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
+ exception(ssn, xfr, errorCode, e.getMessage());
+
+ return;
+ }
+
+ final MessageStore store = virtualHost.getMessageStore();
+ final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
+ final ServerSession serverSession = (ServerSession) ssn;
+ final MessageTransferMessage message =
+ new MessageTransferMessage(storeMessage, serverSession.getReference());
+ MessageReference<MessageTransferMessage> reference = message.newReference();
+
+ final InstanceProperties instanceProperties = new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
{
- case EXPIRATION:
- return message.getExpiration();
- case IMMEDIATE:
- return message.isImmediate();
- case MANDATORY:
- return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
- case PERSISTENT:
- return message.isPersistent();
- case REDELIVERED:
- return delvProps.getRedelivered();
+ switch (prop)
+ {
+ case EXPIRATION:
+ return message.getExpiration();
+ case IMMEDIATE:
+ return message.isImmediate();
+ case MANDATORY:
+ return (delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+ case PERSISTENT:
+ return message.isPersistent();
+ case REDELIVERED:
+ return delvProps.getRedelivered();
+ }
+ return null;
}
- return null;
- }
- };
+ };
- int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
+ int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(enqueues == 0)
- {
- if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ if (enqueues == 0)
{
- RangeSet rejects = RangeSetFactory.createRangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
+ if ((delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = RangeSetFactory.createRangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
+ messageMetaData.getRoutingKey()));
+ }
+ }
+
+ if (serverSession.isTransactional())
+ {
+ serverSession.processed(xfr);
}
else
{
- virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
- messageMetaData.getRoutingKey()));
+ serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, xfr));
}
+ reference.release();
}
-
- if(serverSession.isTransactional())
- {
- serverSession.processed(xfr);
- }
- else
- {
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
- }
- reference.release();
}
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 01dd523e3e..58de3edb61 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -18,14 +18,16 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
-import static org.mockito.Mockito.mock;
-
public class ServerSessionTest extends QpidTestCase
{
@@ -59,6 +61,8 @@ public class ServerSessionTest extends QpidTestCase
public void testCompareTo() throws Exception
{
final Broker broker = mock(Broker.class);
+ when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
+
ServerConnection connection = new ServerConnection(1, broker);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index d3ddaa16dd..e511878ff1 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -79,6 +79,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -201,6 +202,8 @@ public class AMQChannel
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
+ private long _blockTime;
+ private long _blockingTimeout;
private boolean _confirmOnPublish;
private long _confirmedMessageCounter;
@@ -217,7 +220,8 @@ public class AMQChannel
_logSubject = new ChannelLogSubject(this);
_messageStore = messageStore;
-
+ _blockingTimeout = connection.getBroker().getContextValue(Long.class,
+ Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
@@ -1317,7 +1321,7 @@ public class AMQChannel
}
@Override
- public int compareTo(AMQChannel o)
+ public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}
@@ -1554,6 +1558,7 @@ public class AMQChannel
getVirtualHost().getEventLogger().message(_logSubject,
ChannelMessages.FLOW_ENFORCED("** All Queues **"));
flow(false);
+ _blockTime = System.currentTimeMillis();
}
}
}
@@ -1580,6 +1585,8 @@ public class AMQChannel
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
flow(false);
+ _blockTime = System.currentTimeMillis();
+
}
}
}
@@ -2146,44 +2153,61 @@ public class AMQChannel
" immediate: " + immediate + " ]");
}
- VirtualHostImpl vHost = _connection.getVirtualHost();
- MessageDestination destination;
- if (isDefaultExchange(exchangeName))
- {
- destination = vHost.getDefaultDestination();
- }
- else
- {
- destination = vHost.getMessageDestination(exchangeName.toString());
- }
+ VirtualHostImpl vHost = _connection.getVirtualHost();
- // if the exchange does not exist we raise a channel exception
- if (destination == null)
+ if(blockingTimeoutExceeded())
{
- closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
+ getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
+ closeChannel(AMQConstant.MESSAGE_TOO_LARGE,
+ "Channel flow control was requested, but not enforced by sender");
}
else
{
+ MessageDestination destination;
- MessagePublishInfo info = new MessagePublishInfo(exchangeName,
- immediate,
- mandatory,
- routingKey);
+ if (isDefaultExchange(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
- try
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
{
- setPublishFrame(info, destination);
+ closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
}
- catch (AccessControlException e)
+ else
{
- _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ MessagePublishInfo info = new MessagePublishInfo(exchangeName,
+ immediate,
+ mandatory,
+ routingKey);
+
+ try
+ {
+ setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
}
}
}
+ private boolean blockingTimeoutExceeded()
+ {
+
+ return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
+ }
+
@Override
public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 379dcb01f2..8d71f980e5 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -722,7 +722,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
@Override
- public int compareTo(Session_1_0 o)
+ public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}