summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-04-29 08:29:28 +0000
committerKeith Wall <kwall@apache.org>2014-04-29 08:29:28 +0000
commit5953ca009492eb4d40a63960ea1d8f1854351548 (patch)
tree4076c42aeeed98e4bdd668fb941b8360596a8aeb /qpid/java/bdbstore/src/main
parent1eecf05ef31eebe90631208ba1bf005167b9f234 (diff)
downloadqpid-python-5953ca009492eb4d40a63960ea1d8f1854351548.tar.gz
QPID-5715: [Java Broker]: Wire up the BDB HA VirtualHostNode to the ReplicatedEnvironmentFacade.
* Attributes priority, quorumOverride, designatedPrimary are exposed as read/write attributes. * Attribute role is readable (to observe the current role of the node), and writable, to request a change in mastership. * Attributes joinTime and lastKnownReplicationTransactionId are exposed as derived attributes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1590917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java73
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java269
3 files changed, 328 insertions, 26 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 0b00800b04..0f839ea02d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -81,14 +81,17 @@ import com.sleepycat.je.utilint.VLSN;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
+ public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval";
public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval";
private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
+ private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60;
private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
+ private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
@@ -145,14 +148,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
+ private final AtomicBoolean _initialised;
+ private final EnvironmentFacadeTask[] _initialisationTasks;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- private AtomicBoolean _initialised;
- private EnvironmentFacadeTask[] _initialisationTasks;
-
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
_environmentDirectory = new File(configuration.getStorePath());
@@ -214,8 +216,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
shutdownAndAwaitExecutorService(_environmentJobExecutor);
shutdownAndAwaitExecutorService(_groupChangeExecutor);
- closeDatabases();
- closeEnvironment();
+ try
+ {
+ closeDatabases();
+ }
+ finally
+ {
+ closeEnvironment();
+ }
}
finally
{
@@ -634,10 +642,52 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ public Future<Void> transferMasterToSelfAsynchronously()
+ {
+ final String nodeName = getNodeName();
+ return transferMasterAsynchronously(nodeName);
+ }
+
+ public Future<Void> transferMasterAsynchronously(final String nodeName)
+ {
+ return _groupChangeExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ ReplicationGroupAdmin admin = createReplicationGroupAdmin();
+ String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("The mastership has been transfered to " + newMaster);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName
+ + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e);
+ throw e;
+ }
+ return null;
+ }
+ });
+ }
+
+ public void removeNodeFromGroup(final String nodeName)
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+
+ public void updateAddress(final String nodeName, final String newHostName, final int newPort)
+ {
+ createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+ }
public long getJoinTime()
{
- return _joinTime ;
+ return _joinTime;
}
public long getLastKnownReplicationTransactionId()
@@ -669,16 +719,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return members;
}
- public void removeNodeFromGroup(final String nodeName)
- {
- createReplicationGroupAdmin().removeMember(nodeName);
- }
-
- public void updateAddress(final String nodeName, final String newHostName, final int newPort)
- {
- createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
- }
-
private ReplicationGroupAdmin createReplicationGroupAdmin()
{
final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
@@ -690,7 +730,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
}
-
public ReplicatedEnvironment getEnvironment()
{
return _environment;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
index f0325b24f8..0e92ac83de 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
@@ -34,7 +34,10 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
public static final String DESIGNATED_PRIMARY = "designatedPrimary";
public static final String PRIORITY = "priority";
public static final String QUORUM_OVERRIDE = "quorumOverride";
+ public static final String ROLE = "role";
public static final String REPLICATED_ENVIRONMENT_CONFIGURATION = "replicatedEnvironmentConfiguration";
+ public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId";
+ public static final String JOIN_TIME = "joinTime";
@ManagedAttribute(automate = true, mandatory=true)
String getGroupName();
@@ -61,5 +64,14 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends
int getQuorumOverride();
@ManagedAttribute(automate = true)
+ String getRole();
+
+ @ManagedAttribute(automate = true)
Map<String, String> getReplicatedEnvironmentConfiguration();
+
+ @ManagedAttribute(derived = true)
+ Long getLastKnownReplicationTransactionId();
+
+ @ManagedAttribute(derived = true)
+ Long getJoinTime();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 8b4948da08..8b2dce4168 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -23,13 +23,16 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
import org.apache.log4j.Logger;
-
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -42,20 +45,34 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostState;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
@ManagedObject( category = false, type = "BDB_HA" )
public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
{
+ /**
+ * Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a
+ * a warning will be logged.
+ */
+ private static final int MUTATE_JE_TIMEOUT_MS = 100;
+
private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHostNodeImpl.class);
+ private final AtomicReference<ReplicatedEnvironmentFacade> _environmentFacade = new AtomicReference<>();
+
@ManagedAttributeField
private Map<String, String> _environmentConfiguration;
@@ -77,18 +94,22 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
@ManagedAttributeField
private boolean _coalescingSync;
- @ManagedAttributeField
+ @ManagedAttributeField(afterSet="postSetDesignatedPrimary")
private boolean _designatedPrimary;
- @ManagedAttributeField
+ @ManagedAttributeField(afterSet="postSetPriority")
private int _priority;
- @ManagedAttributeField
+ @ManagedAttributeField(afterSet="postSetQuorumOverride")
private int _quorumOverride;
+ @ManagedAttributeField(beforeSet="preSetRole", afterSet="postSetRole")
+ private String _role;
+
@ManagedAttributeField
private Map<String, String> _replicatedEnvironmentConfiguration;
+
@ManagedObjectFactoryConstructor
public BDBHAVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> broker)
{
@@ -162,6 +183,39 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
+ public String getRole()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getNodeState();
+ }
+ return "UNKNOWN";
+ }
+
+ @Override
+ public Long getLastKnownReplicationTransactionId()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getLastKnownReplicationTransactionId();
+ }
+ return -1L;
+ }
+
+ @Override
+ public Long getJoinTime()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ return environmentFacade.getJoinTime();
+ }
+ return -1L;
+ }
+
+ @Override
public Map<String, String> getReplicatedEnvironmentConfiguration()
{
return _replicatedEnvironmentConfiguration;
@@ -171,7 +225,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
public String toString()
{
return "BDBHAVirtualHostNodeImpl [id=" + getId() + ", name=" + getName() + ", storePath=" + _storePath + ", groupName=" + _groupName + ", address=" + _address
- + ", state=" + getState() + "]";
+ + ", state=" + getState() + ", priority=" + _priority + ", designatedPrimary=" + _designatedPrimary + ", designatedPrimary=" + _quorumOverride + "]";
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -223,9 +277,26 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED());
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath()));
-
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade();
environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+ _environmentFacade.set(environmentFacade);
+ }
+
+ @Override
+ protected void stop()
+ {
+ try
+ {
+ super.stop();
+ }
+ finally
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (_environmentFacade.compareAndSet(environmentFacade, null))
+ {
+ environmentFacade.close();
+ }
+ }
}
private void onMaster()
@@ -348,9 +419,188 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- private class ReplicaVirtualHost extends BDBHAVirtualHost
+ // used as post action by field _priority
+ @SuppressWarnings("unused")
+ private void postSetPriority()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.setPriority(_priority).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node priority changed. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Change node priority did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _priority + " will become effective once the JE task thread is free.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to set priority node to value " + _priority + " on " + this, e);
+ }
+ }
+ }
+
+ // used as post action by field _designatedPrimary
+ @SuppressWarnings("unused")
+ private void postSetDesignatedPrimary()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.setDesignatedPrimary(_designatedPrimary).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Designated primary changed. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Change designated primary did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _designatedPrimary + " will become effective once the JE task thread is free.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to set designated primary to value " + _designatedPrimary + " on " + this, e);
+ }
+ }
+ }
+
+ // used as post action by field _quorumOverride
+ @SuppressWarnings("unused")
+ private void postSetQuorumOverride()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.setElectableGroupSizeOverride(_quorumOverride).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Quorum override changed. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to set quorum override to value " + _quorumOverride + " on " + this, e);
+ }
+ }
+ }
+
+ // used as pre action by field _role
+ @SuppressWarnings("unused")
+ private void preSetRole()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ String currentRole = environmentFacade.getNodeState();
+ if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+ {
+ throw new IllegalConfigurationException("Cannot transfer mastership when node is not in a replica role."
+ + "Current role is " + currentRole);
+ }
+ }
+ else
+ {
+ // Ignored
+ }
+ }
+
+ // used as post action by field _role
+ @SuppressWarnings("unused")
+ private void postSetRole()
+ {
+ ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get();
+ if (environmentFacade != null)
+ {
+ try
+ {
+ environmentFacade.transferMasterToSelfAsynchronously().get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Requested master transfer to self. " + this);
+ }
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Transfer master did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. Node may still be elected master at a later time.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException("Failed to transfer master to " + this, e);
+ }
+ }
+ else
+ {
+ // Ignored
+ }
+ }
+
+ // TODO - need a better way of suppressing the persistence of the role field.
+ @Override
+ public ConfiguredObjectRecord asObjectRecord()
{
+ final ConfiguredObjectRecord underlying = super.asObjectRecord();
+ return new ConfiguredObjectRecord()
+ {
+ @Override
+ public String getType()
+ {
+ return underlying.getType();
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ return underlying.getParents();
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return underlying.getId();
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ Map<String, Object> copy = new HashMap<String, Object>(underlying.getAttributes());
+ copy.remove(BDBHAVirtualHostNode.ROLE);
+ return copy;
+ }
+ };
+ }
+
+ private class ReplicaVirtualHost extends BDBHAVirtualHost
+ {
ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
super(attributes, virtualHostNode);
@@ -372,4 +622,5 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
return super.setState(currentState, desiredState);
}
}
+
}