diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-10-01 23:48:14 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-10-01 23:48:14 +0000 |
| commit | a638bc903339cac26e522df787ad4fcbca2344aa (patch) | |
| tree | 94a5bae92749b96c229ca36590e681032f6aa752 /qpid/java/bdbstore/src/main | |
| parent | f84ed512e919a6c717cbdbcc22e8139bc64bc205 (diff) | |
| download | qpid-python-a638bc903339cac26e522df787ad4fcbca2344aa.tar.gz | |
QPID-6126: Add ability to validate CO attributes on creation, transit COs into ERRORED state if exception occurs on recovery, allow ERRORED CO restart after remediation of configuration problem
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1628867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src/main')
2 files changed, 114 insertions, 36 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBSystemConfigImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBSystemConfigImpl.java index 0fc44605fe..5d65d6e16d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBSystemConfigImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBSystemConfigImpl.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.model.AbstractSystemConfig; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerShutdownProvider; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.SystemConfigFactoryConstructor; @@ -48,9 +49,10 @@ public class BDBSystemConfigImpl extends AbstractSystemConfig<BDBSystemConfigImp public BDBSystemConfigImpl(final TaskExecutor taskExecutor, final EventLogger eventLogger, final LogRecorder logRecorder, - final BrokerOptions brokerOptions) + final BrokerOptions brokerOptions, + final BrokerShutdownProvider brokerShutdownProvider) { - super(taskExecutor, eventLogger, logRecorder, brokerOptions); + super(taskExecutor, eventLogger, logRecorder, brokerOptions, brokerShutdownProvider); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index e6109954c0..98b9cc3cf0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.virtualhostnode.berkeleydb; +import java.io.File; import java.net.InetSocketAddress; +import java.nio.file.Files; import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; @@ -74,6 +76,7 @@ import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener; +import org.apache.qpid.server.util.PortUtil; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; @@ -280,20 +283,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu public void onCreate() { super.onCreate(); - if (!isFirstNodeInAGroup()) - { - try - { - int dbPingSocketTimeout = getContextKeys(false).contains("qpid.bdb.ha.db_ping_socket_timeout") ? getContextValue(Integer.class, "qpid.bdb.ha.db_ping_socket_timeout") : 10000 /* JE's own default */; - Collection<String> permittedNodes = ReplicatedEnvironmentFacade.connectToHelperNodeAndCheckPermittedHosts(getName(), getAddress(), getGroupName(), getHelperNodeName(), getHelperAddress(), dbPingSocketTimeout); - setAttribute(PERMITTED_NODES, null, new ArrayList<String>(permittedNodes)); - } - catch(IllegalConfigurationException e) - { - deleted(); - throw e; - } - } getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.CREATED()); } @@ -435,19 +424,102 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu public void onValidate() { super.onValidate(); + validatePermittedNodes(_permittedNodes); + } + + @Override + protected void postResolve() + { + super.postResolve(); _virtualHostNodeLogSubject = new BDBHAVirtualHostNodeLogSubject(getGroupName(), getName()); _groupLogSubject = new GroupLogSubject(getGroupName()); _virtualHostNodePrincipalName = MessageFormat.format(VIRTUAL_HOST_PRINCIPAL_NAME_FORMAT, getGroupName(), getName()); } @Override - public void onOpen() + public void validateOnCreate() { - super.onOpen(); + super.validateOnCreate(); - validatePermittedNodes(_permittedNodes); + validateAddress(); + + validateStorePath(); + + if (!isFirstNodeInAGroup()) + { + // validate that helper address points to valid node + // we need _permittedNodes for the further validation in onValidate + _permittedNodes = new ArrayList<>(getPermittedNodesFromHelper()); + } + } + + private Collection<String> getPermittedNodesFromHelper() + { + int dbPingSocketTimeout = getContextKeys(false).contains("qpid.bdb.ha.db_ping_socket_timeout") ? getContextValue(Integer.class, "qpid.bdb.ha.db_ping_socket_timeout") : 10000 /* JE's own default */; + return ReplicatedEnvironmentFacade.connectToHelperNodeAndCheckPermittedHosts(getName(), getAddress(), getGroupName(), getHelperNodeName(), getHelperAddress(), dbPingSocketTimeout); + } + + private void validateStorePath() + { + File storePath = new File(getStorePath()); + while (!storePath.exists()) + { + storePath = storePath.getParentFile(); + if (storePath == null) + { + throw new IllegalConfigurationException(String.format("Store path '%s' is invalid", getStorePath())); + } + } + + if (!storePath.isDirectory()) + { + throw new IllegalConfigurationException(String.format("Store path '%s' is not a folder", getStorePath())); + } + + if (!Files.isWritable(storePath.toPath())) + { + throw new IllegalConfigurationException(String.format("Store path '%s' is not writable", getStorePath())); + } } + private void validateAddress() + { + String address = getAddress(); + + if (address == null || "".equals(address)) + { + throw new IllegalConfigurationException("Node address is not set"); + } + + String[] tokens = address.split(":"); + if (tokens.length != 2) + { + throw new IllegalConfigurationException(String.format("Invalid address specified '%s'. ", address)); + } + + String hostName = tokens[0]; + if ("".equals(hostName.trim())) + { + throw new IllegalConfigurationException(String.format("Invalid address specified '%s'. ", address)); + } + + int port = -1; + try + { + port = Integer.parseInt(tokens[1]); + } + catch(Exception e) + { + throw new IllegalConfigurationException(String.format("Invalid port is specified in address '%s'. ", address)); + } + if (!PortUtil.isPortAvailable(hostName, port)) + { + throw new IllegalConfigurationException(String.format("Cannot bind to address '%s'. Address is already in use.", address)); + } + } + + + private void onMaster() { try @@ -761,7 +833,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - private void validatePermittedNodes(List<String> proposedPermittedNodes) + private void validatePermittedNodes(Collection<String> proposedPermittedNodes) { if (getRemoteReplicationNodes().size() > 0 && getRole() != NodeRole.MASTER && !(getState() == State.STOPPED || getState() == State.ERRORED)) { @@ -772,28 +844,32 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu throw new IllegalArgumentException(String.format("Attribute '%s' is mandatory and must be set", PERMITTED_NODES)); } - String missingNodeAddress = null; - if (getPermittedNodes().contains(getAddress()) && !proposedPermittedNodes.contains(getAddress())) + if (_permittedNodes != null) { - missingNodeAddress = getAddress(); - } - else - { - for (final RemoteReplicationNode<?> node : getRemoteReplicationNodes()) + String missingNodeAddress = null; + + if (_permittedNodes.contains(getAddress()) && !proposedPermittedNodes.contains(getAddress())) { - final BDBHARemoteReplicationNode<?> bdbHaRemoteReplicationNode = (BDBHARemoteReplicationNode<?>) node; - final String remoteNodeAddress = bdbHaRemoteReplicationNode.getAddress(); - if (getPermittedNodes().contains(remoteNodeAddress) && !proposedPermittedNodes.contains(remoteNodeAddress)) + missingNodeAddress = getAddress(); + } + else + { + for (final RemoteReplicationNode<?> node : getRemoteReplicationNodes()) { - missingNodeAddress = remoteNodeAddress; - break; + final BDBHARemoteReplicationNode<?> bdbHaRemoteReplicationNode = (BDBHARemoteReplicationNode<?>) node; + final String remoteNodeAddress = bdbHaRemoteReplicationNode.getAddress(); + if (_permittedNodes.contains(remoteNodeAddress) && !proposedPermittedNodes.contains(remoteNodeAddress)) + { + missingNodeAddress = remoteNodeAddress; + break; + } } } - } - if (missingNodeAddress != null) - { - throw new IllegalArgumentException(String.format("The current group node '%s' cannot be removed from '%s' as its already a group member", missingNodeAddress, PERMITTED_NODES)); + if (missingNodeAddress != null) + { + throw new IllegalArgumentException(String.format("The current group node '%s' cannot be removed from '%s' as its already a group member", missingNodeAddress, PERMITTED_NODES)); + } } for (String permittedNode: proposedPermittedNodes) |
