summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
commit085486ebe5ff21133b9caf1c31625ac6ea356568 (patch)
tree7acbe9ca99a345dca71f9f80cd3e29ea4e3710f0 /qpid/java/systests/src
parent60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (diff)
parente2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (diff)
downloadqpid-python-085486ebe5ff21133b9caf1c31625ac6ea356568.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java53
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java4
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java49
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java41
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java3
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/KeyStoreRestTest.java69
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/TrustStoreRestTest.java57
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java6
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java371
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java288
11 files changed, 118 insertions, 825 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
index fefed5b4ab..f0013a82d7 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/session/QueueDeclareTest.java
@@ -20,48 +20,77 @@
*/
package org.apache.qpid.client.session;
-import java.util.Collections;
-
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
-import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.url.AMQBindingURL;
public class QueueDeclareTest extends QpidBrokerTestCase
{
private Connection _connection;
private AMQSession<?, ?> _session;
-
protected void setUp() throws Exception
{
super.setUp();
_connection = getConnection();
+ _connection.start();
_session = (AMQSession<?, ?>) _connection.createSession(true, Session.SESSION_TRANSACTED);
}
public void testDeclareAndBindWhenQueueIsNotSpecifiedInDestinationUrl() throws Exception
{
- AMQQueue destination = new AMQQueue(new AMQBindingURL("topic://amq.topic//?routingkey='testTopic'"));
+ AMQDestination destination = (AMQDestination) _session.createQueue("topic://amq.topic//?routingkey='testTopic'");
- assertEquals("Queue name is generated in parser", AMQShortString.EMPTY_STRING, destination.getAMQQueueName());
+ assertEquals("Non empty queue name unexpectedly generated by parser : " + destination.getAMQQueueName(), AMQShortString.EMPTY_STRING, destination.getAMQQueueName());
- _session.declareAndBind(destination, FieldTable.convertToFieldTable(Collections.<String, Object> emptyMap()));
+ _session.declareAndBind(destination);
- assertFalse("Unexpected queue name: [" + destination.getAMQQueueName() + "]", AMQShortString.EMPTY_STRING.equals(destination.getAMQQueueName()));
+ assertFalse("Non empty queue name should have been generated by declareAndBind",
+ AMQShortString.EMPTY_STRING.equals(destination.getAMQQueueName()));
sendMessage(_session, destination, 1);
+ receiveMessage(destination);
+ }
+
+ public void testDeclareIgnoresNonDurableFlagIfDurableQueueAlreadyExists() throws Exception
+ {
+ String format = "direct://amq.direct//%s?durable='%s'";
+ AMQDestination durable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), true));
+ AMQDestination nondurable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), false));
+
+ verifyDurabiltyIgnoreIfQueueExists(durable, nondurable);
+ }
+
+ public void testDeclareIgnoresDurableFlagIfNonDurableQueueAlreadyExists() throws Exception
+ {
+ String format = "direct://amq.direct//%s?durable='%s'";
+ AMQDestination nondurable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), false));
+ AMQDestination durable = (AMQDestination) _session.createQueue(String.format(format, getTestQueueName(), true));
+ verifyDurabiltyIgnoreIfQueueExists(nondurable, durable);
+ }
+
+ private void verifyDurabiltyIgnoreIfQueueExists(final AMQDestination firstDeclare, final AMQDestination secondDeclare) throws Exception
+ {
+ _session.declareAndBind(firstDeclare);
+
+ sendMessage(_session, firstDeclare, 1);
+
+ _session.declareAndBind(secondDeclare);
+ receiveMessage(secondDeclare);
+ }
+
+ private void receiveMessage(final Destination destination) throws Exception
+ {
MessageConsumer consumer = _session.createConsumer(destination);
- _connection.start();
- Message message = consumer.receive(1000l);
+ Message message = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull("Message not received", message);
_session.commit();
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
index 7c82ea8e55..72dea9b18b 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -112,7 +112,9 @@ public class SSLTest extends QpidBrokerTestCase
}
catch (JMSException e)
{
- assertTrue("Unexpected exception message", e.getMessage().contains("Unrecognized SSL message, plaintext connection?"));
+ // PASS
+ assertTrue("Unexpected exception message : " + e.getMessage(),
+ e.getMessage().contains("Unrecognized SSL message, plaintext connection?"));
}
}
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
index 047151684f..7a18bb5ed2 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.logging;
-import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
@@ -306,54 +305,6 @@ public class ChannelLoggingTest extends AbstractTestLogging
validateChannelClose(results);
}
- public void testChannelClosedOnQueueArgumentsMismatch() throws Exception
- {
- assertLoggingNotYetOccured(CHANNEL_PREFIX);
-
- Connection connection = getConnection();
-
- // Create a session and then close it
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- waitForMessage("CHN-1001");
-
- String testQueueName = getTestQueueName();
-
- Queue nonDurableQueue = (Queue) session.createQueue("direct://amq.direct/" + testQueueName + "/" + testQueueName
- + "?durable='false'");
-
- ((AMQSession<?,?>)session).declareAndBind((AMQDestination)nonDurableQueue);
-
- Queue durableQueue = (Queue) session.createQueue("direct://amq.direct/" + testQueueName + "/" + testQueueName
- + "?durable='true'");
- try
- {
- ((AMQSession<?,?>)session).declareAndBind((AMQDestination) durableQueue);
- fail("Exception not thrown");
- }
- catch (AMQChannelClosedException acce)
- {
- // pass
- }
- catch (Exception e)
- {
- fail("Wrong exception thrown " + e);
- }
- waitForMessage("CHN-1003");
-
- List<String> results = findMatches(CHANNEL_PREFIX);
- assertTrue("No CHN messages logged", results.size() > 0);
-
- String closeLog = results.get(results.size() -1);
- int closeMessageID = closeLog.indexOf("CHN-1003");
- assertFalse("CHN-1003 is not found", closeMessageID == -1);
-
- String closeMessage = closeLog.substring(closeMessageID);
- assertTrue("Unexpected close channel message :" + closeMessage, Pattern.matches(CHANNEL_CLOSE_FORCED_MESSAGE_PATTERN, closeMessage));
-
- session.close();
- connection.close();
- }
-
public void testChannelClosedOnExclusiveQueueDeclaredOnDifferentSession() throws Exception
{
assertLoggingNotYetOccured(CHANNEL_PREFIX);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java
index 5522187ee5..e855a721ee 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationTest.java
@@ -203,7 +203,7 @@ public class ExternalAuthenticationTest extends QpidBrokerTestCase
//add the peersOnly store to the config
Map<String, Object> sslTrustStoreAttributes = new HashMap<String, Object>();
sslTrustStoreAttributes.put(TrustStore.NAME, peerStoreName);
- sslTrustStoreAttributes.put(FileTrustStore.PATH, BROKER_PEERSTORE);
+ sslTrustStoreAttributes.put(FileTrustStore.STORE_URL, BROKER_PEERSTORE);
sslTrustStoreAttributes.put(FileTrustStore.PASSWORD, BROKER_PEERSTORE_PASSWORD);
sslTrustStoreAttributes.put(FileTrustStore.PEERS_ONLY, true);
getBrokerConfiguration().addObjectConfiguration(TrustStore.class, sslTrustStoreAttributes);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java
index 4df81845d8..a5a167f633 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java
@@ -211,47 +211,6 @@ public class ManagementLoggingTest extends AbstractTestLogging
}
/**
- * Description:
- * Using the default configuration with SSL enabled for the management port the SSL Keystore path should be reported via MNG-1006
- * Input:
- * Management SSL enabled default configuration.
- * Output:
- *
- * <date> MESSAGE MNG-1006 : Using SSL Keystore : test_resources/ssl/keystore.jks
- *
- * Validation Steps:
- *
- * 1. The MNG ID is correct
- * 2. The keystore path is as specified in the configuration
- */
- public void testManagementStartupSSLKeystore() throws Exception
- {
- if (isJavaBroker())
- {
- setSystemProperty("javax.net.debug", "ssl");
- startBrokerAndCreateMonitor(true, true);
-
- List<String> results = waitAndFindMatches("MNG-1006");
-
- assertTrue("MNGer message not logged", results.size() > 0);
-
- String log = getLogMessage(results, 0);
-
- //1
- validateMessageID("MNG-1006", log);
-
- // Validate we only have two MNG-1002 (one via stdout, one via log4j)
- results = findMatches("MNG-1006");
- assertEquals("Upexpected SSL Keystore message count",
- 1, results.size());
-
- // Validate the keystore path is as expected
- assertTrue("SSL Keystore entry expected.:" + getMessageString(log),
- getMessageString(log).endsWith("systestsKeyStore"));
- }
- }
-
- /**
* Description: Tests the management connection open/close are logged correctly.
*
* Output:
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java
index 0dda8e077b..26f7f16bec 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/AccessControlProviderRestTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.management.plugin.servlet.rest.RestServlet;
import org.apache.qpid.server.model.AccessControlProvider;
import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.State;
@@ -183,7 +184,7 @@ public class AccessControlProviderRestTest extends QpidRestTestCase
getRestTestHelper().setUsernameAndPassword(BrokerOptions.MANAGEMENT_MODE_USER_NAME, MANAGEMENT_MODE_PASSWORD);
- Map<String, Object> acl = getRestTestHelper().getJsonAsSingletonList("accesscontrolprovider/" + TestBrokerConfiguration.ENTRY_NAME_ACL_FILE);
+ Map<String, Object> acl = getRestTestHelper().getJsonAsSingletonList("accesscontrolprovider/" + TestBrokerConfiguration.ENTRY_NAME_ACL_FILE + "?" + RestServlet.OVERSIZE_PARAM + "=" + (file.getAbsolutePath().length()+10));
assertEquals("Unexpected id", id.toString(), acl.get(AccessControlProvider.ID));
assertEquals("Unexpected path", file.getAbsolutePath() , acl.get(FileAccessControlProviderConstants.PATH));
assertEquals("Unexpected state", State.ERRORED.name() , acl.get(AccessControlProvider.STATE));
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/KeyStoreRestTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/KeyStoreRestTest.java
index 03b0a7a304..a3bf1ab3cb 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/KeyStoreRestTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/KeyStoreRestTest.java
@@ -28,6 +28,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.security.FileKeyStore;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -52,8 +53,12 @@ public class KeyStoreRestTest extends QpidRestTestCase
List<Map<String, Object>> keyStores = assertNumberOfKeyStores(1);
Map<String, Object> keystore = keyStores.get(0);
- assertKeyStoreAttributes(keystore, TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE,
- QPID_HOME + "/../" + TestSSLConstants.BROKER_KEYSTORE, null);
+
+ assertEquals("Unexpected name", TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE, keystore.get(KeyStore.NAME));
+ assertEquals("unexpected path to key store", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.STORE_URL));
+ assertEquals("unexpected (dummy) password of default systests key store", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.PASSWORD));
+ assertEquals("unexpected type of default systests key store", java.security.KeyStore.getDefaultType(), keystore.get(FileKeyStore.KEY_STORE_TYPE));
+ assertFalse("should not be a certificateAlias attribute", keystore.containsKey(FileKeyStore.CERTIFICATE_ALIAS));
}
public void testCreate() throws Exception
@@ -67,10 +72,14 @@ public class KeyStoreRestTest extends QpidRestTestCase
createKeyStore(name, certAlias, TestSSLConstants.KEYSTORE, TestSSLConstants.KEYSTORE_PASSWORD);
assertNumberOfKeyStores(2);
- List<Map<String, Object>> keyStores = getRestTestHelper().getJsonAsList("keystore/" + name);
+ List<Map<String, Object>> keyStores = getRestTestHelper().getJsonAsList("keystore/" + name + "?actuals=true");
assertNotNull("details cannot be null", keyStores);
- assertKeyStoreAttributes(keyStores.get(0), name, TestSSLConstants.KEYSTORE, certAlias);
+ Map<String, Object> keystore = keyStores.get(0);
+ assertEquals("Unexpected name", name, keystore.get(KeyStore.NAME));
+ assertEquals("unexpected path to key store", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.STORE_URL));
+ assertEquals("unexpected password", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.PASSWORD));
+ assertEquals("unexpected alias", certAlias, keystore.get(FileKeyStore.CERTIFICATE_ALIAS));
}
public void testCreateWithDataUrl() throws Exception
@@ -85,10 +94,14 @@ public class KeyStoreRestTest extends QpidRestTestCase
createKeyStore(name, null, dataUrlForKeyStore, TestSSLConstants.KEYSTORE_PASSWORD);
assertNumberOfKeyStores(2);
- List<Map<String, Object>> keyStores = getRestTestHelper().getJsonAsList("keystore/" + name);
+ List<Map<String, Object>> keyStores = getRestTestHelper().getJsonAsList("keystore/" + name + "?actuals=true");
assertNotNull("details cannot be null", keyStores);
- assertKeyStoreAttributes(keyStores.get(0), name, dataUrlForKeyStore, null);
+ Map<String, Object> keystore = keyStores.get(0);
+ assertEquals("Unexpected name", name, keystore.get(KeyStore.NAME));
+ assertEquals("unexpected data", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.STORE_URL));
+ assertEquals("unexpected password", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.PASSWORD));
+ assertEquals("unexpected alias", null, keystore.get(FileKeyStore.CERTIFICATE_ALIAS));
}
public void testDelete() throws Exception
@@ -104,15 +117,17 @@ public class KeyStoreRestTest extends QpidRestTestCase
getRestTestHelper().submitRequest("keystore/" + name, "DELETE", HttpServletResponse.SC_OK);
- List<Map<String, Object>> keyStore = getRestTestHelper().getJsonAsList("keystore/" + name);
+ List<Map<String, Object>> keyStore = getRestTestHelper().getJsonAsList("keystore/" + name + "?actuals=true");
assertNotNull("details should not be null", keyStore);
assertTrue("details should be empty as the keystore no longer exists", keyStore.isEmpty());
//check only the default systests key store remains
List<Map<String, Object>> keyStores = assertNumberOfKeyStores(1);
Map<String, Object> keystore = keyStores.get(0);
- assertKeyStoreAttributes(keystore, TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE,
- QPID_HOME + "/../" + TestSSLConstants.BROKER_KEYSTORE, null);
+ assertEquals("Unexpected name", TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE, keystore.get(KeyStore.NAME));
+ assertEquals("unexpected path to key store", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.STORE_URL));
+ assertEquals("unexpected (dummy) password of default systests key store", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.PASSWORD));
+ assertFalse("should not be a certificateAlias attribute", keystore.containsKey(FileKeyStore.CERTIFICATE_ALIAS));
}
public void testUpdate() throws Exception
@@ -127,14 +142,18 @@ public class KeyStoreRestTest extends QpidRestTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(KeyStore.NAME, name);
- attributes.put(FileKeyStore.PATH, TestSSLConstants.UNTRUSTED_KEYSTORE);
+ attributes.put(FileKeyStore.STORE_URL, TestSSLConstants.UNTRUSTED_KEYSTORE);
getRestTestHelper().submitRequest("keystore/" + name, "PUT", attributes, HttpServletResponse.SC_OK);
- List<Map<String, Object>> keyStore = getRestTestHelper().getJsonAsList("keystore/" + name);
- assertNotNull("details should not be null", keyStore);
+ List<Map<String, Object>> keyStores = getRestTestHelper().getJsonAsList("keystore/" + name + "?actuals=true");
+ assertNotNull("details should not be null", keyStores);
- assertKeyStoreAttributes(keyStore.get(0), name, TestSSLConstants.UNTRUSTED_KEYSTORE, null);
+ Map<String, Object> keystore = keyStores.get(0);
+ assertEquals("Unexpected name", name, keystore.get(KeyStore.NAME));
+ assertEquals("unexpected data", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.STORE_URL));
+ assertEquals("unexpected password", AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.PASSWORD));
+ assertEquals("unexpected alias", null, keystore.get(FileKeyStore.CERTIFICATE_ALIAS));
}
@@ -151,7 +170,7 @@ public class KeyStoreRestTest extends QpidRestTestCase
{
Map<String, Object> keyStoreAttributes = new HashMap<>();
keyStoreAttributes.put(KeyStore.NAME, name);
- keyStoreAttributes.put(FileKeyStore.PATH, keyStorePath);
+ keyStoreAttributes.put(FileKeyStore.STORE_URL, keyStorePath);
keyStoreAttributes.put(FileKeyStore.PASSWORD, keystorePassword);
if (certAlias != null)
{
@@ -161,26 +180,4 @@ public class KeyStoreRestTest extends QpidRestTestCase
getRestTestHelper().submitRequest("keystore/" + name, "PUT", keyStoreAttributes, HttpServletResponse.SC_CREATED);
}
- private void assertKeyStoreAttributes(Map<String, Object> keystore, String name, String path, String certAlias)
- {
- assertEquals("default systests key store is missing",
- name, keystore.get(KeyStore.NAME));
- assertEquals("unexpected path to key store",
- path, keystore.get(FileKeyStore.PATH));
- assertEquals("unexpected (dummy) password of default systests key store",
- AbstractConfiguredObject.SECURED_STRING_VALUE, keystore.get(FileKeyStore.PASSWORD));
- assertEquals("unexpected type of default systests key store",
- java.security.KeyStore.getDefaultType(), keystore.get(FileKeyStore.KEY_STORE_TYPE));
- if(certAlias == null)
- {
- assertFalse("should not be a certificateAlias attribute",
- keystore.containsKey(FileKeyStore.CERTIFICATE_ALIAS));
- }
- else
- {
- assertEquals("unexpected certificateAlias value",
- certAlias, keystore.get(FileKeyStore.CERTIFICATE_ALIAS));
-
- }
- }
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/TrustStoreRestTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/TrustStoreRestTest.java
index 6cca3fc12c..c1ea83e0dd 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/TrustStoreRestTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/TrustStoreRestTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.security.FileTrustStore;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -36,6 +37,7 @@ import org.apache.qpid.util.FileUtils;
public class TrustStoreRestTest extends QpidRestTestCase
{
+
@Override
public void setUp() throws Exception
{
@@ -51,8 +53,14 @@ public class TrustStoreRestTest extends QpidRestTestCase
List<Map<String, Object>> trustStores = assertNumberOfTrustStores(1);
Map<String, Object> truststore = trustStores.get(0);
- assertTrustStoreAttributes(truststore, TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE,
- QPID_HOME + "/../" + TestSSLConstants.BROKER_TRUSTSTORE, false);
+ assertEquals("default systests trust store is missing",
+ TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE, truststore.get(TrustStore.NAME));
+ assertEquals("unexpected store URL", ConfiguredObject.OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, truststore.get(FileTrustStore.STORE_URL));
+ assertEquals("unexpected (dummy) password of default systests trust store",
+ AbstractConfiguredObject.SECURED_STRING_VALUE, truststore.get(FileTrustStore.PASSWORD));
+ assertEquals("unexpected type of default systests trust store",
+ java.security.KeyStore.getDefaultType(), truststore.get(FileTrustStore.TRUST_STORE_TYPE));
+ assertEquals("unexpected peersOnly value", false, truststore.get(FileTrustStore.PEERS_ONLY));
}
public void testCreate() throws Exception
@@ -68,7 +76,12 @@ public class TrustStoreRestTest extends QpidRestTestCase
List<Map<String, Object>> trustStores = getRestTestHelper().getJsonAsList("truststore/" + name);
assertNotNull("details cannot be null", trustStores);
- assertTrustStoreAttributes(trustStores.get(0), name, TestSSLConstants.TRUSTSTORE, true);
+ Map<String, Object> truststore = trustStores.get(0);
+ assertEquals("unexpected trust store name", name, truststore.get(TrustStore.NAME));
+ assertEquals("unexpected store URL", TestSSLConstants.TRUSTSTORE, truststore.get(FileTrustStore.STORE_URL));
+ assertEquals("unexpected password value", AbstractConfiguredObject.SECURED_STRING_VALUE, truststore.get(FileTrustStore.PASSWORD));
+ assertEquals("unexpected type", java.security.KeyStore.getDefaultType(), truststore.get(FileTrustStore.TRUST_STORE_TYPE));
+ assertEquals("unexpected peersOnly value", true, truststore.get(FileTrustStore.PEERS_ONLY));
}
public void testCreateUsingDataUrl() throws Exception
@@ -88,7 +101,12 @@ public class TrustStoreRestTest extends QpidRestTestCase
List<Map<String, Object>> trustStores = getRestTestHelper().getJsonAsList("truststore/" + name);
assertNotNull("details cannot be null", trustStores);
- assertTrustStoreAttributes(trustStores.get(0), name, dataUrlForTruststore, false);
+ Map<String, Object> truststore = trustStores.get(0);
+ assertEquals("nexpected trust store name", name, truststore.get(TrustStore.NAME));
+ assertEquals("unexpected store URL value", ConfiguredObject.OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, truststore.get(FileTrustStore.STORE_URL));
+ assertEquals("unexpected password value", AbstractConfiguredObject.SECURED_STRING_VALUE, truststore.get(FileTrustStore.PASSWORD));
+ assertEquals("unexpected type of trust store", java.security.KeyStore.getDefaultType(), truststore.get(FileTrustStore.TRUST_STORE_TYPE));
+ assertEquals("unexpected peersOnly value", false, truststore.get(FileTrustStore.PEERS_ONLY));
}
public void testDelete() throws Exception
@@ -110,8 +128,11 @@ public class TrustStoreRestTest extends QpidRestTestCase
//check only the default systests trust store remains
List<Map<String, Object>> trustStores = assertNumberOfTrustStores(1);
Map<String, Object> truststore = trustStores.get(0);
- assertTrustStoreAttributes(truststore, TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE,
- QPID_HOME + "/../" + TestSSLConstants.BROKER_TRUSTSTORE, false);
+ assertEquals("unexpected name", TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE, truststore.get(TrustStore.NAME));
+ assertEquals("unexpected store URL value", ConfiguredObject.OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, truststore.get(FileTrustStore.STORE_URL));
+ assertEquals("unexpected password value", AbstractConfiguredObject.SECURED_STRING_VALUE, truststore.get(FileTrustStore.PASSWORD));
+ assertEquals("unexpected type of trust store", java.security.KeyStore.getDefaultType(), truststore.get(FileTrustStore.TRUST_STORE_TYPE));
+ assertEquals("unexpected peersOnly value", false, truststore.get(FileTrustStore.PEERS_ONLY));
}
@@ -127,14 +148,19 @@ public class TrustStoreRestTest extends QpidRestTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(TrustStore.NAME, name);
- attributes.put(FileTrustStore.PATH, TestSSLConstants.TRUSTSTORE);
+ attributes.put(FileTrustStore.STORE_URL, TestSSLConstants.TRUSTSTORE);
getRestTestHelper().submitRequest("truststore/" + name , "PUT", attributes, HttpServletResponse.SC_OK);
List<Map<String, Object>> trustStore = getRestTestHelper().getJsonAsList("truststore/" + name);
assertNotNull("details should not be null", trustStore);
- assertTrustStoreAttributes(trustStore.get(0), name, TestSSLConstants.TRUSTSTORE, false);
+ Map<String, Object> truststore = trustStore.get(0);
+ assertEquals("unexpected name", name, truststore.get(TrustStore.NAME));
+ assertEquals("unexpected path to trust store", TestSSLConstants.TRUSTSTORE, truststore.get(FileTrustStore.STORE_URL));
+ assertEquals("unexpected password", AbstractConfiguredObject.SECURED_STRING_VALUE, truststore.get(FileTrustStore.PASSWORD));
+ assertEquals("unexpected type", java.security.KeyStore.getDefaultType(), truststore.get(FileTrustStore.TRUST_STORE_TYPE));
+ assertEquals("unexpected peersOnly value", false, truststore.get(FileTrustStore.PEERS_ONLY));
}
private List<Map<String, Object>> assertNumberOfTrustStores(int numberOfTrustStores) throws Exception
@@ -151,24 +177,11 @@ public class TrustStoreRestTest extends QpidRestTestCase
Map<String, Object> trustStoreAttributes = new HashMap<String, Object>();
trustStoreAttributes.put(TrustStore.NAME, name);
//deliberately using the client trust store to differentiate from the one we are already for broker
- trustStoreAttributes.put(FileTrustStore.PATH, truststorePath);
+ trustStoreAttributes.put(FileTrustStore.STORE_URL, truststorePath);
trustStoreAttributes.put(FileTrustStore.PASSWORD, truststorePassword);
trustStoreAttributes.put(FileTrustStore.PEERS_ONLY, peersOnly);
getRestTestHelper().submitRequest("truststore/" + name, "PUT", trustStoreAttributes, HttpServletResponse.SC_CREATED);
}
- private void assertTrustStoreAttributes(Map<String, Object> truststore, String name, String path, boolean peersOnly)
- {
- assertEquals("default systests trust store is missing",
- name, truststore.get(TrustStore.NAME));
- assertEquals("unexpected path to trust store",
- path, truststore.get(FileTrustStore.PATH));
- assertEquals("unexpected (dummy) password of default systests trust store",
- AbstractConfiguredObject.SECURED_STRING_VALUE, truststore.get(FileTrustStore.PASSWORD));
- assertEquals("unexpected type of default systests trust store",
- java.security.KeyStore.getDefaultType(), truststore.get(FileTrustStore.TRUST_STORE_TYPE));
- assertEquals("unexpected peersOnly value",
- peersOnly, truststore.get(FileTrustStore.PEERS_ONLY));
- }
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
index 86ebf11575..c05e95c4d4 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
@@ -183,7 +183,7 @@ public class BrokerACLTest extends QpidRestTestCase
assertEquals("Setting of provider attribites should be allowed", 403, responseCode);
Map<String, Object> provider = getRestTestHelper().getJsonAsSingletonList("authenticationprovider/" + providerName);
- assertEquals("Unexpected PATH attribute value",
+ assertEquals("Unexpected STORE_URL attribute value",
providerData.get(ExternalFileBasedAuthenticationManager.PATH),
provider.get(ExternalFileBasedAuthenticationManager.PATH));
}
@@ -922,7 +922,7 @@ public class BrokerACLTest extends QpidRestTestCase
{
Map<String, Object> keyStoreAttributes = new HashMap<String, Object>();
keyStoreAttributes.put(KeyStore.NAME, name);
- keyStoreAttributes.put(FileKeyStore.PATH, TestSSLConstants.KEYSTORE);
+ keyStoreAttributes.put(FileKeyStore.STORE_URL, TestSSLConstants.KEYSTORE);
keyStoreAttributes.put(FileKeyStore.PASSWORD, TestSSLConstants.KEYSTORE_PASSWORD);
keyStoreAttributes.put(FileKeyStore.CERTIFICATE_ALIAS, certAlias);
@@ -933,7 +933,7 @@ public class BrokerACLTest extends QpidRestTestCase
{
Map<String, Object> trustStoreAttributes = new HashMap<String, Object>();
trustStoreAttributes.put(TrustStore.NAME, name);
- trustStoreAttributes.put(FileTrustStore.PATH, TestSSLConstants.KEYSTORE);
+ trustStoreAttributes.put(FileTrustStore.STORE_URL, TestSSLConstants.KEYSTORE);
trustStoreAttributes.put(FileTrustStore.PASSWORD, TestSSLConstants.KEYSTORE_PASSWORD);
trustStoreAttributes.put(FileTrustStore.PEERS_ONLY, peersOnly);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
deleted file mode 100644
index 5895d670a7..0000000000
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.test.unit.close;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.QpidClientConnection;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class MessageRequeueTest extends QpidBrokerTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class);
-
- protected static AtomicInteger consumerIds = new AtomicInteger(0);
- protected final Integer numTestMessages = 150;
-
- protected final int consumeTimeout = 3000;
-
- protected final String queue = "direct://amq.direct//message-requeue-test-queue";
- protected String payload = "Message:";
-
- protected final String BROKER = "tcp://127.0.0.1:5672";
- private boolean testReception = true;
-
- private long[] receieved = new long[numTestMessages + 1];
- private boolean passed = false;
- private QpidClientConnection conn;
-
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- conn = new QpidClientConnection(BROKER);
-
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
- // load test data
- _logger.info("creating test data, " + numTestMessages + " messages");
- conn.put(queue, payload, numTestMessages);
- // close this connection
- conn.disconnect();
- }
-
- protected void tearDown() throws Exception
- {
-
- if (!passed) // clean up
- {
- QpidClientConnection conn = new QpidClientConnection(BROKER);
-
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
-
- conn.disconnect();
- }
-
- super.tearDown();
- }
-
- /**
- * multiple consumers
- *
- * @throws javax.jms.JMSException if a JMS problem occurs
- * @throws InterruptedException on timeout
- */
- public void testDrain() throws Exception
- {
- QpidClientConnection conn = new QpidClientConnection(BROKER);
-
- conn.connect();
-
- _logger.info("consuming queue " + queue);
- Queue q = conn.getSession().createQueue(queue);
-
- final MessageConsumer consumer = conn.getSession().createConsumer(q);
- int messagesReceived = 0;
-
- long[] messageLog = new long[numTestMessages + 1];
-
- _logger.info("consuming...");
- Message msg = consumer.receive(1000);
- while (msg != null)
- {
- messagesReceived++;
-
- long dt = ((AbstractJMSMessage) msg).getDeliveryTag();
-
- int msgindex = msg.getIntProperty("index");
- if (messageLog[msgindex] != 0)
- {
- _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag()
- + ") more than once.");
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex);
- }
-
- if (dt == 0)
- {
- _logger.error("DT is zero for msg:" + msgindex);
- }
-
- messageLog[msgindex] = dt;
-
- // get Next message
- msg = consumer.receive(1000);
- }
-
- _logger.info("consuming done.");
- conn.getSession().commit();
- consumer.close();
-
- int index = 0;
- StringBuilder list = new StringBuilder();
- list.append("Failed to receive:");
- int failed = 0;
-
- _logger.info("consumed: " + messagesReceived);
-
- assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
- // with 0_10 we can have a delivery tag of 0
- if (!conn.isBroker010())
- {
- for (long b : messageLog)
- {
- if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist
- {
- _logger.error("Index: " + index + " was not received.");
- list.append(" ");
- list.append(index);
- list.append(":");
- list.append(b);
- failed++;
- }
-
- index++;
- }
-
- assertEquals(list.toString(), 0, failed);
- }
-
- conn.disconnect();
- passed = true;
- }
-
- /** multiple consumers
- * Based on code subbmitted by client FT-304
- */
- public void testTwoCompetingConsumers()
- {
- Consumer c1 = new Consumer();
- Consumer c2 = new Consumer();
- Consumer c3 = new Consumer();
- Consumer c4 = new Consumer();
-
- Thread t1 = new Thread(c1);
- Thread t2 = new Thread(c2);
- Thread t3 = new Thread(c3);
- Thread t4 = new Thread(c4);
-
- t1.start();
- t2.start();
- t3.start();
- // t4.start();
-
- try
- {
- t1.join();
- t2.join();
- t3.join();
- t4.join();
- }
- catch (InterruptedException e)
- {
- fail("Unable to join to Consumer theads");
- }
-
- _logger.info("consumer 1 count is " + c1.getCount());
- _logger.info("consumer 2 count is " + c2.getCount());
- _logger.info("consumer 3 count is " + c3.getCount());
- _logger.info("consumer 4 count is " + c4.getCount());
-
- Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount();
-
- // Check all messages were correctly delivered
- int index = 0;
- StringBuilder list = new StringBuilder();
- list.append("Failed to receive:");
- int failed = 0;
- if (!conn.isBroker010())
- {
- for (long b : receieved)
- {
- if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0)
- {
- _logger.error("Index: " + index + " was not received.");
- list.append(" ");
- list.append(index);
- list.append(":");
- list.append(b);
- failed++;
- }
-
- index++;
- }
-
- assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
- }
- assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
- passed = true;
- }
-
- class Consumer implements Runnable
- {
- private Integer count = 0;
- private Integer id;
-
- public Consumer()
- {
- id = consumerIds.addAndGet(1);
- }
-
- public void run()
- {
- try
- {
- _logger.info("consumer-" + id + ": starting");
- QpidClientConnection conn = new QpidClientConnection(BROKER);
-
- conn.connect();
-
- _logger.info("consumer-" + id + ": connected, consuming...");
- Message result;
- do
- {
- result = conn.getNextMessage(queue, consumeTimeout);
- if (result != null)
- {
-
- long dt = ((AbstractJMSMessage) result).getDeliveryTag();
-
- if (testReception)
- {
- int msgindex = result.getIntProperty("index");
- if (receieved[msgindex] != 0)
- {
- _logger.error("Received Message(" + msgindex + ":"
- + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once.");
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt
- + "IN:" + msgindex);
- }
-
- if (dt == 0)
- {
- _logger.error("DT is zero for msg:" + msgindex);
- }
-
- receieved[msgindex] = dt;
- }
-
- count++;
- if ((count % 100) == 0)
- {
- _logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
- }
- }
- }
- while (result != null);
-
- _logger.info("consumer-" + id + ": complete");
- conn.disconnect();
-
- }
- catch (Exception e)
- {
- _logger.error("Consumer run error",e);
- }
- }
-
- public Integer getCount()
- {
- return count;
- }
-
- public Integer getId()
- {
- return id;
- }
- }
-
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
- {
- int run = 0;
- // while (run < 10)
- {
- run++;
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("testRequeue run " + run);
- }
-
- String virtualHost = "/test";
- String brokerlist = BROKER;
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- QpidClientConnection qpc = new QpidClientConnection(BROKER);
- qpc.connect();
- Connection conn = qpc. getConnection();
-
- Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue q = session.createQueue(queue);
-
- _logger.debug("Create Consumer");
- MessageConsumer consumer = session.createConsumer(q);
-
- conn.start();
-
- _logger.debug("Receiving msg");
- Message msg = consumer.receive(2000);
-
- assertNotNull("Message should not be null", msg);
-
- // As we have not ack'd message will be requeued.
- _logger.debug("Close Consumer");
- consumer.close();
-
- _logger.debug("Close Connection");
- conn.close();
- }
- }
-
-}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java
deleted file mode 100644
index 0e0032da64..0000000000
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/utils/QpidClientConnection.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.JMSAMQException;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-public class QpidClientConnection extends QpidBrokerTestCase implements ExceptionListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(QpidClientConnection.class);
-
- private boolean transacted = true;
- private int ackMode = Session.CLIENT_ACKNOWLEDGE;
- private Connection connection;
-
- private String virtualHost;
- private String brokerlist;
- private int prefetch;
- protected Session session;
- protected boolean connected;
-
- public QpidClientConnection(String broker)
- {
- super();
- setVirtualHost("/test");
- setBrokerList(broker);
- setPrefetch(5000);
- }
-
-
- public Connection getConnection()
- {
- return connection;
- }
-
- public void connect() throws JMSException
- {
- if (!connected)
- {
- /*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- try
- {
- _logger.info("connecting to Qpid :" + brokerUrl);
- connection = getConnection("guest", "guest") ;
- // register exception listener
- connection.setExceptionListener(this);
-
- session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
- _logger.info("starting connection");
- connection.start();
-
- connected = true;
- }
- catch (Exception e)
- {
- throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
- }
- }
- }
-
- public void disconnect() throws Exception
- {
- if (connected)
- {
- session.commit();
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected");
- }
- }
-
- public void disconnectWithoutCommit() throws JMSException
- {
- if (connected)
- {
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected without commit");
- }
- }
-
- public String getBrokerList()
- {
- return brokerlist;
- }
-
- public void setBrokerList(String brokerlist)
- {
- this.brokerlist = brokerlist;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public void setVirtualHost(String virtualHost)
- {
- this.virtualHost = virtualHost;
- }
-
- public void setPrefetch(int prefetch)
- {
- this.prefetch = prefetch;
- }
-
- /** override as necessary */
- public void onException(JMSException exception)
- {
- _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- /**
- * Put a String as a text messages, repeat n times. A null payload will result in a null message.
- *
- * @param queueName The queue name to put to
- * @param payload the content of the payload
- * @param copies the number of messages to put
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public void put(String queueName, String payload, int copies) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("putting to queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageProducer sender = session.createProducer(queue);
-
- for (int i = 0; i < copies; i++)
- {
- Message m = session.createTextMessage(payload + i);
- m.setIntProperty("index", i + 1);
- sender.send(m);
- }
-
- session.commit();
- sender.close();
- _logger.info("put " + copies + " copies");
- }
-
- /**
- * GET the top message on a queue. Consumes the message. Accepts timeout value.
- *
- * @param queueName The quename to get from
- * @param readTimeout The timeout to use
- *
- * @return the content of the text message if any
- *
- * @throws javax.jms.JMSException any exception that occured
- */
- public Message getNextMessage(String queueName, long readTimeout) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
-
- Message message = consumer.receive(readTimeout);
- session.commit();
- consumer.close();
-
- Message result;
-
- // all messages we consume should be TextMessages
- if (message instanceof TextMessage)
- {
- result = ((TextMessage) message);
- }
- else if (null == message)
- {
- result = null;
- }
- else
- {
- _logger.info("warning: received non-text message");
- result = message;
- }
-
- return result;
- }
-
- /**
- * GET the top message on a queue. Consumes the message.
- *
- * @param queueName The Queuename to get from
- *
- * @return The string content of the text message, if any received
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public Message getNextMessage(String queueName) throws JMSException
- {
- return getNextMessage(queueName, 0);
- }
-
- /**
- * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
- *
- * @param queueName The Queue name to consume from
- * @param readTimeout The timeout for each consume
- *
- * @throws javax.jms.JMSException Any exception that occurs during the consume
- * @throws InterruptedException If the consume thread was interrupted during a consume.
- */
- public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("consuming queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
- int messagesReceived = 0;
-
- _logger.info("consuming...");
- while ((consumer.receive(readTimeout)) != null)
- {
- messagesReceived++;
- }
-
- session.commit();
- consumer.close();
- _logger.info("consumed: " + messagesReceived);
- }
-}