diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-05-03 13:41:35 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-05-03 13:41:35 +0000 |
| commit | 49c6109ef7370409c93bc35b6a9ddf58e6fca465 (patch) | |
| tree | 4e3c1d0a7ef8dcb518de078244f946230bfbc88b /java | |
| parent | b2d3708cb018e4746a1aed79d5089eea5afbb43f (diff) | |
| download | qpid-python-49c6109ef7370409c93bc35b6a9ddf58e6fca465.tar.gz | |
Merged revisions 533704-533720,533722-533763,533766-533818,533820-533839,533841-533859,533862-534112,534114-534117 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r533704 | bhupendrab | 2007-04-30 12:37:59 +0100 (Mon, 30 Apr 2007) | 1 line
Added time-out for secure server connection.
........
r534036 | ritchiem | 2007-05-01 13:28:03 +0100 (Tue, 01 May 2007) | 1 line
QPID-461 Update to CommitRollbackTest. Ensuring messages received have the correct redelivered value, regardless of order. Different test case also was problematic.
........
r534117 | ritchiem | 2007-05-01 16:22:17 +0100 (Tue, 01 May 2007) | 4 lines
Comments and Test changes
VirtualHost Added comments
HeapExhaustion - Updated to send transient messages.
QpidClientConnection - Allowed specification of type of message to send.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@534856 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
8 files changed, 334 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 7e329a5274..15edb2289f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -1,3 +1,4 @@ +<<<<<<< .working /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -270,3 +271,265 @@ public class VirtualHost implements Accessable return _virtualHostMBean; } } +======= +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+
+public class VirtualHost implements Accessable
+{
+ private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+ private final String _name;
+
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ protected VirtualHostMBean _virtualHostMBean;
+
+ private AMQBrokerManagerMBean _brokerMBean;
+
+ private AuthenticationManager _authenticationManager;
+
+ private AccessManager _accessManager;
+
+
+ public void setAccessableName(String name)
+ {
+ _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+ + name + ") ignored remains :" + getAccessableName());
+ }
+
+ public String getAccessableName()
+ {
+ return _name;
+ }
+
+
+ /**
+ * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
+ * implementaion of an Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, "VirtualHost");
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name.toString();
+ }
+
+ public String getName()
+ {
+ return _name.toString();
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return VirtualHost.this;
+ }
+
+
+ } // End of MBean class
+
+ /**
+ * Used for testing only
+ * @param name
+ * @param store
+ * @throws Exception
+ */
+ public VirtualHost(String name, MessageStore store) throws Exception
+ {
+ this(name, null, store);
+ }
+
+ /**
+ * Normal Constructor
+ * @param name
+ * @param hostConfig
+ * @throws Exception
+ */
+ public VirtualHost(String name, Configuration hostConfig) throws Exception
+ {
+ this(name, hostConfig, null);
+ }
+
+ private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+ {
+ _name = name;
+
+ _virtualHostMBean = new VirtualHostMBean();
+ // This isn't needed to be registered
+ //_virtualHostMBean.register();
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ if (store != null)
+ {
+ _messageStore = store;
+ }
+ else
+ {
+ if (hostConfig == null)
+ {
+ throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ }
+ initialiseMessageStore(hostConfig);
+ }
+
+ _exchangeRegistry.initialise();
+
+ _logger.warn("VirtualHost authentication Managers require spec change to be operational.");
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+ _accessManager = new AccessManagerImpl(name, hostConfig);
+
+ _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+ _brokerMBean.register();
+ }
+
+ private void initialiseMessageStore(Configuration config) throws Exception
+ {
+ String messageStoreClass = config.getString("store.class");
+
+ Class clazz = Class.forName(messageStoreClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
+ }
+ _messageStore = (MessageStore) o;
+ _messageStore.configure(this, "store", config);
+ }
+
+
+ public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+ {
+ T instance;
+ try
+ {
+ instance = instanceType.newInstance();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
+ }
+ Configurator.configure(instance);
+
+ return instance;
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return _authenticationManager;
+ }
+
+ public AccessManager getAccessManager()
+ {
+ return _accessManager;
+ }
+
+ public void close() throws Exception
+ {
+ if (_messageStore != null)
+ {
+ _messageStore.close();
+ }
+ }
+
+ public ManagedObject getBrokerMBean()
+ {
+ return _brokerMBean;
+ }
+
+ public ManagedObject getManagedObject()
+ {
+ return _virtualHostMBean;
+ }
+}
+>>>>>>> .merge-right.r534117 diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 8aaa760537..0e718da19b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -359,17 +359,37 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); - result = _consumer.receive(1000); assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + + if (((TextMessage) result).getText().equals("2")) + { + assertTrue("Messasge is marked as redelivered", !result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + else + { + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } result = _consumer.receive(1000); + if (result != null) + { + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + result = _consumer.receive(1000); + } + assertNull("test message should be null", result); } @@ -411,7 +431,7 @@ public class CommitRollbackTest extends TestCase else // or it will be msg 2 arriving the first time due to latency. { _logger.info("Message 2 wasn't prefetched so wasn't rejected"); - assertEquals("2", ((TextMessage) result).getText()); + assertEquals("2", ((TextMessage) result).getText()); } result = _consumer.receive(1000); diff --git a/java/management/eclipse-plugin/bin/qpidmc.bat b/java/management/eclipse-plugin/bin/qpidmc.bat index e444bc5811..1f3207f043 100644 --- a/java/management/eclipse-plugin/bin/qpidmc.bat +++ b/java/management/eclipse-plugin/bin/qpidmc.bat @@ -52,4 +52,4 @@ goto exit rem Slurp the command line arguments. This loop allows for an unlimited number
rem of agruments (up to the command line limit, anyway).
-"%JAVA_HOME%\bin\java" -Xms40m -Xmx256m -Declipse.consoleLog=false -Dsecurity=PLAIN -jar %QPIDMC_HOME%\eclipse\startup.jar org.eclipse.core.launcher.Main -launcher %QPIDMC_HOME%\eclipse\eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:%QPIDMC_HOME%\configuration" -os win32 -ws win32 -arch x86
+"%JAVA_HOME%\bin\java" -Xms40m -Xmx256m -Declipse.consoleLog=false -jar %QPIDMC_HOME%\eclipse\startup.jar org.eclipse.core.launcher.Main -launcher %QPIDMC_HOME%\eclipse\eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:%QPIDMC_HOME%\configuration" -os win32 -ws win32 -arch x86
diff --git a/java/management/eclipse-plugin/bin/qpidmc.sh b/java/management/eclipse-plugin/bin/qpidmc.sh index aae61b14c7..2472545a14 100755 --- a/java/management/eclipse-plugin/bin/qpidmc.sh +++ b/java/management/eclipse-plugin/bin/qpidmc.sh @@ -61,4 +61,4 @@ elif [ $os = "Linux" ]; then os="linux" fi -"$JAVA_HOME/bin/java" -Xms40m -Xmx256m -Declipse.consoleLog=false -Dsecurity=PLAIN -jar $QPIDMC_HOME/eclipse/startup.jar org.eclipse.core.launcher.Main -launcher $QPIDMC_HOME/eclipse/eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:$QPIDMC_HOME/configuration" -os $os -ws $ws -arch $arch +"$JAVA_HOME/bin/java" -Xms40m -Xmx256m -Declipse.consoleLog=false -jar $QPIDMC_HOME/eclipse/startup.jar org.eclipse.core.launcher.Main -launcher $QPIDMC_HOME/eclipse/eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:$QPIDMC_HOME/configuration" -os $os -ws $ws -arch $arch diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java index 0693a4fc93..f6eace3101 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java @@ -44,6 +44,7 @@ public abstract class ApplicationRegistry public static final boolean debug = Boolean.getBoolean("eclipse.consoleLog"); public static final String securityMechanism = System.getProperty("security", null); public static final String connectorClass = System.getProperty("jmxconnector"); + public static final long timeout = Long.parseLong(System.getProperty("timeout", "5000")); static { diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java index 4d4b3e87e7..f97568e275 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java @@ -59,6 +59,7 @@ import org.apache.qpid.management.ui.sasl.UsernameHashedPasswordCallbackHandler; public class JMXServerRegistry extends ServerRegistry { + private boolean _connected = false; private ObjectName _serverObjectName = null; private Map<String, Object> _env = null; private JMXServiceURL _jmxUrl = null; @@ -98,23 +99,12 @@ public class JMXServerRegistry extends ServerRegistry super(server); String securityMechanism = ApplicationRegistry.getSecurityMechanism(); String connectorClassName = ApplicationRegistry.getJMXConnectorClass(); - - boolean saslPluginAvailable = false; if ((securityMechanism != null) && (connectorClassName != null)) { - try - { - createSASLConnector(securityMechanism, connectorClassName); - saslPluginAvailable = true; - } - catch (Exception ex) - { - MBeanUtility.printStackTrace(ex); - } + createSASLConnector(securityMechanism, connectorClassName); } - - if (!saslPluginAvailable) + else { _jmxUrl = new JMXServiceURL(server.getUrl()); _jmxc = JMXConnectorFactory.connect(_jmxUrl, null); @@ -179,9 +169,34 @@ public class JMXServerRegistry extends ServerRegistry Object theObject = cons.newInstance(args); _jmxc = (JMXConnector)theObject; - _jmxc.connect(); + + Thread connectorThread = new Thread(new ConnectorThread()); + connectorThread.start(); + long timeNow = System.currentTimeMillis(); + connectorThread.join(ApplicationRegistry.timeout); + + if (!_connected && (System.currentTimeMillis() - timeNow >= ApplicationRegistry.timeout)) + { + throw new Exception("Qpid server connection timed out"); + } } + private class ConnectorThread implements Runnable + { + public void run() + { + try + { + _connected = false; + _jmxc.connect(); + _connected = true; + } + catch (Exception ex) + { + MBeanUtility.printStackTrace(ex); + } + } + } /** * removes all listeners from the mbean server. This is required when user * disconnects the Qpid server connection diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java index 228bd1ec6f..c6cbac0ba8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -8,6 +8,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.log4j.Logger; import javax.jms.JMSException; +import javax.jms.DeliveryMode; import java.io.IOException; @@ -16,7 +17,7 @@ public class HeapExhaustion extends TestCase { private static final Logger _logger = Logger.getLogger(HeapExhaustion.class); - protected QpidClientConnection conn; + protected QpidClientConnection conn; protected final String BROKER = "localhost"; protected final String vhost = "/test"; protected final String queue = "direct://amq.direct//queue"; @@ -54,7 +55,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailure() throws Exception + public void testUntilFailureTransient() throws Exception { int copies = 0; int total = 0; @@ -62,7 +63,7 @@ public class HeapExhaustion extends TestCase int size = payload.getBytes().length; while (true) { - conn.put(queue, payload, 1); + conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT); copies++; total += size; System.out.println("put copy " + copies + " OK for total bytes: " + total); @@ -74,7 +75,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailureWithDelays() throws Exception + public void testUntilFailureWithDelaysTransient() throws Exception { int copies = 0; int total = 0; @@ -82,7 +83,7 @@ public class HeapExhaustion extends TestCase int size = payload.getBytes().length; while (true) { - conn.put(queue, payload, 1); + conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT); copies++; total += size; System.out.println("put copy " + copies + " OK for total bytes: " + total); @@ -109,7 +110,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailure(); + he.testUntilFailureTransient(); } catch (FailoverException fe) { @@ -158,7 +159,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailureWithDelays(); + he.testUntilFailureWithDelaysTransient(); } catch (FailoverException fe) { diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java index 80773c102d..d3f064293e 100644 --- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -16,6 +16,7 @@ import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.TextMessage; +import javax.jms.DeliveryMode; public class QpidClientConnection implements ExceptionListener { @@ -150,7 +151,7 @@ public class QpidClientConnection implements ExceptionListener * * @throws javax.jms.JMSException any exception that occurs */ - public void put(String queueName, String payload, int copies) throws JMSException + public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException { if (!connected) { @@ -162,6 +163,8 @@ public class QpidClientConnection implements ExceptionListener final MessageProducer sender = session.createProducer(queue); + sender.setDeliveryMode(deliveryMode); + for (int i = 0; i < copies; i++) { Message m = session.createTextMessage(payload + i); |
