diff options
| author | Keith Wall <kwall@apache.org> | 2012-05-22 18:18:50 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-05-22 18:18:50 +0000 |
| commit | 5b4f8623bd816668c3526088b1dd474f7573fcc3 (patch) | |
| tree | 4de57e643be308df7ee9b0768627e1e2d35dbede /qpid/java | |
| parent | 31295e25f33cb49f1ad23fb2e75ff252df40471e (diff) | |
| download | qpid-python-5b4f8623bd816668c3526088b1dd474f7573fcc3.tar.gz | |
QPID-4006: BDB HA. Make passivation async to avoid holding up the BDB thread. Introduce VirtualHost ERROR state to be used when virtual host is unable to activate or passivate itself completely. Change MULTISYNC mode to use WRITE_NO_SYNC. Stop relying on Monitor nodes to perform some tests.
Work of Robbie Gemmell <robbie@apache.org> and myself.
squashme
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1341584 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 175 insertions, 173 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index f887c8ce36..0f1349527c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -67,7 +68,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { private static final String MUTLI_SYNC = "MUTLI_SYNC"; private static final String DEFAULT_REPLICATION_POLICY = - MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); + MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); @@ -107,7 +108,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess if(_replicationPolicy.startsWith(MUTLI_SYNC)) { - _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name())); + _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name())); _localMultiSyncCommits = true; } else @@ -388,11 +389,11 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess activateStoreAsync(); break; case REPLICA: - passivateStore(); + passivateStoreAsync(); break; case DETACHED: LOGGER.error("BDB replicated node in detached state, therefore passivating."); - passivateStore(); + passivateStoreAsync(); break; case UNKNOWN: LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); @@ -403,20 +404,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess } } - /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */ - private void passivateStore() - { - try - { - passivate(); - } - catch(Exception e) - { - LOGGER.error("Unable to passivate", e); - throw new RuntimeException("Unable to passivate", e); - } - } - /** * Calls {@link MessageStore#activate()}. * @@ -429,34 +416,93 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess */ private void activateStoreAsync() { + String threadName = "BDBHANodeActivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event",e); + throw e; + } + return null; + } + }, threadName); + } + + /** + * Calls {@link #passivate()}. + * + * <p/> + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * passivation due to the effect of state change listeners. + */ + private void passivateStoreAsync() + { + String threadName = "BDBHANodePassivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + + @Override + public Void call() throws Exception + { + try + { + passivate(); + } + catch (Exception e) + { + LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e); + throw e; + } + return null; + } + }, threadName); + } + + private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) + { final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); _executor.execute(new Runnable() { - private static final String _THREAD_NAME = "BDBHANodeActivationThread"; @Override public void run() { - Thread.currentThread().setName(_THREAD_NAME); - CurrentActor.set(new AbstractActor(_rootLogger) + final String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(threadName); + try { - @Override - public String getLogMessage() + CurrentActor.set(new AbstractActor(_rootLogger) + { + @Override + public String getLogMessage() + { + return threadName; + } + }); + + try { - return _THREAD_NAME; + callable.call(); + } + catch (Exception e) + { + LOGGER.error("Exception during state change", e); } - }); - - try - { - activate(); } - catch (Exception e) + finally { - LOGGER.error("Failed to activate on hearing MASTER change event",e); + Thread.currentThread().setName(originalThreadName); } - } }); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java index 900f528b76..6c5c51a355 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -76,7 +76,7 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase // Don't start default broker provided by QBTC. } - public void testLossOfActiveNodeCausesClientToFailover() throws Exception + public void testLossOfMasterNodeCausesClientToFailover() throws Exception { final Connection connection = getConnection(_brokerFailoverUrl); @@ -93,10 +93,11 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception + public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception { LOGGER.info("Connecting to " + _brokerFailoverUrl); final Connection connection = getConnection(_brokerFailoverUrl); + LOGGER.info("Got connection to cluster"); ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java index f694de61f2..12e281cd65 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.io.IOException; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; @@ -53,8 +51,6 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase private final int NUMBER_OF_NODES = 3; private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - // TODO Factory refactoring?? // MessageStore construction?? - @Override protected void setUp() throws Exception { @@ -86,7 +82,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase { try { - getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithoutRetry(brokerPortNumber)); connectionSuccesses++; } catch(JMSException e) @@ -105,7 +101,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase final Connection initialConnection = getConnectionToNodeInCluster(); assertNotNull(initialConnection); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); assertNotNull(subsequentConnection); @@ -121,7 +117,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase final Connection initialConnection = getConnectionToNodeInCluster(); assertNotNull(initialConnection); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); assertNotNull(subsequentConnection); @@ -159,7 +155,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); - killConnectionBrokerAndWaitForNewMasterElection(initialConnection); + killConnectionBroker(initialConnection); final Connection subsequentConnection = getConnectionToNodeInCluster(); @@ -168,7 +164,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl); } - public void testRecoveryOfOutOfDateNode() throws Exception + public void xtestRecoveryOfOutOfDateNode() throws Exception { /* * TODO: Implement @@ -220,7 +216,7 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase { try { - connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber)); + connection = getConnection(_clusterCreator.getConnectionUrlForSingleNodeWithRetry(brokerPortNumber)); break; } catch(JMSException je) @@ -231,26 +227,11 @@ public class HAClusterWhiteboxTest extends QpidBrokerTestCase return connection; } - private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException, + private void killConnectionBroker(final Connection initialConnection) throws IOException, InterruptedException { - try - { - // NewMasterEvent is received twice: first for the existing master, - // second for a new master - CountDownLatch newMasterLatch = new CountDownLatch(2); - _clusterCreator.startMonitorNode(); - _clusterCreator.statListeningForNewMasterEvent(newMasterLatch); - - final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); - killBroker(initialPortNumber); - - assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS)); - } - finally - { - _clusterCreator.shutdownMonitor(); - } + final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + killBroker(initialPortNumber); // kill awaits the death of the child } private void assertProducingConsuming(final Connection connection) throws JMSException, Exception diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 43cfa5f4d5..a47597942b 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -19,7 +19,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; @@ -32,7 +31,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,26 +46,22 @@ import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.url.URLSyntaxException; -import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.monitor.GroupChangeEvent; -import com.sleepycat.je.rep.monitor.JoinGroupEvent; -import com.sleepycat.je.rep.monitor.LeaveGroupEvent; -import com.sleepycat.je.rep.monitor.Monitor; -import com.sleepycat.je.rep.monitor.MonitorChangeListener; -import com.sleepycat.je.rep.monitor.MonitorConfig; -import com.sleepycat.je.rep.monitor.NewMasterEvent; - public class HATestClusterCreator { protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; - private static final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; - private static final int CYCLECOUNT = 2; - private static final int RETRIES = 2; - private static final int CONNECTDELAY = 1000; + private static final int FAILOVER_CYCLECOUNT = 2; + private static final int FAILOVER_RETRIES = 2; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 30; + private static final int CONNECTDELAY = 75; private final QpidBrokerTestCase _testcase; private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); @@ -80,7 +74,6 @@ public class HATestClusterCreator private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; - private Monitor _monitor; public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) { @@ -216,7 +209,6 @@ public class HATestClusterCreator public void stopCluster() throws Exception { - shutdownMonitor(); for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) { try @@ -265,19 +257,38 @@ public class HATestClusterCreator { int brokerPortNumber = itr.next(); - brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES)); + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)); if (itr.hasNext()) { brokerList.append(";"); } } - return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT)); + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, FAILOVER_CYCLECOUNT)); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); } - public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException { - String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + return new AMQConnectionURL(url); } @@ -343,7 +354,6 @@ public class HATestClusterCreator _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName); _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); - // TODO replication policy } public String getIpAddressOfBrokerHost() @@ -403,77 +413,4 @@ public class HATestClusterCreator virtualHostConfig.setProperty(configKey, newBdbHostPort); collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); } - - public void startMonitorNode() - { - shutdownMonitor(); - - MonitorConfig config = new MonitorConfig(); - config.setGroupName(_groupName); - int monitorPort = _testcase.findFreePort(); - config.setNodeName(getNodeNameForNodeAt(monitorPort)); - config.setNodeHostPort("" + monitorPort); - config.setHelperHosts(getHelperHostPort()); - - _monitor = new Monitor(config); - - ReplicationNode currentMaster = _monitor.register(); - LOGGER.info("Current master " + currentMaster.getName()); - } - - public void startListening(MonitorChangeListener listener) throws IOException - { - _monitor.startListener(listener); - } - - public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException - { - startListening(new MonitorChangeListenerSupport(){ - @Override - public void notify(NewMasterEvent newMasterEvent) - { - LOGGER.debug("New master is elected " + newMasterEvent.getMasterName()); - latch.countDown(); - } - }); - } - - public void shutdownMonitor() - { - if (_monitor != null) - { - try - { - _monitor.shutdown(); - } - catch (Exception e) - { - LOGGER.warn("Monitor shutdown error:", e); - } - } - } - - public static class MonitorChangeListenerSupport implements MonitorChangeListener - { - - @Override - public void notify(NewMasterEvent newMasterEvent) - { - } - - @Override - public void notify(GroupChangeEvent groupChangeEvent) - { - } - - @Override - public void notify(JoinGroupEvent joinGroupEvent) - { - } - - @Override - public void notify(LeaveGroupEvent leaveGroupEvent) - { - } - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties index 3e640c7929..5695026cbc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties @@ -23,4 +23,6 @@ CREATED = VHT-1001 : Created : {0} CLOSED = VHT-1002 : Closed STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total -STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total`
\ No newline at end of file +STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total + +ERRORED = VHT-1005 : Unexpected fatal error
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java index fb50b3e289..55e2539dcf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java @@ -25,5 +25,7 @@ public enum State INITIALISING, ACTIVE, PASSIVE, - STOPPED + STOPPED, + /** Terminal state that signifies the virtual host has experienced an unexpected condition. */ + ERRORED } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 5a14092930..5a56fe1765 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -122,7 +122,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private final MessageStore _messageStore; - private State _state = State.INITIALISING; + private volatile State _state = State.INITIALISING; private boolean _statisticsEnabled = false; @@ -824,17 +824,25 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr @Override public void event(Event event) { - initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + State finalState = State.ERRORED; try { - _brokerMBean.register(); + initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + try + { + _brokerMBean.register(); + } + catch (JMException e) + { + throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + } + finalState = State.ACTIVE; } - catch (JMException e) + finally { - throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); + _state = finalState; + reportIfError(_state); } - - _state = State.ACTIVE; } } @@ -842,16 +850,33 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr { public void event(Event event) { - _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - _brokerMBean.unregister(); - removeHouseKeepingTasks(); + State finalState = State.ERRORED; + + try + { + /* the approach here is not ideal as there is a race condition where a + * queue etc could be created while the virtual host is on the way to + * the passivated state. However the store state change from MASTER to UNKNOWN + * is documented as exceptionally rare.. + */ + + _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); + _brokerMBean.unregister(); + removeHouseKeepingTasks(); - _queueRegistry.stopAllAndUnregisterMBeans(); - _exchangeRegistry.clearAndUnregisterMbeans(); - _dtxRegistry.close(); + _queueRegistry.stopAllAndUnregisterMBeans(); + _exchangeRegistry.clearAndUnregisterMbeans(); + _dtxRegistry.close(); - _state = State.PASSIVE; + finalState = State.PASSIVE; + } + finally + { + _state = finalState; + reportIfError(_state); + } } + } private final class BeforeCloseListener implements EventListener @@ -864,6 +889,14 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr } } + private void reportIfError(State state) + { + if (state == State.ERRORED) + { + CurrentActor.get().message(VirtualHostMessages.ERRORED()); + } + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() |
