diff options
| author | Keith Wall <kwall@apache.org> | 2014-05-23 11:46:46 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-05-23 11:46:46 +0000 |
| commit | b595da7578a5e421b04cdf5590a2c9bd5af4c08d (patch) | |
| tree | 888093ed46c8625810d41c5d62d9897a38594df3 /qpid/java | |
| parent | 81105f54b26217b7a0760f00e61d5203f8c4255c (diff) | |
| download | qpid-python-b595da7578a5e421b04cdf5590a2c9bd5af4c08d.tar.gz | |
QPID-5715: [Java Broker] Prevent sporadic failure of BDB HA REST test testNewMasterElectedWhenVirtualHostIsStopped
* VHN role attribute now mutated after the completion of onMaster/onReplica event
* Made BDBHAReplicaVirtualHost a type (BDB_HA_REPLICA) within the VirtualHost category. This no-op vhost represents the virtualhost
when the node is replica (and the mastership is elsewhere within the group).
Work by Andrew MacBean <andymacbean@gmail.com> and me.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1597066 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
5 files changed, 496 insertions, 97 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java new file mode 100644 index 0000000000..1f26a8cae7 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.virtualhost.berkeleydb; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +/** + Object that represents the VirtualHost whilst the VirtualHostNode is in the replica role. The + real virtualhost will be elsewhere in the group. + */ +@ManagedObject( category = false, type = "BDB_HA_REPLICA" ) +public class BDBHAReplicaVirtualHost extends AbstractConfiguredObject<BDBHAReplicaVirtualHost> + implements VirtualHostImpl<BDBHAReplicaVirtualHost, AMQQueue<?>, ExchangeImpl<?>>, + VirtualHost<BDBHAReplicaVirtualHost,AMQQueue<?>, ExchangeImpl<?>> +{ + private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + @ManagedAttributeField + private Map<String, Object> _messageStoreSettings; + + @ManagedAttributeField + private boolean _queue_deadLetterQueueEnabled; + + @ManagedAttributeField + private long _housekeepingCheckPeriod; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutWarn; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutWarn; + @ManagedAttributeField + private int _housekeepingThreadCount; + + @ManagedObjectFactoryConstructor + public BDBHAReplicaVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) + { + super(parentsMap(virtualHostNode), attributes); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + } + + @Override + public String getModelVersion() + { + return BrokerModel.MODEL_VERSION; + } + + @Override + protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, + final Map<String, Object> attributes, + final ConfiguredObject... otherParents) + { + throwUnsupportedForReplica(); + return null; + } + + @Override + public ExchangeImpl createExchange(final Map<String, Object> attributes) + { + throwUnsupportedForReplica(); + return null; + } + + @Override + public void removeExchange(final ExchangeImpl<?> exchange, final boolean force) + throws ExchangeIsAlternateException, RequiredExchangeException + { + + } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final String name) + { + return null; + } + + @Override + public AMQQueue<?> createQueue(final Map<String, Object> attributes) + { + throwUnsupportedForReplica(); + return null; + } + + @Override + public void executeTransaction(final TransactionalOperation op) + { + throwUnsupportedForReplica(); + } + + @Override + public State getState() + { + return State.UNAVAILABLE; + } + + @Override + public Collection<String> getExchangeTypeNames() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public Collection<String> getSupportedExchangeTypes() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public Collection<String> getSupportedQueueTypes() + { + return getObjectFactory().getSupportedTypes(Queue.class); + } + + @Override + public boolean isQueue_deadLetterQueueEnabled() + { + return false; + } + + @Override + public long getHousekeepingCheckPeriod() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutWarn() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutWarn() + { + return 0; + } + + @Override + public int getHousekeepingThreadCount() + { + return 0; + } + + @Override + public Map<String, Object> getMessageStoreSettings() + { + return null; + } + + @Override + public long getQueueCount() + { + return 0; + } + + @Override + public long getExchangeCount() + { + return 0; + } + + @Override + public long getConnectionCount() + { + return 0; + } + + @Override + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getMessagesIn() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public Collection<VirtualHostAlias> getAliases() + { + return Collections.emptyList(); + } + + @Override + public Collection<Connection> getConnections() + { + return Collections.emptyList(); + } + + @Override + public IConnectionRegistry getConnectionRegistry() + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final String name) + { + return null; + } + + @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final UUID id) + { + return null; + } + + @Override + public Collection<AMQQueue<?>> getQueues() + { + return Collections.emptyList(); + } + + @Override + public int removeQueue(final AMQQueue<?> queue) + { + return 0; + } + + @Override + public Collection<ExchangeImpl<?>> getExchanges() + { + return Collections.emptyList(); + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final UUID id) + { + return null; + } + + @Override + public MessageDestination getDefaultDestination() + { + return null; + } + + @Override + public MessageStore getMessageStore() + { + return null; + } + + @Override + public org.apache.qpid.server.security.SecurityManager getSecurityManager() + { + return null; + } + + @Override + public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task) + { + } + + @Override + public long getHouseKeepingTaskCount() + { + return 0; + } + + @Override + public long getHouseKeepingCompletedTaskCount() + { + return 0; + } + + @Override + public int getHouseKeepingPoolSize() + { + return 0; + } + + @Override + public void setHouseKeepingPoolSize(final int newSize) + { + } + + @Override + public int getHouseKeepingActiveCount() + { + return 0; + } + + @Override + public DtxRegistry getDtxRegistry() + { + return null; + } + + @Override + public LinkRegistry getLinkRegistry(final String remoteContainerId) + { + return null; + } + + @Override + public ScheduledFuture<?> scheduleTask(final long delay, final Runnable timeoutTask) + { + return null; + } + + @Override + public void block() + { + } + + @Override + public void unblock() + { + } + + @Override + public boolean getDefaultDeadLetterQueueEnabled() + { + return false; + } + + @Override + public EventLogger getEventLogger() + { + return null; + } + + @Override + public void registerMessageReceived(final long messageSize, final long timestamp) + { + } + + @Override + public void registerMessageDelivered(final long messageSize) + { + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + @Override + public void resetStatistics() + { + } + + private void throwUnsupportedForReplica() + { + throw new IllegalStateException("The virtual host state of " + getState() + + " does not permit this operation."); + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index d162a43834..d23ccd1510 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -45,7 +45,6 @@ import com.sleepycat.je.rep.utilint.HostPortPair; import org.apache.log4j.Logger; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -57,13 +56,9 @@ import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; -import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; @@ -85,6 +80,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu private final AtomicReference<ReplicatedEnvironmentFacade> _environmentFacade = new AtomicReference<>(); + private final AtomicReference<ReplicatedEnvironment.State> _lastReplicatedEnvironmentState = new AtomicReference<>(ReplicatedEnvironment.State.UNKNOWN); + @ManagedAttributeField private Map<String, String> _environmentConfiguration; @@ -216,12 +213,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu @Override public String getRole() { - ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); - if (environmentFacade != null) - { - return environmentFacade.getNodeState(); - } - return "UNKNOWN"; + return _lastReplicatedEnvironmentState.get().name(); } @Override @@ -264,7 +256,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu public String toString() { return "BDBHAVirtualHostNodeImpl [id=" + getId() + ", name=" + getName() + ", storePath=" + _storePath + ", groupName=" + _groupName + ", address=" + _address - + ", state=" + getState() + ", priority=" + _priority + ", designatedPrimary=" + _designatedPrimary + ", designatedPrimary=" + _quorumOverride + "]"; + + ", state=" + getState() + ", priority=" + _priority + ", designatedPrimary=" + _designatedPrimary + ", quorumOverride=" + _quorumOverride + "]"; } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -274,18 +266,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { if(childClass == VirtualHost.class) { - if ("MASTER".equals(((ReplicatedEnvironmentFacade)getConfigurationStore().getEnvironmentFacade()).getNodeState())) - { - ConfiguredObjectTypeFactory<? extends ConfiguredObject> factory = - getObjectFactory().getConfiguredObjectTypeFactory(VirtualHost.class.getSimpleName(), "BDB_HA"); - return (C) factory.create(getObjectFactory(), attributes, this); - } - else - { - ReplicaVirtualHost host = new ReplicaVirtualHost(attributes, this); - host.create(); - return (C) host; - } + return (C) getObjectFactory().create(VirtualHost.class, attributes, this); } return super.addChild(childClass, attributes, otherParents); } @@ -458,12 +439,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu Map<String, Object> hostAttributes = new HashMap<String, Object>(); hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); hostAttributes.put(VirtualHost.NAME, getGroupName()); - hostAttributes.put(VirtualHost.TYPE, "BDB_HA"); + hostAttributes.put(VirtualHost.TYPE, "BDB_HA_REPLICA"); createChild(VirtualHost.class, hostAttributes); } catch (Exception e) { - LOGGER.error("Failed to create a replica host", e); + LOGGER.error("Failed to create a replica virtualhost", e); } } @@ -513,7 +494,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu LOGGER.error("Unexpected state change: " + state); throw new IllegalStateException("Unexpected state change: " + state); } - + _lastReplicatedEnvironmentState.set(state); attributeSet(ROLE, _role, state.name()); } } @@ -698,54 +679,4 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - private class ReplicaVirtualHost extends BDBHAVirtualHost - { - ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) - { - super(attributes, virtualHostNode); - } - - @Override - protected void onCreate() - { - // Do not persist replica virtualhost - } - - @Override - protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, - final Map<String, Object> attributes, - final ConfiguredObject... otherParents) - { - throwUnsupportedForReplica(); - return null; - } - - @Override - public ExchangeImpl createExchange(final Map<String, Object> attributes) - { - throwUnsupportedForReplica(); - return null; - } - - @Override - public AMQQueue<?> createQueue(final Map<String, Object> attributes) - { - throwUnsupportedForReplica(); - return null; - } - - @Override - public State getState() - { - return State.UNAVAILABLE; - } - - private void throwUnsupportedForReplica() - { - throw new IllegalStateException("The virtual host state of " + getState() - + " does not permit this operation."); - } - - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 209e2739f4..02e09977ed 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -27,7 +27,6 @@ import java.util.List; public interface IConnectionRegistry { public static final String BROKER_SHUTDOWN_REPLY_TEXT = "Broker is shutting down"; - public static final String VHOST_PASSIVATE_REPLY_TEXT = "Virtual host is being passivated"; public void initialise(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 56700b5ebd..6dd4124258 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -211,8 +211,14 @@ public class ServerConnectionDelegate extends ServerDelegate if(vhost != null) { - sconn.setVirtualHost(vhost); + if (vhost.getState() != State.ACTIVE) + { + sconn.setState(Connection.State.CLOSING); + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); + return; + } + sconn.setVirtualHost(vhost); try { vhost.getSecurityManager().authoriseCreateConnection(sconn); @@ -224,16 +230,8 @@ public class ServerConnectionDelegate extends ServerDelegate return; } - if (vhost.getState() != State.ACTIVE) - { - sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); - } - else - { - sconn.setState(Connection.State.OPEN); - sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); - } + sconn.setState(Connection.State.OPEN); + sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); } else { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java index 80a66292bf..632f751756 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java @@ -81,9 +81,13 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con } else { - session.setVirtualHost(virtualHost); - // Check virtualhost access + if (virtualHost.getState() != State.ACTIVE) + { + throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active"); + } + + session.setVirtualHost(virtualHost); try { virtualHost.getSecurityManager().authoriseCreateConnection(session); @@ -93,12 +97,6 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); } - if (virtualHost.getState() != State.ACTIVE) - { - throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active"); - } - - // See Spec (0.8.2). Section 3.1.2 Virtual Hosts if (session.getContextKey() == null) { |
