diff options
| author | Keith Wall <kwall@apache.org> | 2011-09-19 08:16:29 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-09-19 08:16:29 +0000 |
| commit | 5bfcfdb2ef8190a2a3f674b90e53477d34754937 (patch) | |
| tree | 19dbd8a5a76b22249721ee62790280d493a10d59 /java | |
| parent | f1b244ae11f6edf458c153967d1cfca054297212 (diff) | |
| download | qpid-python-5bfcfdb2ef8190a2a3f674b90e53477d34754937.tar.gz | |
QPID-3415: Change 0-10 code path to utilise the CallbackHandlerRegistry to create the correct CallbackHandler. The sasl_mechs property/broker option is retained, but continues to be understood only by the 0-10 path.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1172506 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
14 files changed, 652 insertions, 413 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 63342bdb26..0ed3db6ecb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -35,6 +35,7 @@ import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.transport.ClientConnectionDelegate; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; @@ -194,6 +195,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail); + _qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL())); _qpidConnection.connect(conSettings); _conn._connected = true; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 2b49bb8f81..939bd181a3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.client.handler; +import java.io.UnsupportedEncodingException; +import java.util.StringTokenizer; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.security.AMQCallbackHandler; @@ -34,18 +41,9 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import java.io.UnsupportedEncodingException; -import java.util.HashSet; -import java.util.StringTokenizer; - public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody> { private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class); @@ -197,40 +195,20 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException { final String mechanisms = new String(availableMechanisms, "utf8"); - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); - HashSet mechanismSet = new HashSet(); - while (tokenizer.hasMoreTokens()) - { - mechanismSet.add(tokenizer.nextToken()); - } - - String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); - StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); - while (prefTokenizer.hasMoreTokens()) - { - String mech = prefTokenizer.nextToken(); - if (mechanismSet.contains(mech)) - { - return mech; - } - } - - return null; + return CallbackHandlerRegistry.getInstance().selectMechanism(mechanisms); } private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession) throws AMQException { - Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); try { - Object instance = mechanismClass.newInstance(); - AMQCallbackHandler cbh = (AMQCallbackHandler) instance; - cbh.initialise(protocolSession.getAMQConnection().getConnectionURL()); + AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism); + instance.initialise(protocolSession.getAMQConnection().getConnectionURL()); - return cbh; + return instance; } - catch (Exception e) + catch (IllegalArgumentException e) { throw new AMQException(null, "Unable to create callback handler: " + e, e); } diff --git a/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java b/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java index 140cbdeb75..14bae68561 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java @@ -20,17 +20,22 @@ */ package org.apache.qpid.client.security; -import org.apache.qpid.util.FileUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.io.InputStream; +import java.util.Collection; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.TreeMap; + +import org.apache.qpid.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user @@ -42,7 +47,7 @@ import java.util.Properties; * "amp.callbackhandler.properties". The format of the properties file is: * * <p/><pre> - * CallbackHanlder.mechanism=fully.qualified.class.name + * CallbackHanlder.n.mechanism=fully.qualified.class.name where n is an ordinal * </pre> * * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a @@ -66,51 +71,15 @@ public class CallbackHandlerRegistry public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties"; /** A static reference to the singleton instance of this registry. */ - private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry(); + private static final CallbackHandlerRegistry _instance; /** Holds a map from SASL mechanism names to call back handlers. */ - private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>(); - - /** Holds a space delimited list of mechanisms that callback handlers exist for. */ - private String _mechanisms; - - /** - * Gets the singleton instance of this registry. - * - * @return The singleton instance of this registry. - */ - public static CallbackHandlerRegistry getInstance() - { - return _instance; - } + private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>(); - /** - * Gets the callback handler class for a given SASL mechanism name. - * - * @param mechanism The SASL mechanism name. - * - * @return The callback handler class for the mechanism, or null if none is configured for that mechanism. - */ - public Class getCallbackHandlerClass(String mechanism) - { - return (Class) _mechanismToHandlerClassMap.get(mechanism); - } + /** Ordered collection of mechanisms for which callback handlers exist. */ + private Collection<String> _mechanisms; - /** - * Gets a space delimited list of supported SASL mechanisms. - * - * @return A space delimited list of supported SASL mechanisms. - */ - public String getMechanisms() - { - return _mechanisms; - } - - /** - * Creates the call back handler registry from its configuration resource or file. This also has the side effect - * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}. - */ - private CallbackHandlerRegistry() + static { // Register any configured SASL client factories. DynamicSaslRegistrar.registerSaslProviders(); @@ -120,12 +89,12 @@ public class CallbackHandlerRegistry FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME, CallbackHandlerRegistry.class.getClassLoader()); + final Properties props = new Properties(); + try { - Properties props = new Properties(); + props.load(is); - parseProperties(props); - _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms); } catch (IOException e) { @@ -146,32 +115,68 @@ public class CallbackHandlerRegistry } } } + + _instance = new CallbackHandlerRegistry(props); + _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms); + } - /*private InputStream openPropertiesInputStream(String filename) + /** + * Gets the singleton instance of this registry. + * + * @return The singleton instance of this registry. + */ + public static CallbackHandlerRegistry getInstance() + { + return _instance; + } + + public AMQCallbackHandler createCallbackHandler(final String mechanism) { - boolean useDefault = true; - InputStream is = null; - if (filename != null) + final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism); + + if (mechanismClass == null) { - try - { - is = new BufferedInputStream(new FileInputStream(new File(filename))); - useDefault = false; - } - catch (FileNotFoundException e) - { - _logger.error("Unable to read from file " + filename + ": " + e, e); - } + throw new IllegalArgumentException("Mechanism " + mechanism + " not known"); } - if (useDefault) + try + { + return mechanismClass.newInstance(); + } + catch (InstantiationException e) + { + throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e); + } + catch (IllegalAccessException e) { - is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME); + throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e); } + } - return is; - }*/ + /** + * Gets collections of supported SASL mechanism names, ordered by preference + * + * @return collection of SASL mechanism names. + */ + public Collection<String> getMechanisms() + { + return Collections.unmodifiableCollection(_mechanisms); + } + + /** + * Creates the call back handler registry from its configuration resource or file. + * + * This also has the side effect of configuring and registering the SASL client factory + * implementations using {@link DynamicSaslRegistrar}. + * + * This constructor is default protection to allow for effective unit testing. Clients must use + * {@link #getInstance()} to obtain the singleton instance. + */ + CallbackHandlerRegistry(final Properties props) + { + parseProperties(props); + } /** * Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler @@ -183,20 +188,20 @@ public class CallbackHandlerRegistry */ private void parseProperties(Properties props) { + + final Map<Integer, String> mechanisms = new TreeMap<Integer, String>(); + Enumeration e = props.propertyNames(); while (e.hasMoreElements()) { - String propertyName = (String) e.nextElement(); - int period = propertyName.indexOf("."); - if (period < 0) - { - _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers"); + final String propertyName = (String) e.nextElement(); + final String[] parts = propertyName.split("\\.", 2); - continue; - } + checkPropertyNameFormat(propertyName, parts); - String mechanism = propertyName.substring(period + 1); - String className = props.getProperty(propertyName); + final String mechanism = parts[0]; + final int ordinal = getPropertyOrdinal(propertyName, parts); + final String className = props.getProperty(propertyName); Class clazz = null; try { @@ -205,20 +210,11 @@ public class CallbackHandlerRegistry { _logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class + ". Skipping"); - continue; } - _mechanismToHandlerClassMap.put(mechanism, clazz); - if (_mechanisms == null) - { - _mechanisms = mechanism; - } - else - { - // one time cost - _mechanisms = _mechanisms + " " + mechanism; - } + + mechanisms.put(ordinal, mechanism); } catch (ClassNotFoundException ex) { @@ -227,5 +223,91 @@ public class CallbackHandlerRegistry continue; } } + + _mechanisms = mechanisms.values(); // order guaranteed by keys of treemap (i.e. our ordinals) + + + } + + private void checkPropertyNameFormat(final String propertyName, final String[] parts) + { + if (parts.length != 2) + { + throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers"); + } + } + + private int getPropertyOrdinal(final String propertyName, final String[] parts) + { + try + { + return Integer.parseInt(parts[1]); + } + catch(NumberFormatException nfe) + { + throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe); + } + } + + /** + * Selects a SASL mechanism that is mutually available to both parties. If more than one + * mechanism is mutually available the one appearing first (by ordinal) will be returned. + * + * @param peerMechanismList space separated list of mechanisms + * @return selected mechanism, or null if none available + */ + public String selectMechanism(final String peerMechanismList) + { + final Set<String> peerList = mechListToSet(peerMechanismList); + + return selectMechInternal(peerList, Collections.<String>emptySet()); + } + + /** + * Selects a SASL mechanism that is mutually available to both parties. + * + * @param peerMechanismList space separated list of mechanisms + * @param restrictionList space separated list of mechanisms + * @return selected mechanism, or null if none available + */ + public String selectMechanism(final String peerMechanismList, final String restrictionList) + { + final Set<String> peerList = mechListToSet(peerMechanismList); + final Set<String> restrictionSet = mechListToSet(restrictionList); + + return selectMechInternal(peerList, restrictionSet); + } + + private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet) + { + for (final String mech : _mechanisms) + { + if (peerSet.contains(mech)) + { + if (restrictionSet.isEmpty() || restrictionSet.contains(mech)) + { + return mech; + } + } + } + + return null; + } + + private Set<String> mechListToSet(final String mechanismList) + { + if (mechanismList == null) + { + return Collections.emptySet(); + } + + final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " "); + final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens()); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + return Collections.unmodifiableSet(mechanismSet); } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties b/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties index 1fcfde3579..b04a756e80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties +++ b/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties @@ -16,7 +16,17 @@ # specific language governing permissions and limitations # under the License. # -CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler -CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler + +# +# Format: +# <mechanism name>.ordinal=<implementation> +# +# @see CallbackHandlerRegistry +# + +EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler +CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java new file mode 100644 index 0000000000..1b483f6948 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.transport; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.qpid.client.security.AMQCallbackHandler; +import org.apache.qpid.client.security.CallbackHandlerRegistry; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.transport.ClientDelegate; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionException; +import org.apache.qpid.transport.ConnectionOpenOk; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.util.Strings; +import org.ietf.jgss.GSSContext; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.GSSName; +import org.ietf.jgss.Oid; + +/** + * + */ +public class ClientConnectionDelegate extends ClientDelegate +{ + private static final Logger LOGGER = Logger.get(ClientDelegate.class); + + private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2"; + protected static final Oid KRB5_OID; + + static + { + Oid oid; + try + { + oid = new Oid(KRB5_OID_STR); + } + catch (GSSException ignore) + { + oid = null; + } + + KRB5_OID = oid; + } + + private final ConnectionURL _connectionURL; + + /** + * @param settings + * @param connectionURL + */ + public ClientConnectionDelegate(ConnectionSettings settings, ConnectionURL connectionURL) + { + super(settings); + this._connectionURL = connectionURL; + } + + @Override + protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException + { + final String brokerMechanisms = Strings.join(" ", brokerMechs); + final String restrictionList = _conSettings.getSaslMechs(); + final String selectedMech = CallbackHandlerRegistry.getInstance().selectMechanism(brokerMechanisms, restrictionList); + if (selectedMech == null) + { + throw new ConnectionException("Client and broker have no SASL mechanisms in common." + + " Broker allows : " + brokerMechanisms + + " Client has : " + CallbackHandlerRegistry.getInstance().getMechanisms() + + " Client restricted itself to : " + (restrictionList != null ? restrictionList : "no restriction")); + } + + Map<String,Object> saslProps = new HashMap<String,Object>(); + if (_conSettings.isUseSASLEncryption()) + { + saslProps.put(Sasl.QOP, "auth-conf"); + } + + final AMQCallbackHandler handler = CallbackHandlerRegistry.getInstance().createCallbackHandler(selectedMech); + handler.initialise(_connectionURL); + final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, _conSettings.getSaslProtocol(), _conSettings.getSaslServerName(), saslProps, handler); + + return sc; + } + + @Override + public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) + { + SaslClient sc = conn.getSaslClient(); + if (sc != null) + { + if (sc.getMechanismName().equals("GSSAPI")) + { + String id = getKerberosUser(); + if (id != null) + { + conn.setUserID(id); + } + } + else if (sc.getMechanismName().equals("EXTERNAL")) + { + if (conn.getSecurityLayer() != null) + { + conn.setUserID(conn.getSecurityLayer().getUserID()); + } + } + } + + super.connectionOpenOk(conn, ok); + } + + private String getKerberosUser() + { + LOGGER.debug("Obtaining userID from kerberos"); + String service = _conSettings.getSaslProtocol() + "@" + _conSettings.getSaslServerName(); + GSSManager manager = GSSManager.getInstance(); + + try + { + GSSName acceptorName = manager.createName(service, + GSSName.NT_HOSTBASED_SERVICE, KRB5_OID); + + GSSContext secCtx = manager.createContext(acceptorName, + KRB5_OID, + null, + GSSContext.INDEFINITE_LIFETIME); + + secCtx.initSecContext(new byte[0], 0, 1); + + if (secCtx.getSrcName() != null) + { + return secCtx.getSrcName().toString(); + } + + } + catch (GSSException e) + { + LOGGER.warn("Unable to retrieve userID from Kerberos due to error",e); + } + + return null; + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/security/CallbackHandlerRegistryTest.java b/java/client/src/test/java/org/apache/qpid/client/security/CallbackHandlerRegistryTest.java new file mode 100644 index 0000000000..cc5d48fbef --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/security/CallbackHandlerRegistryTest.java @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.security; + +import java.io.IOException; +import java.util.Properties; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.UnsupportedCallbackException; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.QpidTestCase; + + +/** + * Tests the ability of {@link CallbackHandlerRegistry} to correctly parse + * the properties describing the available callback handlers. Ensures also + * that it is able to select the mechanism and create an implementation + * given a variety of starting conditions. + * + */ +public class CallbackHandlerRegistryTest extends QpidTestCase +{ + private CallbackHandlerRegistry _registry; // Object under test + + public void testCreateHandlerSuccess() + { + final Properties props = new Properties(); + props.put("TESTA.1", TestACallbackHandler.class.getName()); + + _registry = new CallbackHandlerRegistry(props); + assertEquals(1,_registry.getMechanisms().size()); + + final CallbackHandler handler = _registry.createCallbackHandler("TESTA"); + assertTrue(handler instanceof TestACallbackHandler); + } + + public void testCreateHandlerForUnknownMechanismName() + { + final Properties props = new Properties(); + props.put("TEST1.1", TestACallbackHandler.class.getName()); + + _registry = new CallbackHandlerRegistry(props); + + try + { + _registry.createCallbackHandler("NOTFOUND"); + fail("Exception not thrown"); + } + catch (IllegalArgumentException iae) + { + // PASS + } + } + + public void testSelectMechanism() + { + final Properties props = new Properties(); + props.put("TESTA.1", TestACallbackHandler.class.getName()); + props.put("TESTB.2", TestBCallbackHandler.class.getName()); + + _registry = new CallbackHandlerRegistry(props); + assertEquals(2,_registry.getMechanisms().size()); + + final String selectedMechanism = _registry.selectMechanism("TESTA"); + assertEquals("TESTA", selectedMechanism); + } + + public void testSelectReturnsFirstMutallyAvailableMechanism() + { + final Properties props = new Properties(); + props.put("TESTA.1", TestACallbackHandler.class.getName()); + props.put("TESTB.2", TestBCallbackHandler.class.getName()); + + _registry = new CallbackHandlerRegistry(props); + + final String selectedMechanism = _registry.selectMechanism("TESTD TESTB TESTA"); + // TESTA should be returned as it is higher than TESTB in the properties file. + assertEquals("Selected mechanism should respect the ordinal", "TESTA", selectedMechanism); + } + + public void testRestrictedSelectReturnsMechanismFromRestrictedList() + { + final Properties props = new Properties(); + props.put("TESTA.1", TestACallbackHandler.class.getName()); + props.put("TESTB.2", TestBCallbackHandler.class.getName()); + props.put("TESTC.3", TestCCallbackHandler.class.getName()); + + _registry = new CallbackHandlerRegistry(props); + + final String selectedMechanism = _registry.selectMechanism("TESTC TESTB TESTA", "TESTB TESTC"); + // TESTB should be returned as client has restricted the mechanism list to TESTB and TESTC + assertEquals("Selected mechanism should respect the ordinal and be limitted by restricted list","TESTB", selectedMechanism); + } + + public void testOldPropertyFormatRejected() + { + final Properties props = new Properties(); + props.put("CallbackHandler.TESTA", TestACallbackHandler.class.getName()); + + try + { + new CallbackHandlerRegistry(props); + fail("exception not thrown"); + } + catch(IllegalArgumentException iae) + { + // PASS + } + } + + public void testPropertyWithNonnumericalOrdinal() + { + final Properties props = new Properties(); + props.put("TESTA.z", TestACallbackHandler.class.getName()); + try + { + new CallbackHandlerRegistry(props); + fail("exception not thrown"); + } + catch(IllegalArgumentException iae) + { + // PASS + } + } + + public void testUnexpectedCallbackImplementationsIgnored() + { + final Properties props = new Properties(); + props.put("TESTA.1", TestACallbackHandler.class.getName()); + props.put("TESTB.2", "NotFound"); + props.put("TESTC.3", "java.lang.String"); + + _registry = new CallbackHandlerRegistry(props); + + assertEquals(1,_registry.getMechanisms().size()); + } + + static class TestACallbackHandler extends TestCallbackHandler + { + } + + static class TestBCallbackHandler extends TestCallbackHandler + { + } + + static class TestCCallbackHandler extends TestCallbackHandler + { + } + + static abstract class TestCallbackHandler implements AMQCallbackHandler + { + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + throw new UnsupportedOperationException(); + } + + @Override + public void initialise(ConnectionURL connectionURL) + { + throw new UnsupportedOperationException(); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java b/java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java deleted file mode 100644 index a3dad9acdc..0000000000 --- a/java/common/src/main/java/org/apache/qpid/security/AMQPCallbackHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.security; - -import javax.security.auth.callback.CallbackHandler; - -public interface AMQPCallbackHandler extends CallbackHandler -{ - void initialise(String username,String password); -} diff --git a/java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java b/java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java deleted file mode 100644 index 89a63abeab..0000000000 --- a/java/common/src/main/java/org/apache/qpid/security/UsernamePasswordCallbackHandler.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.security; - -import java.io.IOException; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; - -public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler -{ - private String _username; - private String _password; - - public void initialise(String username,String password) - { - _username = username; - _password = password; - } - - public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException - { - for (int i = 0; i < callbacks.length; i++) - { - Callback cb = callbacks[i]; - if (cb instanceof NameCallback) - { - ((NameCallback)cb).setName(_username); - } - else if (cb instanceof PasswordCallback) - { - ((PasswordCallback)cb).setPassword((_password).toCharArray()); - } - else - { - throw new UnsupportedCallbackException(cb); - } - } - } -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index e421f06901..9bdad6b00e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,28 +20,20 @@ */ package org.apache.qpid.transport; -import org.ietf.jgss.GSSContext; -import org.ietf.jgss.GSSException; -import org.ietf.jgss.GSSManager; -import org.ietf.jgss.GSSName; -import org.ietf.jgss.Oid; - -import org.apache.qpid.security.UsernamePasswordCallbackHandler; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.RESUMING; -import org.apache.qpid.transport.util.Logger; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.qpid.transport.util.Logger; + /** * ClientDelegate @@ -52,31 +44,13 @@ public class ClientDelegate extends ConnectionDelegate { private static final Logger log = Logger.get(ClientDelegate.class); - private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2"; - protected static final Oid KRB5_OID; - static - { - Oid oid; - try - { - oid = new Oid(KRB5_OID_STR); - } - catch (GSSException ignore) - { - oid = null; - } - KRB5_OID = oid; - } - - private List<String> clientMechs; - private ConnectionSettings conSettings; + protected final ConnectionSettings _conSettings; public ClientDelegate(ConnectionSettings settings) { - this.conSettings = settings; - this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" ")); + this._conSettings = settings; } public void init(Connection conn, ProtocolHeader hdr) @@ -92,9 +66,9 @@ public class ClientDelegate extends ConnectionDelegate { Map<String,Object> clientProperties = new HashMap<String,Object>(); - if(this.conSettings.getClientProperties() != null) + if(this._conSettings.getClientProperties() != null) { - clientProperties.putAll(this.conSettings.getClientProperties()); + clientProperties.putAll(_conSettings.getClientProperties()); } clientProperties.put("qpid.session_flow", 1); @@ -109,41 +83,12 @@ public class ClientDelegate extends ConnectionDelegate (clientProperties, null, null, conn.getLocale()); return; } - - List<String> choosenMechs = new ArrayList<String>(); - for (String mech:clientMechs) - { - if (brokerMechs.contains(mech)) - { - choosenMechs.add(mech); - } - } - - if (choosenMechs.size() == 0) - { - conn.exception(new ConnectionException("The following SASL mechanisms " + - clientMechs.toString() + - " specified by the client are not supported by the broker")); - return; - } - - String[] mechs = new String[choosenMechs.size()]; - choosenMechs.toArray(mechs); - conn.setServerProperties(start.getServerProperties()); try { - Map<String,Object> saslProps = new HashMap<String,Object>(); - if (conSettings.isUseSASLEncryption()) - { - saslProps.put(Sasl.QOP, "auth-conf"); - } - UsernamePasswordCallbackHandler handler = - new UsernamePasswordCallbackHandler(); - handler.initialise(conSettings.getUsername(), conSettings.getPassword()); - SaslClient sc = Sasl.createSaslClient - (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler); + final SaslClient sc = createSaslClient(brokerMechs); + conn.setSaslClient(sc); byte[] response = sc.hasInitialResponse() ? @@ -152,12 +97,22 @@ public class ClientDelegate extends ConnectionDelegate (clientProperties, sc.getMechanismName(), response, conn.getLocale()); } + catch (ConnectionException ce) + { + conn.exception(ce); + } catch (SaslException e) { conn.exception(e); } } + + protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException + { + throw new UnsupportedOperationException(); + } + @Override public void connectionSecure(Connection conn, ConnectionSecure secure) { @@ -176,7 +131,7 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { - int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(), + int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() ); @@ -191,29 +146,12 @@ public class ClientDelegate extends ConnectionDelegate //(or that forced by protocol limitations [0xFFFF]) conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax); - conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST); + conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST); } @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) { - SaslClient sc = conn.getSaslClient(); - if (sc != null) - { - if (sc.getMechanismName().equals("GSSAPI")) - { - String id = getKerberosUser(); - if (id != null) - { - conn.setUserID(id); - } - } - else if (sc.getMechanismName().equals("EXTERNAL")) - { - conn.setUserID(conn.getSecurityLayer().getUserID()); - } - } - if (conn.isConnectionResuming()) { conn.setState(RESUMING); @@ -283,35 +221,7 @@ public class ClientDelegate extends ConnectionDelegate } - private String getKerberosUser() - { - log.debug("Obtaining userID from kerberos"); - String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName(); - GSSManager manager = GSSManager.getInstance(); - - try - { - GSSName acceptorName = manager.createName(service, - GSSName.NT_HOSTBASED_SERVICE, KRB5_OID); - - GSSContext secCtx = manager.createContext(acceptorName, - KRB5_OID, - null, - GSSContext.INDEFINITE_LIFETIME); - secCtx.initSecContext(new byte[0], 0, 1); - if (secCtx.getSrcName() != null) - { - return secCtx.getSrcName().toString(); - } - } - catch (GSSException e) - { - log.warn("Unable to retrieve userID from Kerberos due to error",e); - } - - return null; - } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 469b007ab3..347bf8e649 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -239,7 +239,6 @@ public class Connection extends ConnectionInvoker conSettings = settings; state = OPENING; userID = settings.getUsername(); - delegate = new ClientDelegate(settings); securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings()); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 2074c77a5b..37a8e594c0 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -58,7 +58,7 @@ public class ConnectionSettings boolean verifyHostname; // SASL props - String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN"); + String saslMechs = System.getProperty("qpid.sasl_mechs", null); String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP"); String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost"); boolean useSASLEncryption; diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 3cd7dea2b6..49f6a08007 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,30 +20,27 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoAcceptor; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; +import static org.apache.qpid.transport.Option.EXPECTED; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import java.io.IOException; import java.util.ArrayList; -import java.util.List; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.io.IOException; -import static org.apache.qpid.transport.Option.*; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; +import org.apache.qpid.transport.util.Waiter; /** * ConnectionTest */ - public class ConnectionTest extends QpidTestCase implements SessionListener { - - private static final Logger log = Logger.get(ConnectionTest.class); - private int port; private volatile boolean queue = false; private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); @@ -156,7 +153,8 @@ public class ConnectionTest extends QpidTestCase implements SessionListener private Connection connect(final CountDownLatch closed) { - Connection conn = new Connection(); + final Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.addConnectionListener(new ConnectionListener() { public void opened(Connection conn) {} @@ -180,9 +178,9 @@ public class ConnectionTest extends QpidTestCase implements SessionListener { // Force os.name to be windows to exercise code in IoReceiver // that looks for the value of os.name - System.setProperty("os.name","windows"); + setTestSystemProperty("os.name","windows"); - // Start server as 0-9 to froce a ProtocolVersionException + // Start server as 0-9 to force a ProtocolVersionException startServer(new ProtocolHeader(1, 0, 9)); CountDownLatch closed = new CountDownLatch(1); @@ -217,7 +215,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener conn.send(protocolHeader); List<Object> utf8 = new ArrayList<Object>(); utf8.add("utf8"); - conn.connectionStart(null, Collections.EMPTY_LIST, utf8); + conn.connectionStart(null, Collections.emptyList(), utf8); } @Override @@ -268,40 +266,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } - class FailoverConnectionListener implements ConnectionListener - { - public void opened(Connection conn) {} - - public void exception(Connection conn, ConnectionException e) - { - throw e; - } - public void closed(Connection conn) - { - queue = true; - conn.connect("localhost", port, null, "guest", "guest"); - conn.resume(); - } - } - - class TestSessionListener implements SessionListener - { - public void opened(Session s) {} - public void resumed(Session s) {} - public void exception(Session s, SessionException e) {} - public void message(Session s, MessageTransfer xfr) - { - synchronized (incoming) - { - incoming.add(xfr); - incoming.notifyAll(); - } - - s.processed(xfr); - } - public void closed(Session s) {} - } public void testResumeNonemptyReplayBuffer() throws Exception { @@ -309,6 +274,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener Connection conn = new Connection(); conn.addConnectionListener(new FailoverConnectionListener()); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); ssn.setSessionListener(new TestSessionListener()); @@ -363,6 +329,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.addConnectionListener(new FailoverConnectionListener()); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); @@ -385,6 +352,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); ssn.sessionFlush(EXPECTED); @@ -398,6 +366,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener { startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); conn.connectionHeartbeat(); conn.close(); @@ -408,6 +377,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); send(ssn, "EXCP 0"); @@ -427,6 +397,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); send(ssn, "EXCP 0", true); @@ -441,4 +412,38 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } + class FailoverConnectionListener implements ConnectionListener + { + public void opened(Connection conn) {} + + public void exception(Connection conn, ConnectionException e) + { + throw e; + } + + public void closed(Connection conn) + { + queue = true; + conn.connect("localhost", port, null, "guest", "guest"); + conn.resume(); + } + } + + class TestSessionListener implements SessionListener + { + public void opened(Session s) {} + public void resumed(Session s) {} + public void exception(Session s, SessionException e) {} + public void message(Session s, MessageTransfer xfr) + { + synchronized (incoming) + { + incoming.add(xfr); + incoming.notifyAll(); + } + + s.processed(xfr); + } + public void closed(Session s) {} + } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 1fa6829bf9..f18f365f20 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -286,10 +286,11 @@ public class ConnectionTest extends QpidBrokerTestCase } catch (Exception e) { - assertTrue("Incorrect exception thrown", - e.getMessage().contains("The following SASL mechanisms " + - "[MY_MECH]" + - " specified by the client are not supported by the broker")); + assertTrue("Unexpected exception message : " + e.getMessage(), + e.getMessage().contains("Client and broker have no SASL mechanisms in common.")); + assertTrue("Unexpected exception message : " + e.getMessage(), + e.getMessage().contains("Client restricted itself to : MY_MECH")); + } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java index fe929b4965..978ebfa93f 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java @@ -20,17 +20,20 @@ */ package org.apache.qpid.test.unit.message; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.util.Properties; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.naming.InitialContext; -import javax.jms.*; -import java.util.Properties; -import java.io.*; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; /** @@ -41,8 +44,6 @@ import java.io.*; */ public class UTF8Test extends QpidBrokerTestCase { - private static final Logger _logger = LoggerFactory.getLogger(UTF8Test.class); - public void testPlainEn() throws Exception { invoke("UTF8En"); @@ -65,38 +66,24 @@ public class UTF8Test extends QpidBrokerTestCase private void runTest(String exchangeName, String queueName, String routingKey, String data) throws Exception { - _logger.info("Running test for exchange: " + exchangeName - + " queue Name: " + queueName - + " routing key: " + routingKey); - declareQueue(exchangeName, routingKey, queueName); + Connection con = getConnection(); + Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + final Destination dest = getDestination(exchangeName, routingKey, queueName); + + final MessageConsumer msgCons = sess.createConsumer(dest); + con.start(); - javax.jms.Connection con = getConnection(); - javax.jms.Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = getDestination(exchangeName, routingKey, queueName); // Send data MessageProducer msgProd = sess.createProducer(dest); TextMessage message = sess.createTextMessage(data); msgProd.send(message); + // consume data - MessageConsumer msgCons = sess.createConsumer(dest); - con.start(); TextMessage m = (TextMessage) msgCons.receive(RECEIVE_TIMEOUT); assertNotNull(m); assertEquals(m.getText(), data); } - private void declareQueue(String exch, String routkey, String qname) throws Exception - { - Connection conn = new Connection(); - conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false); - Session sess = conn.createSession(0); - sess.exchangeDeclare(exch, "direct", null, null); - sess.queueDeclare(qname, null, null); - sess.exchangeBind(qname, exch, routkey, null); - sess.sync(); - conn.close(); - } - private Destination getDestination(String exch, String routkey, String qname) throws Exception { Properties props = new Properties(); |
