summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
committerAlex Rudyy <orudyy@apache.org>2013-06-21 17:06:57 +0000
commite409124b9f3a7423fe4ab04e7ce3e446244d04e3 (patch)
tree95eb9be13518f19536314f7c0993fe40d84c70c9 /qpid/java/client
parent8bdb080ef1f4afb1727dc3fc5f2666bdfd982107 (diff)
downloadqpid-python-e409124b9f3a7423fe4ab04e7ce3e446244d04e3.tar.gz
QPID-4943: Introduce a feature for 0-8/0-9/0-9-1 protocols to close a connection on receiving a mandatory unroutable message in a transacted session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1495511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java15
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java74
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java13
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java77
5 files changed, 191 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 4e885258b9..74c9878a8e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -844,7 +844,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- public void close() throws JMSException
+ public void close() throws JMSException
{
close(DEFAULT_TIMEOUT);
}
@@ -859,9 +859,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (!setClosed())
{
setClosing(true);
- try{
+ try
+ {
doClose(sessions, timeout);
- }finally{
+ }
+ finally
+ {
setClosing(false);
}
}
@@ -1594,4 +1597,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _validateQueueOnSend;
}
+
+ @Override
+ protected boolean setClosed()
+ {
+ return super.setClosed();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
new file mode 100644
index 0000000000..baae072167
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used during connection establishment to optionally set the "close when no route" client property
+ */
+class CloseWhenNoRouteSettingsHelper
+{
+ private static final Logger _log = LoggerFactory.getLogger(CloseWhenNoRouteSettingsHelper.class);
+
+ /**
+ * @param url the client's connection URL which may contain the option
+ * {@value ConnectionStartProperties#QPID_CLOSE_WHEN_NO_ROUTE}
+ * @param serverProperties the properties received from the broker which may contain the option
+ * {@value ConnectionStartProperties#QPID_CLOSE_WHEN_NO_ROUTE}
+ * @param clientProperties the client properties to optionally set the close-when-no-route option on
+ */
+ public void setClientProperties(FieldTable clientProperties, ConnectionURL url, FieldTable serverProperties)
+ {
+ boolean brokerSupportsCloseWhenNoRoute =
+ serverProperties != null && serverProperties.containsKey(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+ boolean brokerCloseWhenNoRoute = brokerSupportsCloseWhenNoRoute &&
+ Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE));
+
+ String closeWhenNoRouteOption = url.getOption(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE);
+ if(closeWhenNoRouteOption != null)
+ {
+ if(brokerSupportsCloseWhenNoRoute)
+ {
+ boolean desiredCloseWhenNoRoute = Boolean.valueOf(closeWhenNoRouteOption);
+ if(desiredCloseWhenNoRoute != brokerCloseWhenNoRoute)
+ {
+ clientProperties.setBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute);
+ _log.debug(
+ "Set client property {} to {}",
+ ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute);
+ }
+ else
+ {
+ _log.debug(
+ "Client's desired {} value {} already matches the server's",
+ ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, desiredCloseWhenNoRoute);
+ }
+ }
+ else
+ {
+ _log.warn("The broker being connected to does not support the " + ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE + " option");
+ }
+ }
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 66c4821f60..366b5f115e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -37,6 +37,7 @@ import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.properties.ConnectionStartProperties;
import javax.security.sasl.Sasl;
@@ -51,6 +52,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler();
+ private final CloseWhenNoRouteSettingsHelper _closeWhenNoRouteHelper = new CloseWhenNoRouteSettingsHelper();
+
public static ConnectionStartMethodHandler getInstance()
{
return _instance;
@@ -59,6 +62,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
private ConnectionStartMethodHandler()
{ }
+ @Override
public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId)
throws AMQException
{
@@ -147,6 +151,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
}
session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED);
+
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.setString(ConnectionStartProperties.CLIENT_ID_0_8,
@@ -162,12 +167,16 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
clientProperties.setInteger(ConnectionStartProperties.PID,
ConnectionStartProperties.getPID());
+ FieldTable serverProperties = body.getServerProperties();
+ ConnectionURL url = getConnectionURL(session);
+ _closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties);
+
ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
session.writeFrame(connectionStartOkBody.generateFrame(channelId));
-
+
}
catch (UnsupportedEncodingException e)
{
@@ -195,7 +204,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
try
{
AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
- instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
+ instance.initialise(getConnectionURL(protocolSession));
return instance;
}
@@ -205,4 +214,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
}
}
+ private ConnectionURL getConnectionURL(AMQProtocolSession protocolSession)
+ {
+ return protocolSession.getAMQConnection().getConnectionURL();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index c4fbeb5607..3050e84419 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -57,6 +57,19 @@ public interface ConnectionURL
* the DLQ (or dropped) when delivery count exceeds the maximum.
*/
public static final String OPTIONS_REJECT_BEHAVIOUR = "rejectbehaviour";
+
+ /**
+ * <p>
+ * This option is only applicable for 0-8/0-9/0-9-1 protocol connections.
+ * </p>
+ * <p>
+ * It tells the client to request whether the broker should close the
+ * connection when a mandatory message isn't routable, rather than return
+ * the message to the client as it normally would.
+ * </p>
+ */
+ public static final String OPTIONS_CLOSE_WHEN_NO_ROUTE = "closeWhenNoRoute";
+
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java
new file mode 100644
index 0000000000..f1d7a76c75
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelperTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.handler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class CloseWhenNoRouteSettingsHelperTest extends QpidTestCase
+{
+ private static final String FALSE_STR = Boolean.toString(false);
+ private static final String TRUE_STR = Boolean.toString(true);
+
+ private final CloseWhenNoRouteSettingsHelper _closeWhenNoRouteSettingsHelper = new CloseWhenNoRouteSettingsHelper();
+
+ public void testCloseWhenNoRouteNegotiation()
+ {
+ test("Nothing should be set if option not in URL",
+ null,
+ true,
+ null);
+ test("Client should disable broker's enabled option",
+ FALSE_STR,
+ true,
+ false);
+ test("Client should be able to disable broker's enabled option",
+ TRUE_STR,
+ false,
+ true);
+ test("Client should not enable option if unsupported by broker",
+ TRUE_STR,
+ null,
+ null);
+ test("Malformed client option should evaluate to false",
+ "malformed boolean",
+ true,
+ false);
+ }
+
+ private void test(String message, String urlOption, Boolean serverOption, Boolean expectedClientProperty)
+ {
+ ConnectionURL url = mock(ConnectionURL.class);
+ when(url.getOption(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE)).thenReturn(urlOption);
+
+ FieldTable serverProperties = new FieldTable();
+ if(serverOption != null)
+ {
+ serverProperties.setBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, serverOption);
+ }
+
+ FieldTable clientProperties = new FieldTable();
+
+ _closeWhenNoRouteSettingsHelper.setClientProperties(clientProperties, url, serverProperties);
+
+ assertEquals(message, expectedClientProperty, clientProperties.getBoolean(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE));
+ }
+}