diff options
| author | Alex Rudyy <orudyy@apache.org> | 2013-06-21 17:06:57 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2013-06-21 17:06:57 +0000 |
| commit | e409124b9f3a7423fe4ab04e7ce3e446244d04e3 (patch) | |
| tree | 95eb9be13518f19536314f7c0993fe40d84c70c9 /qpid/java/broker | |
| parent | 8bdb080ef1f4afb1727dc3fc5f2666bdfd982107 (diff) | |
| download | qpid-python-e409124b9f3a7423fe4ab04e7ce3e446244d04e3.tar.gz | |
QPID-4943: Introduce a feature for 0-8/0-9/0-9-1 protocols to close a connection on receiving a mandatory unroutable message in a transacted session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1495511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
6 files changed, 209 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8d88ee902a..8588aea2d4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -324,14 +325,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { if(destinationQueues == null || destinationQueues.isEmpty()) { - if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) - { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); - } - else - { - _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); - } + handleUnroutableMessage(); } else { @@ -378,6 +372,61 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } + /** + * Either throws a {@link AMQConnectionException} or returns the message + * + * Pre-requisite: the current message is judged to have no destination queues. + * + * @throws AMQConnectionException if the message is mandatoryclose-on-no-route + * @see AMQProtocolSession#isCloseWhenNoRoute() + */ + private void handleUnroutableMessage() throws AMQConnectionException + { + boolean mandatory = _currentMessage.isMandatory(); + String description = currentMessageDescription(); + boolean closeOnNoRoute = _session.isCloseWhenNoRoute(); + + if(_logger.isDebugEnabled()) + { + _logger.debug(String.format( + "Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s", + description, mandatory, isTransactional(), closeOnNoRoute)); + } + + if (mandatory && isTransactional() && _session.isCloseWhenNoRoute()) + { + throw new AMQConnectionException( + AMQConstant.NO_ROUTE, + "No route for message " + currentMessageDescription(), + 0, 0, // default class and method ids + getProtocolSession().getProtocolVersion().getMajorVersion(), + getProtocolSession().getProtocolVersion().getMinorVersion(), + (Throwable) null); + } + + if (mandatory || _currentMessage.isImmediate()) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage)); + } + else + { + _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); + } + } + + private String currentMessageDescription() + { + if(_currentMessage == null || !_currentMessage.allContentReceived()) + { + throw new IllegalStateException("Cannot create message description for message: " + _currentMessage); + } + + return String.format( + "[Exchange: %s, Routing key: %s]", + _currentMessage.getExchange(), + _currentMessage.getRoutingKey()); + } + public void publishContentBody(ContentBody contentBody) throws AMQException { if (_currentMessage == null) @@ -522,6 +571,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * * @throws AMQException if there is an error during closure */ + @Override public void close() throws AMQException { if(!_closing.compareAndSet(false, true)) @@ -1344,7 +1394,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _message, _channelId, _errorCode.getCode(), - new AMQShortString(_description)); + AMQShortString.valueOf(_description, true, true)); } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java index 24fd687240..8f565362b4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java @@ -73,6 +73,7 @@ public interface Broker extends ConfiguredObject String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit"; String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay"; + String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute"; String VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD = "virtualhost.housekeepingCheckPeriod"; String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "virtualhost.storeTransactionIdleTimeoutClose"; @@ -113,6 +114,7 @@ public interface Broker extends ConfiguredObject VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, CONNECTION_SESSION_COUNT_LIMIT, CONNECTION_HEART_BEAT_DELAY, + CONNECTION_CLOSE_WHEN_NO_ROUTE, STATISTICS_REPORTING_PERIOD, STATISTICS_REPORTING_RESET_ENABLED, STORE_TYPE, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 678db43d58..ff9cac9a21 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -92,6 +92,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); put(CONNECTION_SESSION_COUNT_LIMIT, Integer.class); put(CONNECTION_HEART_BEAT_DELAY, Integer.class); + put(CONNECTION_CLOSE_WHEN_NO_ROUTE, Boolean.class); put(STATISTICS_REPORTING_PERIOD, Integer.class); put(NAME, String.class); @@ -124,6 +125,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l; public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l; public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l; + public static final boolean DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE = true; @SuppressWarnings("serial") private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{ @@ -141,6 +143,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD); put(Broker.CONNECTION_HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY); put(Broker.CONNECTION_SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT); + put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE); put(Broker.NAME, DEFAULT_NAME); put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE); put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index bbf90fad86..92d6683415 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -27,8 +27,10 @@ import java.nio.ByteBuffer; import java.security.Principal; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -39,6 +41,7 @@ import java.util.concurrent.locks.ReentrantLock; import javax.security.auth.Subject; import javax.security.sasl.SaslServer; + import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; @@ -47,7 +50,24 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -102,6 +122,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + /** + * The channels that the latest call to {@link #received(ByteBuffer)} applied to. + * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()} + * on after handling the frames. + * + * Thread-safety: guarded by {@link #_receivedLock}. + */ + private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>(); + private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); private final AMQStateManager _stateManager; @@ -160,6 +189,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Broker _broker; private final Transport _transport; + private volatile boolean _closeWhenNoRoute; public AMQProtocolEngine(Broker broker, NetworkConnection network, @@ -184,6 +214,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false)); + _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE); + initialiseStatistics(); } @@ -253,17 +285,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { dataBlockReceived(dataBlock); } + catch(AMQConnectionException e) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e); + } + break; + } catch (Exception e) { _logger.error("Unexpected exception when processing datablock", e); closeProtocolSession(); + break; } } - receiveComplete(); + receivedComplete(); } catch (Exception e) { - _logger.error("Unexpected exception when processing datablock", e); + _logger.error("Unexpected exception when processing datablocks", e); closeProtocolSession(); } finally @@ -272,16 +313,45 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - private void receiveComplete() + private void receivedComplete() throws AMQException { - for (AMQChannel channel : _channelMap.values()) + Exception exception = null; + for (AMQChannel channel : _channelsForCurrentMessage) { - channel.receivedComplete(); + try + { + channel.receivedComplete(); + } + catch(Exception exceptionForThisChannel) + { + if(exception == null) + { + exception = exceptionForThisChannel; + } + _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel); + } } + _channelsForCurrentMessage.clear(); + + if(exception != null) + { + throw new AMQException( + AMQConstant.INTERNAL_ERROR, + "Error informing channel that receiving is complete: " + exception.getMessage(), + exception); + } } - public void dataBlockReceived(AMQDataBlock message) throws Exception + /** + * Process the data block. + * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}. + * + * @throws an AMQConnectionException if unable to process the data block. In this case, + * the connection is already closed by the time the exception is thrown. If any other + * type of exception is thrown, the connection is not already closed. + */ + private void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; if (message instanceof ProtocolInitiation) @@ -301,18 +371,40 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } + /** + * Handle the supplied frame. + * Adds this frame's channel to {@link #_channelsForCurrentMessage}. + * + * @throws an AMQConnectionException if unable to process the data block. In this case, + * the connection is already closed by the time the exception is thrown. If any other + * type of exception is thrown, the connection is not already closed. + */ private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); + AMQChannel amqChannel = _channelMap.get(channelId); + if(amqChannel != null) + { + // The _receivedLock is already aquired in the caller + // It is safe to add channel + _channelsForCurrentMessage.add(amqChannel); + } + else + { + // Not an error. The frame is probably a channel Open for this channel id, which + // does not require asynchronous work therefore its absence from + // _channelsForCurrentMessage is ok. + } + AMQBody body = frame.getBodyFrame(); //Look up the Channel's Actor and set that as the current actor // If that is not available then we can use the ConnectionActor // that is associated with this AMQMPSession. LogActor channelActor = null; - if (_channelMap.get(channelId) != null) + if (amqChannel != null) { - channelActor = _channelMap.get(channelId).getLogActor(); + channelActor = amqChannel.getLogActor(); } CurrentActor.set(channelActor == null ? _actor : channelActor); @@ -349,6 +441,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { body.handle(channelId, this); } + catch(AMQConnectionException e) + { + _logger.info(e.getMessage() + " whilst processing frame: " + body); + closeConnection(channelId, e); + throw e; + } catch (AMQException e) { closeChannel(channelId); @@ -400,6 +498,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi QpidProperties.getBuildVersion()); serverProperties.setString(ServerPropertyNames.QPID_INSTANCE_NAME, _broker.getName()); + serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, + String.valueOf(_closeWhenNoRoute)); AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) pv.getActualMinorVersion(), @@ -720,6 +820,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid */ + @Override public void closeChannel(int channelId) throws AMQException { final AMQChannel channel = getChannel(channelId); @@ -819,12 +920,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } /** This must be called when the session is _closed in order to free up any resources managed by the session. */ + @Override public void closeSession() throws AMQException { if(_closing.compareAndSet(false,true)) { // force sync of outstanding async work - receiveComplete(); + _receivedLock.lock(); + try + { + receivedComplete(); + } + finally + { + _receivedLock.unlock(); + } // REMOVE THIS SHOULD NOT BE HERE. if (CurrentActor.get() == null) @@ -900,6 +1010,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } + @Override public void closeProtocolSession() { _network.close(); @@ -968,6 +1079,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _clientProperties = clientProperties; if (_clientProperties != null) { + Boolean closeWhenNoRoute = _clientProperties.getBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE); + if (closeWhenNoRoute != null) + { + _closeWhenNoRoute = closeWhenNoRoute; + if(_logger.isDebugEnabled()) + { + _logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this); + } + } + _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8); if (_clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8) != null) @@ -1538,4 +1659,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { return _lastWriteTime.get(); } + + @Override + public boolean isCloseWhenNoRoute() + { + return _closeWhenNoRoute; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index ba806c04bd..1842117d6f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -218,4 +218,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth public Principal getPeerPrincipal(); Lock getReceivedLock(); + + /** + * Used for 0-8/0-9/0-9-1 connections to choose to close + * the connection when a transactional session receives a 'mandatory' message which + * can't be routed rather than returning the message. + */ + boolean isCloseWhenNoRoute(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 8de19d9cff..1c8939d117 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -73,6 +73,7 @@ public class BrokerTestHelper when(subjectCreator.getMechanisms()).thenReturn(""); Broker broker = mock(Broker.class); when(broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT)).thenReturn(1); + when(broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(false); when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l); when(broker.getId()).thenReturn(UUID.randomUUID()); when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator); |
