summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
committerAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
commite409124b9f3a7423fe4ab04e7ce3e446244d04e3 (patch)
tree95eb9be13518f19536314f7c0993fe40d84c70c9 /qpid/java/broker
parent8bdb080ef1f4afb1727dc3fc5f2666bdfd982107 (diff)
downloadqpid-python-e409124b9f3a7423fe4ab04e7ce3e446244d04e3.tar.gz
QPID-4943: Introduce a feature for 0-8/0-9/0-9-1 protocols to close a connection on receiving a mandatory unroutable message in a transacted session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1495511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java147
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java1
6 files changed, 209 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8d88ee902a..8588aea2d4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -324,14 +325,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
if(destinationQueues == null || destinationQueues.isEmpty())
{
- if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
- {
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage));
- }
- else
- {
- _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
- }
+ handleUnroutableMessage();
}
else
{
@@ -378,6 +372,61 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
+ /**
+ * Either throws a {@link AMQConnectionException} or returns the message
+ *
+ * Pre-requisite: the current message is judged to have no destination queues.
+ *
+ * @throws AMQConnectionException if the message is mandatoryclose-on-no-route
+ * @see AMQProtocolSession#isCloseWhenNoRoute()
+ */
+ private void handleUnroutableMessage() throws AMQConnectionException
+ {
+ boolean mandatory = _currentMessage.isMandatory();
+ String description = currentMessageDescription();
+ boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(String.format(
+ "Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s",
+ description, mandatory, isTransactional(), closeOnNoRoute));
+ }
+
+ if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
+ {
+ throw new AMQConnectionException(
+ AMQConstant.NO_ROUTE,
+ "No route for message " + currentMessageDescription(),
+ 0, 0, // default class and method ids
+ getProtocolSession().getProtocolVersion().getMajorVersion(),
+ getProtocolSession().getProtocolVersion().getMinorVersion(),
+ (Throwable) null);
+ }
+
+ if (mandatory || _currentMessage.isImmediate())
+ {
+ _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage));
+ }
+ else
+ {
+ _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
+ }
+ }
+
+ private String currentMessageDescription()
+ {
+ if(_currentMessage == null || !_currentMessage.allContentReceived())
+ {
+ throw new IllegalStateException("Cannot create message description for message: " + _currentMessage);
+ }
+
+ return String.format(
+ "[Exchange: %s, Routing key: %s]",
+ _currentMessage.getExchange(),
+ _currentMessage.getRoutingKey());
+ }
+
public void publishContentBody(ContentBody contentBody) throws AMQException
{
if (_currentMessage == null)
@@ -522,6 +571,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*
* @throws AMQException if there is an error during closure
*/
+ @Override
public void close() throws AMQException
{
if(!_closing.compareAndSet(false, true))
@@ -1344,7 +1394,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_message,
_channelId,
_errorCode.getCode(),
- new AMQShortString(_description));
+ AMQShortString.valueOf(_description, true, true));
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
index 24fd687240..8f565362b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -73,6 +73,7 @@ public interface Broker extends ConfiguredObject
String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit";
String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
+ String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
String VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD = "virtualhost.housekeepingCheckPeriod";
String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "virtualhost.storeTransactionIdleTimeoutClose";
@@ -113,6 +114,7 @@ public interface Broker extends ConfiguredObject
VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD,
CONNECTION_SESSION_COUNT_LIMIT,
CONNECTION_HEART_BEAT_DELAY,
+ CONNECTION_CLOSE_WHEN_NO_ROUTE,
STATISTICS_REPORTING_PERIOD,
STATISTICS_REPORTING_RESET_ENABLED,
STORE_TYPE,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 678db43d58..ff9cac9a21 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -92,6 +92,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
put(CONNECTION_SESSION_COUNT_LIMIT, Integer.class);
put(CONNECTION_HEART_BEAT_DELAY, Integer.class);
+ put(CONNECTION_CLOSE_WHEN_NO_ROUTE, Boolean.class);
put(STATISTICS_REPORTING_PERIOD, Integer.class);
put(NAME, String.class);
@@ -124,6 +125,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l;
public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l;
public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l;
+ public static final boolean DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE = true;
@SuppressWarnings("serial")
private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{
@@ -141,6 +143,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD);
put(Broker.CONNECTION_HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY);
put(Broker.CONNECTION_SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT);
+ put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE);
put(Broker.NAME, DEFAULT_NAME);
put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE);
put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index bbf90fad86..92d6683415 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -27,8 +27,10 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -39,6 +41,7 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
@@ -47,7 +50,24 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -102,6 +122,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ /**
+ * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
+ * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
+ * on after handling the frames.
+ *
+ * Thread-safety: guarded by {@link #_receivedLock}.
+ */
+ private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>();
+
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
@@ -160,6 +189,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Broker _broker;
private final Transport _transport;
+ private volatile boolean _closeWhenNoRoute;
public AMQProtocolEngine(Broker broker,
NetworkConnection network,
@@ -184,6 +214,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false));
+ _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE);
+
initialiseStatistics();
}
@@ -253,17 +285,26 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
dataBlockReceived(dataBlock);
}
+ catch(AMQConnectionException e)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
+ }
+ break;
+ }
catch (Exception e)
{
_logger.error("Unexpected exception when processing datablock", e);
closeProtocolSession();
+ break;
}
}
- receiveComplete();
+ receivedComplete();
}
catch (Exception e)
{
- _logger.error("Unexpected exception when processing datablock", e);
+ _logger.error("Unexpected exception when processing datablocks", e);
closeProtocolSession();
}
finally
@@ -272,16 +313,45 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- private void receiveComplete()
+ private void receivedComplete() throws AMQException
{
- for (AMQChannel channel : _channelMap.values())
+ Exception exception = null;
+ for (AMQChannel channel : _channelsForCurrentMessage)
{
- channel.receivedComplete();
+ try
+ {
+ channel.receivedComplete();
+ }
+ catch(Exception exceptionForThisChannel)
+ {
+ if(exception == null)
+ {
+ exception = exceptionForThisChannel;
+ }
+ _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel);
+ }
}
+ _channelsForCurrentMessage.clear();
+
+ if(exception != null)
+ {
+ throw new AMQException(
+ AMQConstant.INTERNAL_ERROR,
+ "Error informing channel that receiving is complete: " + exception.getMessage(),
+ exception);
+ }
}
- public void dataBlockReceived(AMQDataBlock message) throws Exception
+ /**
+ * Process the data block.
+ * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
+ *
+ * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * the connection is already closed by the time the exception is thrown. If any other
+ * type of exception is thrown, the connection is not already closed.
+ */
+ private void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
if (message instanceof ProtocolInitiation)
@@ -301,18 +371,40 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
+ /**
+ * Handle the supplied frame.
+ * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
+ *
+ * @throws an AMQConnectionException if unable to process the data block. In this case,
+ * the connection is already closed by the time the exception is thrown. If any other
+ * type of exception is thrown, the connection is not already closed.
+ */
private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
+ AMQChannel amqChannel = _channelMap.get(channelId);
+ if(amqChannel != null)
+ {
+ // The _receivedLock is already aquired in the caller
+ // It is safe to add channel
+ _channelsForCurrentMessage.add(amqChannel);
+ }
+ else
+ {
+ // Not an error. The frame is probably a channel Open for this channel id, which
+ // does not require asynchronous work therefore its absence from
+ // _channelsForCurrentMessage is ok.
+ }
+
AMQBody body = frame.getBodyFrame();
//Look up the Channel's Actor and set that as the current actor
// If that is not available then we can use the ConnectionActor
// that is associated with this AMQMPSession.
LogActor channelActor = null;
- if (_channelMap.get(channelId) != null)
+ if (amqChannel != null)
{
- channelActor = _channelMap.get(channelId).getLogActor();
+ channelActor = amqChannel.getLogActor();
}
CurrentActor.set(channelActor == null ? _actor : channelActor);
@@ -349,6 +441,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
body.handle(channelId, this);
}
+ catch(AMQConnectionException e)
+ {
+ _logger.info(e.getMessage() + " whilst processing frame: " + body);
+ closeConnection(channelId, e);
+ throw e;
+ }
catch (AMQException e)
{
closeChannel(channelId);
@@ -400,6 +498,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
QpidProperties.getBuildVersion());
serverProperties.setString(ServerPropertyNames.QPID_INSTANCE_NAME,
_broker.getName());
+ serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE,
+ String.valueOf(_closeWhenNoRoute));
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
@@ -720,6 +820,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
+ @Override
public void closeChannel(int channelId) throws AMQException
{
final AMQChannel channel = getChannel(channelId);
@@ -819,12 +920,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
+ @Override
public void closeSession() throws AMQException
{
if(_closing.compareAndSet(false,true))
{
// force sync of outstanding async work
- receiveComplete();
+ _receivedLock.lock();
+ try
+ {
+ receivedComplete();
+ }
+ finally
+ {
+ _receivedLock.unlock();
+ }
// REMOVE THIS SHOULD NOT BE HERE.
if (CurrentActor.get() == null)
@@ -900,6 +1010,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
+ @Override
public void closeProtocolSession()
{
_network.close();
@@ -968,6 +1079,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_clientProperties = clientProperties;
if (_clientProperties != null)
{
+ Boolean closeWhenNoRoute = _clientProperties.getBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+ if (closeWhenNoRoute != null)
+ {
+ _closeWhenNoRoute = closeWhenNoRoute;
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this);
+ }
+ }
+
_clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
if (_clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8) != null)
@@ -1538,4 +1659,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
return _lastWriteTime.get();
}
+
+ @Override
+ public boolean isCloseWhenNoRoute()
+ {
+ return _closeWhenNoRoute;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index ba806c04bd..1842117d6f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -218,4 +218,11 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
public Principal getPeerPrincipal();
Lock getReceivedLock();
+
+ /**
+ * Used for 0-8/0-9/0-9-1 connections to choose to close
+ * the connection when a transactional session receives a 'mandatory' message which
+ * can't be routed rather than returning the message.
+ */
+ boolean isCloseWhenNoRoute();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 8de19d9cff..1c8939d117 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -73,6 +73,7 @@ public class BrokerTestHelper
when(subjectCreator.getMechanisms()).thenReturn("");
Broker broker = mock(Broker.class);
when(broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT)).thenReturn(1);
+ when(broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(false);
when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l);
when(broker.getId()).thenReturn(UUID.randomUUID());
when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator);