summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-09 17:12:14 +0000
committerKeith Wall <kwall@apache.org>2015-03-09 17:12:14 +0000
commit98faeab2840203c8e4eb4526afe0fd20a596aa28 (patch)
tree665f6493dcca389d39b0a5496ad4a0eaab160ef8 /qpid/java/bdbstore/src
parent10b21b20fbd892d19ae64084165ec8942f864eac (diff)
downloadqpid-python-98faeab2840203c8e4eb4526afe0fd20a596aa28.tar.gz
Add sync/async varients to most ACO methods
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1665306 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java9
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java156
2 files changed, 119 insertions, 46 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index dfbdce4399..926e9a956f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -24,11 +24,12 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb;
import java.security.AccessControlException;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.je.rep.MasterStateException;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
@@ -150,7 +151,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
}
@StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
String nodeName = getName();
@@ -170,6 +171,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
{
throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
}
+
+ return Futures.immediateFuture(null);
}
protected void afterSetRole()
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 2000897e87..6a4e048e5c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -42,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LogWriteException;
import com.sleepycat.je.rep.NodeState;
@@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
if (LOGGER.isDebugEnabled())
{
@@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress));
shutdownOnIntruder(nodeAddress);
+
throw new IllegalStateException("Intruder node detected: " + nodeAddress);
}
}
@@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer());
environmentFacade.setPermittedNodes(_permittedNodes);
}
+
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- try
- {
- super.doStop();
- }
- finally
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ ListenableFuture<Void> superFuture = super.doStop();
+ Futures.addCallback(superFuture, new FutureCallback<Void>()
{
- closeEnvironment();
+ @Override
+ public void onSuccess(final Void result)
+ {
+ doFinally();
+ }
- // closing the environment does not cause a state change. Adjust the role
- // so that our observers will see DETACHED rather than our previous role in the group.
- _lastRole.set(NodeRole.DETACHED);
- attributeSet(ROLE, _role, NodeRole.DETACHED);
- }
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ doFinally();
+ }
+
+ private void doFinally()
+ {
+ try
+ {
+ closeEnvironment();
+
+ // closing the environment does not cause a state change. Adjust the role
+ // so that our observers will see DETACHED rather than our previous role in the group.
+ _lastRole.set(NodeRole.DETACHED);
+ attributeSet(ROLE, _role, NodeRole.DETACHED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+ });
+ return returnVal;
}
private void closeEnvironment()
@@ -397,43 +427,60 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
- // get helpers before close. on close all children are closed and not available anymore
- Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
- super.doDelete();
-
- if (getConfigurationStore() != null)
- {
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
- }
+ final SettableFuture<Void> returnVal = SettableFuture.create();
- if (getState() == State.DELETED && !helpers.isEmpty())
+ // get helpers before close. on close all children are closed and not available anymore
+ final Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
+ final ListenableFuture<Void> superFuture = super.doDelete();
+ superFuture.addListener(new Runnable()
{
- try
+ @Override
+ public void run()
{
- new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
- }
- catch(DatabaseException e)
- {
- LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
- + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ try
+ {
+ if (getConfigurationStore() != null)
+ {
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
+ }
+
+ if (getState() == State.DELETED && !helpers.isEmpty())
+ {
+ try
+ {
+ new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
+ }
+ catch(DatabaseException e)
+ {
+ LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
+ + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ }
+ }
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
}
- }
+ }, getTaskExecutor().getExecutor());
+
+ return returnVal;
}
@Override
- protected void deleteVirtualHostIfExists()
+ protected ListenableFuture<Void> deleteVirtualHostIfExists()
{
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade();
if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster()
&& replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1)
{
- super.deleteVirtualHostIfExists();
+ return super.deleteVirtualHostIfExists();
}
else
{
- closeVirtualHostIfExist();
+ return closeVirtualHostIfExist();
}
}
@@ -553,7 +600,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
getConfigurationStore().upgradeStoreStructure();
@@ -640,7 +687,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
Map<String, Object> hostAttributes = new HashMap<>();
hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
@@ -654,13 +701,32 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- protected void closeVirtualHostIfExist()
+ protected ListenableFuture<Void> closeVirtualHostIfExist()
{
- VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ final VirtualHost<?,?,?> virtualHost = getVirtualHost();
if (virtualHost!= null)
{
- virtualHost.close();
- childRemoved(virtualHost);
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+ virtualHost.closeAsync().addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ childRemoved(virtualHost);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+ }
+ }, getTaskExecutor().getExecutor());
+ return returnVal;
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -687,15 +753,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
onReplica();
break;
case DETACHED:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
case UNKNOWN:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
default:
LOGGER.error("Unexpected state change: " + state);
}
}
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
finally
{
NodeRole newRole = NodeRole.fromJeState(state);
@@ -1137,7 +1207,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
try
{
- close();
+ closeAsync();
}
finally
{