diff options
| author | Keith Wall <kwall@apache.org> | 2014-04-29 08:29:28 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-04-29 08:29:28 +0000 |
| commit | 5953ca009492eb4d40a63960ea1d8f1854351548 (patch) | |
| tree | 4076c42aeeed98e4bdd668fb941b8360596a8aeb /qpid/java/bdbstore/src/main | |
| parent | 1eecf05ef31eebe90631208ba1bf005167b9f234 (diff) | |
| download | qpid-python-5953ca009492eb4d40a63960ea1d8f1854351548.tar.gz | |
QPID-5715: [Java Broker]: Wire up the BDB HA VirtualHostNode to the ReplicatedEnvironmentFacade.
* Attributes priority, quorumOverride, designatedPrimary are exposed as read/write attributes.
* Attribute role is readable (to observe the current role of the node), and writable, to request a change in mastership.
* Attributes joinTime and lastKnownReplicationTransactionId are exposed as derived attributes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1590917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
3 files changed, 328 insertions, 26 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 0b00800b04..0f839ea02d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -81,14 +81,17 @@ import com.sleepycat.je.utilint.VLSN; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { + public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval"; private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); + private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60; private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000; private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000; + private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); private static final int REMOTE_NODE_MONITOR_INTERVAL = Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, DEFAULT_REMOTE_NODE_MONITOR_INTERVAL); @@ -145,14 +148,13 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); + private final AtomicBoolean _initialised; + private final EnvironmentFacadeTask[] _initialisationTasks; private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; - private AtomicBoolean _initialised; - private EnvironmentFacadeTask[] _initialisationTasks; - public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) { _environmentDirectory = new File(configuration.getStorePath()); @@ -214,8 +216,14 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan shutdownAndAwaitExecutorService(_environmentJobExecutor); shutdownAndAwaitExecutorService(_groupChangeExecutor); - closeDatabases(); - closeEnvironment(); + try + { + closeDatabases(); + } + finally + { + closeEnvironment(); + } } finally { @@ -634,10 +642,52 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + public Future<Void> transferMasterToSelfAsynchronously() + { + final String nodeName = getNodeName(); + return transferMasterAsynchronously(nodeName); + } + + public Future<Void> transferMasterAsynchronously(final String nodeName) + { + return _groupChangeExecutor.submit(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + ReplicationGroupAdmin admin = createReplicationGroupAdmin(); + String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("The mastership has been transfered to " + newMaster); + } + } + catch (DatabaseException e) + { + LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName + + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e); + throw e; + } + return null; + } + }); + } + + public void removeNodeFromGroup(final String nodeName) + { + createReplicationGroupAdmin().removeMember(nodeName); + } + + public void updateAddress(final String nodeName, final String newHostName, final int newPort) + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } public long getJoinTime() { - return _joinTime ; + return _joinTime; } public long getLastKnownReplicationTransactionId() @@ -669,16 +719,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return members; } - public void removeNodeFromGroup(final String nodeName) - { - createReplicationGroupAdmin().removeMember(nodeName); - } - - public void updateAddress(final String nodeName, final String newHostName, final int newPort) - { - createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); - } - private ReplicationGroupAdmin createReplicationGroupAdmin() { final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); @@ -690,7 +730,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers); } - public ReplicatedEnvironment getEnvironment() { return _environment; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java index f0325b24f8..0e92ac83de 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java @@ -34,7 +34,10 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends public static final String DESIGNATED_PRIMARY = "designatedPrimary"; public static final String PRIORITY = "priority"; public static final String QUORUM_OVERRIDE = "quorumOverride"; + public static final String ROLE = "role"; public static final String REPLICATED_ENVIRONMENT_CONFIGURATION = "replicatedEnvironmentConfiguration"; + public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = "lastKnownReplicationTransactionId"; + public static final String JOIN_TIME = "joinTime"; @ManagedAttribute(automate = true, mandatory=true) String getGroupName(); @@ -61,5 +64,14 @@ public interface BDBHAVirtualHostNode<X extends BDBHAVirtualHostNode<X>> extends int getQuorumOverride(); @ManagedAttribute(automate = true) + String getRole(); + + @ManagedAttribute(automate = true) Map<String, String> getReplicatedEnvironmentConfiguration(); + + @ManagedAttribute(derived = true) + Long getLastKnownReplicationTransactionId(); + + @ManagedAttribute(derived = true) + Long getJoinTime(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 8b4948da08..8b2dce4168 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -23,13 +23,16 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; import java.security.PrivilegedAction; import java.util.HashMap; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; -import com.sleepycat.je.rep.StateChangeEvent; -import com.sleepycat.je.rep.StateChangeListener; import org.apache.log4j.Logger; - +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -42,20 +45,34 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostState; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + @ManagedObject( category = false, type = "BDB_HA" ) public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl> { + /** + * Length of time we synchronously await the a JE mutation to complete. It is not considered an error if we exceed this timeout, although a + * a warning will be logged. + */ + private static final int MUTATE_JE_TIMEOUT_MS = 100; + private static final Logger LOGGER = Logger.getLogger(BDBHAVirtualHostNodeImpl.class); + private final AtomicReference<ReplicatedEnvironmentFacade> _environmentFacade = new AtomicReference<>(); + @ManagedAttributeField private Map<String, String> _environmentConfiguration; @@ -77,18 +94,22 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @ManagedAttributeField private boolean _coalescingSync; - @ManagedAttributeField + @ManagedAttributeField(afterSet="postSetDesignatedPrimary") private boolean _designatedPrimary; - @ManagedAttributeField + @ManagedAttributeField(afterSet="postSetPriority") private int _priority; - @ManagedAttributeField + @ManagedAttributeField(afterSet="postSetQuorumOverride") private int _quorumOverride; + @ManagedAttributeField(beforeSet="preSetRole", afterSet="postSetRole") + private String _role; + @ManagedAttributeField private Map<String, String> _replicatedEnvironmentConfiguration; + @ManagedObjectFactoryConstructor public BDBHAVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> broker) { @@ -162,6 +183,39 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override + public String getRole() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + return environmentFacade.getNodeState(); + } + return "UNKNOWN"; + } + + @Override + public Long getLastKnownReplicationTransactionId() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + return environmentFacade.getLastKnownReplicationTransactionId(); + } + return -1L; + } + + @Override + public Long getJoinTime() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + return environmentFacade.getJoinTime(); + } + return -1L; + } + + @Override public Map<String, String> getReplicatedEnvironmentConfiguration() { return _replicatedEnvironmentConfiguration; @@ -171,7 +225,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu public String toString() { return "BDBHAVirtualHostNodeImpl [id=" + getId() + ", name=" + getName() + ", storePath=" + _storePath + ", groupName=" + _groupName + ", address=" + _address - + ", state=" + getState() + "]"; + + ", state=" + getState() + ", priority=" + _priority + ", designatedPrimary=" + _designatedPrimary + ", designatedPrimary=" + _quorumOverride + "]"; } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -223,9 +277,26 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED()); getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.STORE_LOCATION(getStorePath())); - ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade(); environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + _environmentFacade.set(environmentFacade); + } + + @Override + protected void stop() + { + try + { + super.stop(); + } + finally + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (_environmentFacade.compareAndSet(environmentFacade, null)) + { + environmentFacade.close(); + } + } } private void onMaster() @@ -348,9 +419,188 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - private class ReplicaVirtualHost extends BDBHAVirtualHost + // used as post action by field _priority + @SuppressWarnings("unused") + private void postSetPriority() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.setPriority(_priority).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Node priority changed. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Change node priority did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _priority + " will become effective once the JE task thread is free."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to set priority node to value " + _priority + " on " + this, e); + } + } + } + + // used as post action by field _designatedPrimary + @SuppressWarnings("unused") + private void postSetDesignatedPrimary() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.setDesignatedPrimary(_designatedPrimary).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Designated primary changed. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Change designated primary did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _designatedPrimary + " will become effective once the JE task thread is free."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to set designated primary to value " + _designatedPrimary + " on " + this, e); + } + } + } + + // used as post action by field _quorumOverride + @SuppressWarnings("unused") + private void postSetQuorumOverride() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.setElectableGroupSizeOverride(_quorumOverride).get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Quorum override changed. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Change quorum override did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become effective once the JE task thread is free."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to set quorum override to value " + _quorumOverride + " on " + this, e); + } + } + } + + // used as pre action by field _role + @SuppressWarnings("unused") + private void preSetRole() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + String currentRole = environmentFacade.getNodeState(); + if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole)) + { + throw new IllegalConfigurationException("Cannot transfer mastership when node is not in a replica role." + + "Current role is " + currentRole); + } + } + else + { + // Ignored + } + } + + // used as post action by field _role + @SuppressWarnings("unused") + private void postSetRole() + { + ReplicatedEnvironmentFacade environmentFacade = _environmentFacade.get(); + if (environmentFacade != null) + { + try + { + environmentFacade.transferMasterToSelfAsynchronously().get(MUTATE_JE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Requested master transfer to self. " + this); + } + } + catch (TimeoutException e) + { + LOGGER.warn("Transfer master did not complete within " + MUTATE_JE_TIMEOUT_MS + "ms. Node may still be elected master at a later time."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + throw new ServerScopedRuntimeException("Failed to transfer master to " + this, e); + } + } + else + { + // Ignored + } + } + + // TODO - need a better way of suppressing the persistence of the role field. + @Override + public ConfiguredObjectRecord asObjectRecord() { + final ConfiguredObjectRecord underlying = super.asObjectRecord(); + return new ConfiguredObjectRecord() + { + @Override + public String getType() + { + return underlying.getType(); + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return underlying.getParents(); + } + + @Override + public UUID getId() + { + return underlying.getId(); + } + + @Override + public Map<String, Object> getAttributes() + { + Map<String, Object> copy = new HashMap<String, Object>(underlying.getAttributes()); + copy.remove(BDBHAVirtualHostNode.ROLE); + return copy; + } + }; + } + + private class ReplicaVirtualHost extends BDBHAVirtualHost + { ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { super(attributes, virtualHostNode); @@ -372,4 +622,5 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return super.setState(currentState, desiredState); } } + } |
