diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-09 17:12:14 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-09 17:12:14 +0000 |
| commit | 98faeab2840203c8e4eb4526afe0fd20a596aa28 (patch) | |
| tree | 665f6493dcca389d39b0a5496ad4a0eaab160ef8 /qpid/java/bdbstore/src | |
| parent | 10b21b20fbd892d19ae64084165ec8942f864eac (diff) | |
| download | qpid-python-98faeab2840203c8e4eb4526afe0fd20a596aa28.tar.gz | |
Add sync/async varients to most ACO methods
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665306 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
2 files changed, 119 insertions, 46 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index dfbdce4399..926e9a956f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -24,11 +24,12 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; import java.security.AccessControlException; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.sleepycat.je.rep.MasterStateException; - import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.HighAvailabilityMessages; @@ -150,7 +151,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB } @StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { String nodeName = getName(); @@ -170,6 +171,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB { throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e); } + + return Futures.immediateFuture(null); } protected void afterSetRole() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 2000897e87..6a4e048e5c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -42,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LogWriteException; import com.sleepycat.je.rep.NodeState; @@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress)); shutdownOnIntruder(nodeAddress); + throw new IllegalStateException("Intruder node detected: " + nodeAddress); } } @@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer()); environmentFacade.setPermittedNodes(_permittedNodes); } + + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - try - { - super.doStop(); - } - finally + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> superFuture = super.doStop(); + Futures.addCallback(superFuture, new FutureCallback<Void>() { - closeEnvironment(); + @Override + public void onSuccess(final Void result) + { + doFinally(); + } - // closing the environment does not cause a state change. Adjust the role - // so that our observers will see DETACHED rather than our previous role in the group. - _lastRole.set(NodeRole.DETACHED); - attributeSet(ROLE, _role, NodeRole.DETACHED); - } + @Override + public void onFailure(final Throwable t) + { + doFinally(); + } + + private void doFinally() + { + try + { + closeEnvironment(); + + // closing the environment does not cause a state change. Adjust the role + // so that our observers will see DETACHED rather than our previous role in the group. + _lastRole.set(NodeRole.DETACHED); + attributeSet(ROLE, _role, NodeRole.DETACHED); + } + finally + { + returnVal.set(null); + } + + } + }); + return returnVal; } private void closeEnvironment() @@ -397,43 +427,60 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { - // get helpers before close. on close all children are closed and not available anymore - Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); - super.doDelete(); - - if (getConfigurationStore() != null) - { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); - } + final SettableFuture<Void> returnVal = SettableFuture.create(); - if (getState() == State.DELETED && !helpers.isEmpty()) + // get helpers before close. on close all children are closed and not available anymore + final Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); + final ListenableFuture<Void> superFuture = super.doDelete(); + superFuture.addListener(new Runnable() { - try + @Override + public void run() { - new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); - } - catch(DatabaseException e) - { - LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() - + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + try + { + if (getConfigurationStore() != null) + { + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); + } + + if (getState() == State.DELETED && !helpers.isEmpty()) + { + try + { + new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); + } + catch(DatabaseException e) + { + LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() + + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + } + } + } + finally + { + returnVal.set(null); + } } - } + }, getTaskExecutor().getExecutor()); + + return returnVal; } @Override - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade(); if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster() && replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1) { - super.deleteVirtualHostIfExists(); + return super.deleteVirtualHostIfExists(); } else { - closeVirtualHostIfExist(); + return closeVirtualHostIfExist(); } } @@ -553,7 +600,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); getConfigurationStore().upgradeStoreStructure(); @@ -640,7 +687,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); Map<String, Object> hostAttributes = new HashMap<>(); hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); @@ -654,13 +701,32 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - protected void closeVirtualHostIfExist() + protected ListenableFuture<Void> closeVirtualHostIfExist() { - VirtualHost<?,?,?> virtualHost = getVirtualHost(); + final VirtualHost<?,?,?> virtualHost = getVirtualHost(); if (virtualHost!= null) { - virtualHost.close(); - childRemoved(virtualHost); + final SettableFuture<Void> returnVal = SettableFuture.create(); + virtualHost.closeAsync().addListener(new Runnable() + { + @Override + public void run() + { + try + { + childRemoved(virtualHost); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + return returnVal; + } + else + { + return Futures.immediateFuture(null); } } @@ -687,15 +753,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu onReplica(); break; case DETACHED: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; case UNKNOWN: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; default: LOGGER.error("Unexpected state change: " + state); } } + catch (InterruptedException | ExecutionException e) + { + throw new ServerScopedRuntimeException(e); + } finally { NodeRole newRole = NodeRole.fromJeState(state); @@ -1137,7 +1207,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { - close(); + closeAsync(); } finally { |
