diff options
Diffstat (limited to 'qpid/java')
12 files changed, 211 insertions, 101 deletions
diff --git a/qpid/java/broker-core/build-generate-sources.xml b/qpid/java/broker-core/build-generate-sources.xml index 56637c2bce..43ea9e2f23 100644 --- a/qpid/java/broker-core/build-generate-sources.xml +++ b/qpid/java/broker-core/build-generate-sources.xml @@ -79,7 +79,7 @@ <echo message="logmessages is ${logmessages}"/> - <java classname="org.apache.qpid.server.logging.GenerateLogMessages" fork="true" dir="${gentools.classes}" failonerror="true"> + <java classname="org.apache.qpid.server.logging.GenerateLogMessages" fork="true" failonerror="true"> <arg line="'${logmessages}'"/> <arg value="-j"/> <arg value="-o"/> diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java index 0cd0828623..6ae1ac4f02 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java @@ -22,14 +22,15 @@ package org.apache.qpid.server.logging.messages; import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX; -import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.logging.LogMessage; - import java.text.MessageFormat; import java.util.Locale; import java.util.ResourceBundle; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.logging.LogMessage; + /** * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED. * @@ -53,6 +54,7 @@ public class ChannelMessages public static final String DEADLETTERMSG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.deadlettermsg"; public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch"; public static final String IDLE_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn"; + public static final String FLOW_CONTROL_IGNORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_control_ignored"; public static final String DISCARDMSG_NOROUTE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noroute"; public static final String OPEN_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.open_txn"; public static final String FLOW_REMOVED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_removed"; @@ -69,6 +71,7 @@ public class ChannelMessages Logger.getLogger(DEADLETTERMSG_LOG_HIERARCHY); Logger.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY); Logger.getLogger(IDLE_TXN_LOG_HIERARCHY); + Logger.getLogger(FLOW_CONTROL_IGNORED_LOG_HIERARCHY); Logger.getLogger(DISCARDMSG_NOROUTE_LOG_HIERARCHY); Logger.getLogger(OPEN_TXN_LOG_HIERARCHY); Logger.getLogger(FLOW_REMOVED_LOG_HIERARCHY); @@ -356,6 +359,33 @@ public class ChannelMessages /** * Log a Channel message of the Format: + * <pre>CHN-1012 : Flow Control Ignored. Channel will be closed.</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage FLOW_CONTROL_IGNORED() + { + String rawMessage = _messages.getString("FLOW_CONTROL_IGNORED"); + + final String message = rawMessage; + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return FLOW_CONTROL_IGNORED_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Channel message of the Format: * <pre>CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index 397c12d73c..5c6e066541 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -38,3 +38,5 @@ IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2} DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1} DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1} + +FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed. diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 4c5293ff65..4e4acb3e21 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -51,6 +51,8 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String MODEL_VERSION = "modelVersion"; String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider"; + String CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = "channel.flowControlEnforcementTimeout"; + String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit"; String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay"; String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute"; @@ -63,19 +65,22 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String QPID_JMX_PORT = "qpid.jmx_port"; @ManagedContextDefault(name = "broker.name") - static final String DEFAULT_BROKER_NAME = "Broker"; + String DEFAULT_BROKER_NAME = "Broker"; @ManagedContextDefault(name = QPID_AMQP_PORT) - public static final String DEFAULT_AMQP_PORT_NUMBER = "5672"; + String DEFAULT_AMQP_PORT_NUMBER = "5672"; @ManagedContextDefault(name = QPID_HTTP_PORT) - public static final String DEFAULT_HTTP_PORT_NUMBER = "8080"; + String DEFAULT_HTTP_PORT_NUMBER = "8080"; @ManagedContextDefault(name = QPID_RMI_PORT) - public static final String DEFAULT_RMI_PORT_NUMBER = "8999"; + String DEFAULT_RMI_PORT_NUMBER = "8999"; @ManagedContextDefault(name = QPID_JMX_PORT) - public static final String DEFAULT_JMX_PORT_NUMBER = "9099"; + String DEFAULT_JMX_PORT_NUMBER = "9099"; @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD) - public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory()); + long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory()); + + @ManagedContextDefault(name = CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT) + long DEFAULT_CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = 5000l; String BROKER_FRAME_SIZE = "qpid.broker_frame_size"; @ManagedContextDefault(name = BROKER_FRAME_SIZE) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index a9cd32f8f9..f13af479ad 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -36,7 +36,7 @@ import org.apache.qpid.server.util.Deletable; * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet} * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}. */ -public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<T>, Deletable<T> +public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<AMQSessionModel>, Deletable<T> { public UUID getId(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 0bee92a2e9..2f44218cf1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -97,6 +97,7 @@ public class BrokerTestHelper when(broker.getEventLogger()).thenReturn(eventLogger); when(broker.getCategoryClass()).thenReturn(Broker.class); when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig); + when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR); when(systemConfig.getTaskExecutor()).thenReturn(TASK_EXECUTOR); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 60bb5c6112..9bccfa53a3 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -65,7 +65,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S LogSubject, AuthorizationHolder { - private final Broker _broker; + private final Broker<?> _broker; private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); @@ -106,6 +106,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _reference; } + public Broker<?> getBroker() + { + return _broker; + } + @Override protected void invoke(Method method) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index b1c22fe823..dc5635654e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -62,6 +62,7 @@ import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -131,6 +132,8 @@ public class ServerSession extends Session private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private org.apache.qpid.server.model.Session<?> _modelObject; + private long _blockTime; + private long _blockingTimeout; public static interface MessageDispositionChangeListener @@ -182,6 +185,9 @@ public class ServerSession extends Session getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } }, getVirtualHost()); + + _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class, + Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT); } protected void setState(final State state) @@ -774,6 +780,7 @@ public class ServerSession extends Session { invokeBlock(); } + _blockTime = System.currentTimeMillis(); getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -798,7 +805,7 @@ public class ServerSession extends Session { if(_blocking.compareAndSet(true,false) && !isClosing()) { - + _blockTime = 0l; getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); MessageFlow mf = new MessageFlow(); mf.setUnit(MessageCreditUnit.MESSAGE); @@ -812,6 +819,17 @@ public class ServerSession extends Session } } + boolean blockingTimeoutExceeded() + { + long blockTime = _blockTime; + boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; + if(b) + { + System.err.println(_blockingTimeout); + } + return b; + } + @Override public Object getConnectionReference() { @@ -1065,7 +1083,7 @@ public class ServerSession extends Session } @Override - public int compareTo(ServerSession o) + public int compareTo(AMQSessionModel o) { return getId().compareTo(o.getId()); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 14082091f9..77dba71fac 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -34,11 +34,13 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; @@ -331,84 +333,103 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - final MessageDestination exchange = getDestinationForMessage(ssn, xfr); - - final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); - if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + if(((ServerSession)ssn).blockingTimeoutExceeded()) { - delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); - } - - final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); - final VirtualHostImpl virtualHost = getVirtualHost(ssn); - try - { - virtualHost.getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName(), virtualHost.getName()); + ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE, + "Session flow control was requested, but not enforced by sender"); } - catch (AccessControlException e) + else { - ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; - exception(ssn, xfr, errorCode, e.getMessage()); + final MessageDestination exchange = getDestinationForMessage(ssn, xfr); - return; - } + final DeliveryProperties delvProps = + xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); + if (delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + { + delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); + } - final MessageStore store = virtualHost.getMessageStore(); - final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - final ServerSession serverSession = (ServerSession) ssn; - final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); - MessageReference<MessageTransferMessage> reference = message.newReference(); + final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); - final InstanceProperties instanceProperties = new InstanceProperties() - { - @Override - public Object getProperty(final Property prop) + final VirtualHostImpl virtualHost = getVirtualHost(ssn); + try + { + virtualHost.getSecurityManager() + .authorisePublish(messageMetaData.isImmediate(), + messageMetaData.getRoutingKey(), + exchange.getName(), + virtualHost.getName()); + } + catch (AccessControlException e) { - switch(prop) + ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; + exception(ssn, xfr, errorCode, e.getMessage()); + + return; + } + + final MessageStore store = virtualHost.getMessageStore(); + final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); + final ServerSession serverSession = (ServerSession) ssn; + final MessageTransferMessage message = + new MessageTransferMessage(storeMessage, serverSession.getReference()); + MessageReference<MessageTransferMessage> reference = message.newReference(); + + final InstanceProperties instanceProperties = new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) { - case EXPIRATION: - return message.getExpiration(); - case IMMEDIATE: - return message.isImmediate(); - case MANDATORY: - return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; - case PERSISTENT: - return message.isPersistent(); - case REDELIVERED: - return delvProps.getRedelivered(); + switch (prop) + { + case EXPIRATION: + return message.getExpiration(); + case IMMEDIATE: + return message.isImmediate(); + case MANDATORY: + return (delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; + case PERSISTENT: + return message.isPersistent(); + case REDELIVERED: + return delvProps.getRedelivered(); + } + return null; } - return null; - } - }; + }; - int enqueues = serverSession.enqueue(message, instanceProperties, exchange); + int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(enqueues == 0) - { - if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + if (enqueues == 0) { - RangeSet rejects = RangeSetFactory.createRangeSet(); - rejects.add(xfr.getId()); - MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); - ssn.invoke(reject); + if ((delvProps == null || !delvProps.getDiscardUnroutable()) + && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + RangeSet rejects = RangeSetFactory.createRangeSet(); + rejects.add(xfr.getId()); + MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); + ssn.invoke(reject); + } + else + { + virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(), + messageMetaData.getRoutingKey())); + } + } + + if (serverSession.isTransactional()) + { + serverSession.processed(xfr); } else { - virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(), - messageMetaData.getRoutingKey())); + serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, + new CommandProcessedAction(serverSession, xfr)); } + reference.release(); } - - if(serverSession.isTransactional()) - { - serverSession.processed(xfr); - } - else - { - serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); - } - reference.release(); } private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 01dd523e3e..58de3edb61 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -18,14 +18,16 @@ */ package org.apache.qpid.server.protocol.v0_10; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.Binary; -import static org.mockito.Mockito.mock; - public class ServerSessionTest extends QpidTestCase { @@ -59,6 +61,8 @@ public class ServerSessionTest extends QpidTestCase public void testCompareTo() throws Exception { final Broker broker = mock(Broker.class); + when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); + ServerConnection connection = new ServerConnection(1, broker); connection.setVirtualHost(_virtualHost); ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(), diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d3ddaa16dd..e511878ff1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -79,6 +79,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -201,6 +202,8 @@ public class AMQChannel private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; + private long _blockTime; + private long _blockingTimeout; private boolean _confirmOnPublish; private long _confirmedMessageCounter; @@ -217,7 +220,8 @@ public class AMQChannel _logSubject = new ChannelLogSubject(this); _messageStore = messageStore; - + _blockingTimeout = connection.getBroker().getContextValue(Long.class, + Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT); // by default the session is non-transactional _transaction = new AsyncAutoCommitTransaction(_messageStore, this); @@ -1317,7 +1321,7 @@ public class AMQChannel } @Override - public int compareTo(AMQChannel o) + public int compareTo(AMQSessionModel o) { return getId().compareTo(o.getId()); } @@ -1554,6 +1558,7 @@ public class AMQChannel getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); flow(false); + _blockTime = System.currentTimeMillis(); } } } @@ -1580,6 +1585,8 @@ public class AMQChannel { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); flow(false); + _blockTime = System.currentTimeMillis(); + } } } @@ -2146,44 +2153,61 @@ public class AMQChannel " immediate: " + immediate + " ]"); } - VirtualHostImpl vHost = _connection.getVirtualHost(); - MessageDestination destination; - if (isDefaultExchange(exchangeName)) - { - destination = vHost.getDefaultDestination(); - } - else - { - destination = vHost.getMessageDestination(exchangeName.toString()); - } + VirtualHostImpl vHost = _connection.getVirtualHost(); - // if the exchange does not exist we raise a channel exception - if (destination == null) + if(blockingTimeoutExceeded()) { - closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName); + getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); + closeChannel(AMQConstant.MESSAGE_TOO_LARGE, + "Channel flow control was requested, but not enforced by sender"); } else { + MessageDestination destination; - MessagePublishInfo info = new MessagePublishInfo(exchangeName, - immediate, - mandatory, - routingKey); + if (isDefaultExchange(exchangeName)) + { + destination = vHost.getDefaultDestination(); + } + else + { + destination = vHost.getMessageDestination(exchangeName.toString()); + } - try + // if the exchange does not exist we raise a channel exception + if (destination == null) { - setPublishFrame(info, destination); + closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName); } - catch (AccessControlException e) + else { - _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + MessagePublishInfo info = new MessagePublishInfo(exchangeName, + immediate, + mandatory, + routingKey); + + try + { + setPublishFrame(info, destination); + } + catch (AccessControlException e) + { + _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); + + } } } } + private boolean blockingTimeoutExceeded() + { + + return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; + } + @Override public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 379dcb01f2..8d71f980e5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -722,7 +722,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override - public int compareTo(Session_1_0 o) + public int compareTo(AMQSessionModel o) { return getId().compareTo(o.getId()); } |
