From 49c6109ef7370409c93bc35b6a9ddf58e6fca465 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 3 May 2007 13:41:35 +0000 Subject: Merged revisions 533704-533720,533722-533763,533766-533818,533820-533839,533841-533859,533862-534112,534114-534117 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r533704 | bhupendrab | 2007-04-30 12:37:59 +0100 (Mon, 30 Apr 2007) | 1 line Added time-out for secure server connection. ........ r534036 | ritchiem | 2007-05-01 13:28:03 +0100 (Tue, 01 May 2007) | 1 line QPID-461 Update to CommitRollbackTest. Ensuring messages received have the correct redelivered value, regardless of order. Different test case also was problematic. ........ r534117 | ritchiem | 2007-05-01 16:22:17 +0100 (Tue, 01 May 2007) | 4 lines Comments and Test changes VirtualHost Added comments HeapExhaustion - Updated to send transient messages. QpidClientConnection - Allowed specification of type of message to send. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@534856 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/virtualhost/VirtualHost.java | 263 +++++++++++++++++++++ .../test/unit/transacted/CommitRollbackTest.java | 34 ++- java/management/eclipse-plugin/bin/qpidmc.bat | 2 +- java/management/eclipse-plugin/bin/qpidmc.sh | 2 +- .../qpid/management/ui/ApplicationRegistry.java | 1 + .../qpid/management/ui/jmx/JMXServerRegistry.java | 43 ++-- .../apache/qpid/server/failure/HeapExhaustion.java | 15 +- .../apache/qpid/testutil/QpidClientConnection.java | 5 +- 8 files changed, 334 insertions(+), 31 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 7e329a5274..15edb2289f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -1,3 +1,4 @@ +<<<<<<< .working /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -270,3 +271,265 @@ public class VirtualHost implements Accessable return _virtualHostMBean; } } +======= +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import javax.management.NotCompliantMBeanException; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.security.access.AccessManager; +import org.apache.qpid.server.security.access.AccessManagerImpl; +import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; + +public class VirtualHost implements Accessable +{ + private static final Logger _logger = Logger.getLogger(VirtualHost.class); + + + private final String _name; + + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + protected VirtualHostMBean _virtualHostMBean; + + private AMQBrokerManagerMBean _brokerMBean; + + private AuthenticationManager _authenticationManager; + + private AccessManager _accessManager; + + + public void setAccessableName(String name) + { + _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" + + name + ") ignored remains :" + getAccessableName()); + } + + public String getAccessableName() + { + return _name; + } + + + /** + * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any + * implementaion of an Exchange MBean should extend this class. + */ + public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + { + public VirtualHostMBean() throws NotCompliantMBeanException + { + super(ManagedVirtualHost.class, "VirtualHost"); + } + + public String getObjectInstanceName() + { + return _name.toString(); + } + + public String getName() + { + return _name.toString(); + } + + public VirtualHost getVirtualHost() + { + return VirtualHost.this; + } + + + } // End of MBean class + + /** + * Used for testing only + * @param name + * @param store + * @throws Exception + */ + public VirtualHost(String name, MessageStore store) throws Exception + { + this(name, null, store); + } + + /** + * Normal Constructor + * @param name + * @param hostConfig + * @throws Exception + */ + public VirtualHost(String name, Configuration hostConfig) throws Exception + { + this(name, hostConfig, null); + } + + private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception + { + _name = name; + + _virtualHostMBean = new VirtualHostMBean(); + // This isn't needed to be registered + //_virtualHostMBean.register(); + + _queueRegistry = new DefaultQueueRegistry(this); + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeRegistry = new DefaultExchangeRegistry(this); + + if (store != null) + { + _messageStore = store; + } + else + { + if (hostConfig == null) + { + throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + } + initialiseMessageStore(hostConfig); + } + + _exchangeRegistry.initialise(); + + _logger.warn("VirtualHost authentication Managers require spec change to be operational."); + _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); + + _accessManager = new AccessManagerImpl(name, hostConfig); + + _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); + _brokerMBean.register(); + } + + private void initialiseMessageStore(Configuration config) throws Exception + { + String messageStoreClass = config.getString("store.class"); + + Class clazz = Class.forName(messageStoreClass); + Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); + } + _messageStore = (MessageStore) o; + _messageStore.configure(this, "store", config); + } + + + public T getConfiguredObject(Class instanceType, Configuration config) + { + T instance; + try + { + instance = instanceType.newInstance(); + } + catch (Exception e) + { + _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); + throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); + } + Configurator.configure(instance); + + return instance; + } + + + public String getName() + { + return _name; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public ApplicationRegistry getApplicationRegistry() + { + throw new UnsupportedOperationException(); + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public AccessManager getAccessManager() + { + return _accessManager; + } + + public void close() throws Exception + { + if (_messageStore != null) + { + _messageStore.close(); + } + } + + public ManagedObject getBrokerMBean() + { + return _brokerMBean; + } + + public ManagedObject getManagedObject() + { + return _virtualHostMBean; + } +} +>>>>>>> .merge-right.r534117 diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 8aaa760537..0e718da19b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -359,17 +359,37 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("1", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); - result = _consumer.receive(1000); assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("2", ((TextMessage) result).getText()); - assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + + if (((TextMessage) result).getText().equals("2")) + { + assertTrue("Messasge is marked as redelivered", !result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + else + { + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } result = _consumer.receive(1000); + if (result != null) + { + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + result = _consumer.receive(1000); + } + assertNull("test message should be null", result); } @@ -411,7 +431,7 @@ public class CommitRollbackTest extends TestCase else // or it will be msg 2 arriving the first time due to latency. { _logger.info("Message 2 wasn't prefetched so wasn't rejected"); - assertEquals("2", ((TextMessage) result).getText()); + assertEquals("2", ((TextMessage) result).getText()); } result = _consumer.receive(1000); diff --git a/java/management/eclipse-plugin/bin/qpidmc.bat b/java/management/eclipse-plugin/bin/qpidmc.bat index e444bc5811..1f3207f043 100644 --- a/java/management/eclipse-plugin/bin/qpidmc.bat +++ b/java/management/eclipse-plugin/bin/qpidmc.bat @@ -52,4 +52,4 @@ goto exit rem Slurp the command line arguments. This loop allows for an unlimited number rem of agruments (up to the command line limit, anyway). -"%JAVA_HOME%\bin\java" -Xms40m -Xmx256m -Declipse.consoleLog=false -Dsecurity=PLAIN -jar %QPIDMC_HOME%\eclipse\startup.jar org.eclipse.core.launcher.Main -launcher %QPIDMC_HOME%\eclipse\eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:%QPIDMC_HOME%\configuration" -os win32 -ws win32 -arch x86 +"%JAVA_HOME%\bin\java" -Xms40m -Xmx256m -Declipse.consoleLog=false -jar %QPIDMC_HOME%\eclipse\startup.jar org.eclipse.core.launcher.Main -launcher %QPIDMC_HOME%\eclipse\eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:%QPIDMC_HOME%\configuration" -os win32 -ws win32 -arch x86 diff --git a/java/management/eclipse-plugin/bin/qpidmc.sh b/java/management/eclipse-plugin/bin/qpidmc.sh index aae61b14c7..2472545a14 100755 --- a/java/management/eclipse-plugin/bin/qpidmc.sh +++ b/java/management/eclipse-plugin/bin/qpidmc.sh @@ -61,4 +61,4 @@ elif [ $os = "Linux" ]; then os="linux" fi -"$JAVA_HOME/bin/java" -Xms40m -Xmx256m -Declipse.consoleLog=false -Dsecurity=PLAIN -jar $QPIDMC_HOME/eclipse/startup.jar org.eclipse.core.launcher.Main -launcher $QPIDMC_HOME/eclipse/eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:$QPIDMC_HOME/configuration" -os $os -ws $ws -arch $arch +"$JAVA_HOME/bin/java" -Xms40m -Xmx256m -Declipse.consoleLog=false -jar $QPIDMC_HOME/eclipse/startup.jar org.eclipse.core.launcher.Main -launcher $QPIDMC_HOME/eclipse/eclipse -name "Qpid Management Console" -showsplash 600 -configuration "file:$QPIDMC_HOME/configuration" -os $os -ws $ws -arch $arch diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java index 0693a4fc93..f6eace3101 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java @@ -44,6 +44,7 @@ public abstract class ApplicationRegistry public static final boolean debug = Boolean.getBoolean("eclipse.consoleLog"); public static final String securityMechanism = System.getProperty("security", null); public static final String connectorClass = System.getProperty("jmxconnector"); + public static final long timeout = Long.parseLong(System.getProperty("timeout", "5000")); static { diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java index 4d4b3e87e7..f97568e275 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java @@ -59,6 +59,7 @@ import org.apache.qpid.management.ui.sasl.UsernameHashedPasswordCallbackHandler; public class JMXServerRegistry extends ServerRegistry { + private boolean _connected = false; private ObjectName _serverObjectName = null; private Map _env = null; private JMXServiceURL _jmxUrl = null; @@ -98,23 +99,12 @@ public class JMXServerRegistry extends ServerRegistry super(server); String securityMechanism = ApplicationRegistry.getSecurityMechanism(); String connectorClassName = ApplicationRegistry.getJMXConnectorClass(); - - boolean saslPluginAvailable = false; if ((securityMechanism != null) && (connectorClassName != null)) { - try - { - createSASLConnector(securityMechanism, connectorClassName); - saslPluginAvailable = true; - } - catch (Exception ex) - { - MBeanUtility.printStackTrace(ex); - } + createSASLConnector(securityMechanism, connectorClassName); } - - if (!saslPluginAvailable) + else { _jmxUrl = new JMXServiceURL(server.getUrl()); _jmxc = JMXConnectorFactory.connect(_jmxUrl, null); @@ -179,9 +169,34 @@ public class JMXServerRegistry extends ServerRegistry Object theObject = cons.newInstance(args); _jmxc = (JMXConnector)theObject; - _jmxc.connect(); + + Thread connectorThread = new Thread(new ConnectorThread()); + connectorThread.start(); + long timeNow = System.currentTimeMillis(); + connectorThread.join(ApplicationRegistry.timeout); + + if (!_connected && (System.currentTimeMillis() - timeNow >= ApplicationRegistry.timeout)) + { + throw new Exception("Qpid server connection timed out"); + } } + private class ConnectorThread implements Runnable + { + public void run() + { + try + { + _connected = false; + _jmxc.connect(); + _connected = true; + } + catch (Exception ex) + { + MBeanUtility.printStackTrace(ex); + } + } + } /** * removes all listeners from the mbean server. This is required when user * disconnects the Qpid server connection diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java index 228bd1ec6f..c6cbac0ba8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -8,6 +8,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.log4j.Logger; import javax.jms.JMSException; +import javax.jms.DeliveryMode; import java.io.IOException; @@ -16,7 +17,7 @@ public class HeapExhaustion extends TestCase { private static final Logger _logger = Logger.getLogger(HeapExhaustion.class); - protected QpidClientConnection conn; + protected QpidClientConnection conn; protected final String BROKER = "localhost"; protected final String vhost = "/test"; protected final String queue = "direct://amq.direct//queue"; @@ -54,7 +55,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailure() throws Exception + public void testUntilFailureTransient() throws Exception { int copies = 0; int total = 0; @@ -62,7 +63,7 @@ public class HeapExhaustion extends TestCase int size = payload.getBytes().length; while (true) { - conn.put(queue, payload, 1); + conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT); copies++; total += size; System.out.println("put copy " + copies + " OK for total bytes: " + total); @@ -74,7 +75,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailureWithDelays() throws Exception + public void testUntilFailureWithDelaysTransient() throws Exception { int copies = 0; int total = 0; @@ -82,7 +83,7 @@ public class HeapExhaustion extends TestCase int size = payload.getBytes().length; while (true) { - conn.put(queue, payload, 1); + conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT); copies++; total += size; System.out.println("put copy " + copies + " OK for total bytes: " + total); @@ -109,7 +110,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailure(); + he.testUntilFailureTransient(); } catch (FailoverException fe) { @@ -158,7 +159,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailureWithDelays(); + he.testUntilFailureWithDelaysTransient(); } catch (FailoverException fe) { diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java index 80773c102d..d3f064293e 100644 --- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -16,6 +16,7 @@ import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.TextMessage; +import javax.jms.DeliveryMode; public class QpidClientConnection implements ExceptionListener { @@ -150,7 +151,7 @@ public class QpidClientConnection implements ExceptionListener * * @throws javax.jms.JMSException any exception that occurs */ - public void put(String queueName, String payload, int copies) throws JMSException + public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException { if (!connected) { @@ -162,6 +163,8 @@ public class QpidClientConnection implements ExceptionListener final MessageProducer sender = session.createProducer(queue); + sender.setDeliveryMode(deliveryMode); + for (int i = 0; i < copies; i++) { Message m = session.createTextMessage(payload + i); -- cgit v1.2.1