diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 15:53:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 15:53:42 +0000 |
| commit | 9bd52fa485d73b3eb5c68d698e63243052a1db9c (patch) | |
| tree | fea4a994556644221dbcf41e74f0c43a79cfd752 /qpid/java/broker-plugins | |
| parent | 95fc93485ab66966713611a4e1429d917dabde64 (diff) | |
| download | qpid-python-9bd52fa485d73b3eb5c68d698e63243052a1db9c.tar.gz | |
QPID-6163 : [Java Broker] Disconnect clients which do not obey flow control
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632618 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
6 files changed, 161 insertions, 89 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 60bb5c6112..9bccfa53a3 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -65,7 +65,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S LogSubject, AuthorizationHolder { - private final Broker _broker; + private final Broker<?> _broker; private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); @@ -106,6 +106,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _reference; } + public Broker<?> getBroker() + { + return _broker; + } + @Override protected void invoke(Method method) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index b1c22fe823..dc5635654e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -62,6 +62,7 @@ import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -131,6 +132,8 @@ public class ServerSession extends Session private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private org.apache.qpid.server.model.Session<?> _modelObject; + private long _blockTime; + private long _blockingTimeout; public static interface MessageDispositionChangeListener @@ -182,6 +185,9 @@ public class ServerSession extends Session getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } }, getVirtualHost()); + + _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class, + Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT); } protected void setState(final State state) @@ -774,6 +780,7 @@ public class ServerSession extends Session { invokeBlock(); } + _blockTime = System.currentTimeMillis(); getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -798,7 +805,7 @@ public class ServerSession extends Session { if(_blocking.compareAndSet(true,false) && !isClosing()) { - + _blockTime = 0l; getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); MessageFlow mf = new MessageFlow(); mf.setUnit(MessageCreditUnit.MESSAGE); @@ -812,6 +819,17 @@ public class ServerSession extends Session } } + boolean blockingTimeoutExceeded() + { + long blockTime = _blockTime; + boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; + if(b) + { + System.err.println(_blockingTimeout); + } + return b; + } + @Override public Object getConnectionReference() { @@ -1065,7 +1083,7 @@ public class ServerSession extends Session } @Override - public int compareTo(ServerSession o) + public int compareTo(AMQSessionModel o) { return getId().compareTo(o.getId()); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 14082091f9..77dba71fac 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -34,11 +34,13 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; @@ -331,84 +333,103 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - final MessageDestination exchange = getDestinationForMessage(ssn, xfr); - - final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); - if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + if(((ServerSession)ssn).blockingTimeoutExceeded()) { - delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); - } - - final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); - final VirtualHostImpl virtualHost = getVirtualHost(ssn); - try - { - virtualHost.getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName(), virtualHost.getName()); + ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE, + "Session flow control was requested, but not enforced by sender"); } - catch (AccessControlException e) + else { - ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; - exception(ssn, xfr, errorCode, e.getMessage()); + final MessageDestination exchange = getDestinationForMessage(ssn, xfr); - return; - } + final DeliveryProperties delvProps = + xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); + if (delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + { + delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); + } - final MessageStore store = virtualHost.getMessageStore(); - final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - final ServerSession serverSession = (ServerSession) ssn; - final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); - MessageReference<MessageTransferMessage> reference = message.newReference(); + final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); - final InstanceProperties instanceProperties = new InstanceProperties() - { - @Override - public Object getProperty(final Property prop) + final VirtualHostImpl virtualHost = getVirtualHost(ssn); + try + { + virtualHost.getSecurityManager() + .authorisePublish(messageMetaData.isImmediate(), + messageMetaData.getRoutingKey(), + exchange.getName(), + virtualHost.getName()); + } + catch (AccessControlException e) { - switch(prop) + ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; + exception(ssn, xfr, errorCode, e.getMessage()); + + return; + } + + final MessageStore store = virtualHost.getMessageStore(); + final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); + final ServerSession serverSession = (ServerSession) ssn; + final MessageTransferMessage message = + new MessageTransferMessage(storeMessage, serverSession.getReference()); + MessageReference<MessageTransferMessage> reference = message.newReference(); + + final InstanceProperties instanceProperties = new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) { - case EXPIRATION: - return message.getExpiration(); - case IMMEDIATE: - return message.isImmediate(); - case MANDATORY: - return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; - case PERSISTENT: - return message.isPersistent(); - case REDELIVERED: - return delvProps.getRedelivered(); + switch (prop) + { + case EXPIRATION: + return message.getExpiration(); + case IMMEDIATE: + return message.isImmediate(); + case MANDATORY: + return (delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; + case PERSISTENT: + return message.isPersistent(); + case REDELIVERED: + return delvProps.getRedelivered(); + } + return null; } - return null; - } - }; + }; - int enqueues = serverSession.enqueue(message, instanceProperties, exchange); + int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(enqueues == 0) - { - if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + if (enqueues == 0) { - RangeSet rejects = RangeSetFactory.createRangeSet(); - rejects.add(xfr.getId()); - MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); - ssn.invoke(reject); + if ((delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + RangeSet rejects = RangeSetFactory.createRangeSet(); + rejects.add(xfr.getId()); + MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); + ssn.invoke(reject); + } + else + { + virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(), + messageMetaData.getRoutingKey())); + } + } + + if (serverSession.isTransactional()) + { + serverSession.processed(xfr); } else { - virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(), - messageMetaData.getRoutingKey())); + serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + new CommandProcessedAction(serverSession, xfr)); } + reference.release(); } - - if(serverSession.isTransactional()) - { - serverSession.processed(xfr); - } - else - { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); - } - reference.release(); } private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 01dd523e3e..58de3edb61 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -18,14 +18,16 @@ */ package org.apache.qpid.server.protocol.v0_10; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.Binary; -import static org.mockito.Mockito.mock; - public class ServerSessionTest extends QpidTestCase { @@ -59,6 +61,8 @@ public class ServerSessionTest extends QpidTestCase public void testCompareTo() throws Exception { final Broker broker = mock(Broker.class); + when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); + ServerConnection connection = new ServerConnection(1, broker); connection.setVirtualHost(_virtualHost); ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(), diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d3ddaa16dd..e511878ff1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -79,6 +79,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -201,6 +202,8 @@ public class AMQChannel private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; + private long _blockTime; + private long _blockingTimeout; private boolean _confirmOnPublish; private long _confirmedMessageCounter; @@ -217,7 +220,8 @@ public class AMQChannel _logSubject = new ChannelLogSubject(this); _messageStore = messageStore; - + _blockingTimeout = connection.getBroker().getContextValue(Long.class, + Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT); // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); @@ -1317,7 +1321,7 @@ public class AMQChannel } @Override - public int compareTo(AMQChannel o) + public int compareTo(AMQSessionModel o) { return getId().compareTo(o.getId()); } @@ -1554,6 +1558,7 @@ public class AMQChannel getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); flow(false); + _blockTime = System.currentTimeMillis(); } } } @@ -1580,6 +1585,8 @@ public class AMQChannel { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); flow(false); + _blockTime = System.currentTimeMillis(); + } } } @@ -2146,44 +2153,61 @@ public class AMQChannel " immediate: " + immediate + " ]"); } - VirtualHostImpl vHost = _connection.getVirtualHost(); - MessageDestination destination; - if (isDefaultExchange(exchangeName)) - { - destination = vHost.getDefaultDestination(); - } - else - { - destination = vHost.getMessageDestination(exchangeName.toString()); - } + VirtualHostImpl vHost = _connection.getVirtualHost(); - // if the exchange does not exist we raise a channel exception - if (destination == null) + if(blockingTimeoutExceeded()) { - closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName); + getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); + closeChannel(AMQConstant.MESSAGE_TOO_LARGE, + "Channel flow control was requested, but not enforced by sender"); } else { + MessageDestination destination; - MessagePublishInfo info = new MessagePublishInfo(exchangeName, - immediate, - mandatory, - routingKey); + if (isDefaultExchange(exchangeName)) + { + destination = vHost.getDefaultDestination(); + } + else + { + destination = vHost.getMessageDestination(exchangeName.toString()); + } - try + // if the exchange does not exist we raise a channel exception + if (destination == null) { - setPublishFrame(info, destination); + closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName); } - catch (AccessControlException e) + else { - _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + MessagePublishInfo info = new MessagePublishInfo(exchangeName, + immediate, + mandatory, + routingKey); + + try + { + setPublishFrame(info, destination); + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + + } } } } + private boolean blockingTimeoutExceeded() + { + + return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; + } + @Override public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 379dcb01f2..8d71f980e5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -722,7 +722,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override - public int compareTo(Session_1_0 o) + public int compareTo(AMQSessionModel o) { return getId().compareTo(o.getId()); } |
