From b595da7578a5e421b04cdf5590a2c9bd5af4c08d Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 23 May 2014 11:46:46 +0000 Subject: QPID-5715: [Java Broker] Prevent sporadic failure of BDB HA REST test testNewMasterElectedWhenVirtualHostIsStopped * VHN role attribute now mutated after the completion of onMaster/onReplica event * Made BDBHAReplicaVirtualHost a type (BDB_HA_REPLICA) within the VirtualHost category. This no-op vhost represents the virtualhost when the node is replica (and the mastership is elsewhere within the group). Work by Andrew MacBean and me. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1597066 13f79535-47bb-0310-9956-ffa450edef68 --- .../berkeleydb/BDBHAReplicaVirtualHost.java | 473 +++++++++++++++++++++ .../berkeleydb/BDBHAVirtualHostNodeImpl.java | 85 +--- .../server/connection/IConnectionRegistry.java | 1 - .../protocol/v0_10/ServerConnectionDelegate.java | 20 +- .../v0_8/handler/ConnectionOpenMethodHandler.java | 14 +- 5 files changed, 496 insertions(+), 97 deletions(-) create mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java new file mode 100644 index 0000000000..1f26a8cae7 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.virtualhost.berkeleydb; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +/** + Object that represents the VirtualHost whilst the VirtualHostNode is in the replica role. The + real virtualhost will be elsewhere in the group. + */ +@ManagedObject( category = false, type = "BDB_HA_REPLICA" ) +public class BDBHAReplicaVirtualHost extends AbstractConfiguredObject + implements VirtualHostImpl, ExchangeImpl>, + VirtualHost, ExchangeImpl> +{ + private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + @ManagedAttributeField + private Map _messageStoreSettings; + + @ManagedAttributeField + private boolean _queue_deadLetterQueueEnabled; + + @ManagedAttributeField + private long _housekeepingCheckPeriod; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutWarn; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutWarn; + @ManagedAttributeField + private int _housekeepingThreadCount; + + @ManagedObjectFactoryConstructor + public BDBHAReplicaVirtualHost(final Map attributes, VirtualHostNode virtualHostNode) + { + super(parentsMap(virtualHostNode), attributes); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + } + + @Override + public String getModelVersion() + { + return BrokerModel.MODEL_VERSION; + } + + @Override + protected C addChild(final Class childClass, + final Map attributes, + final ConfiguredObject... otherParents) + { + throwUnsupportedForReplica(); + return null; + } + + @Override + public ExchangeImpl createExchange(final Map attributes) + { + throwUnsupportedForReplica(); + return null; + } + + @Override + public void removeExchange(final ExchangeImpl exchange, final boolean force) + throws ExchangeIsAlternateException, RequiredExchangeException + { + + } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + + @Override + public ExchangeImpl getExchange(final String name) + { + return null; + } + + @Override + public AMQQueue createQueue(final Map attributes) + { + throwUnsupportedForReplica(); + return null; + } + + @Override + public void executeTransaction(final TransactionalOperation op) + { + throwUnsupportedForReplica(); + } + + @Override + public State getState() + { + return State.UNAVAILABLE; + } + + @Override + public Collection getExchangeTypeNames() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public Collection getSupportedExchangeTypes() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public Collection getSupportedQueueTypes() + { + return getObjectFactory().getSupportedTypes(Queue.class); + } + + @Override + public boolean isQueue_deadLetterQueueEnabled() + { + return false; + } + + @Override + public long getHousekeepingCheckPeriod() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutWarn() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutWarn() + { + return 0; + } + + @Override + public int getHousekeepingThreadCount() + { + return 0; + } + + @Override + public Map getMessageStoreSettings() + { + return null; + } + + @Override + public long getQueueCount() + { + return 0; + } + + @Override + public long getExchangeCount() + { + return 0; + } + + @Override + public long getConnectionCount() + { + return 0; + } + + @Override + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getMessagesIn() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public Collection getAliases() + { + return Collections.emptyList(); + } + + @Override + public Collection getConnections() + { + return Collections.emptyList(); + } + + @Override + public IConnectionRegistry getConnectionRegistry() + { + return null; + } + + @Override + public AMQQueue getQueue(final String name) + { + return null; + } + + @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + + @Override + public AMQQueue getQueue(final UUID id) + { + return null; + } + + @Override + public Collection> getQueues() + { + return Collections.emptyList(); + } + + @Override + public int removeQueue(final AMQQueue queue) + { + return 0; + } + + @Override + public Collection> getExchanges() + { + return Collections.emptyList(); + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + + @Override + public ExchangeImpl getExchange(final UUID id) + { + return null; + } + + @Override + public MessageDestination getDefaultDestination() + { + return null; + } + + @Override + public MessageStore getMessageStore() + { + return null; + } + + @Override + public org.apache.qpid.server.security.SecurityManager getSecurityManager() + { + return null; + } + + @Override + public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task) + { + } + + @Override + public long getHouseKeepingTaskCount() + { + return 0; + } + + @Override + public long getHouseKeepingCompletedTaskCount() + { + return 0; + } + + @Override + public int getHouseKeepingPoolSize() + { + return 0; + } + + @Override + public void setHouseKeepingPoolSize(final int newSize) + { + } + + @Override + public int getHouseKeepingActiveCount() + { + return 0; + } + + @Override + public DtxRegistry getDtxRegistry() + { + return null; + } + + @Override + public LinkRegistry getLinkRegistry(final String remoteContainerId) + { + return null; + } + + @Override + public ScheduledFuture scheduleTask(final long delay, final Runnable timeoutTask) + { + return null; + } + + @Override + public void block() + { + } + + @Override + public void unblock() + { + } + + @Override + public boolean getDefaultDeadLetterQueueEnabled() + { + return false; + } + + @Override + public EventLogger getEventLogger() + { + return null; + } + + @Override + public void registerMessageReceived(final long messageSize, final long timestamp) + { + } + + @Override + public void registerMessageDelivered(final long messageSize) + { + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + @Override + public void resetStatistics() + { + } + + private void throwUnsupportedForReplica() + { + throw new IllegalStateException("The virtual host state of " + getState() + + " does not permit this operation."); + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index d162a43834..d23ccd1510 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -45,7 +45,6 @@ import com.sleepycat.je.rep.utilint.HostPortPair; import org.apache.log4j.Logger; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -57,13 +56,9 @@ import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.model.VirtualHostNode; -import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; -import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; @@ -85,6 +80,8 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode _environmentFacade = new AtomicReference<>(); + private final AtomicReference _lastReplicatedEnvironmentState = new AtomicReference<>(ReplicatedEnvironment.State.UNKNOWN); + @ManagedAttributeField private Map _environmentConfiguration; @@ -216,12 +213,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode factory = - getObjectFactory().getConfiguredObjectTypeFactory(VirtualHost.class.getSimpleName(), "BDB_HA"); - return (C) factory.create(getObjectFactory(), attributes, this); - } - else - { - ReplicaVirtualHost host = new ReplicaVirtualHost(attributes, this); - host.create(); - return (C) host; - } + return (C) getObjectFactory().create(VirtualHost.class, attributes, this); } return super.addChild(childClass, attributes, otherParents); } @@ -458,12 +439,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode hostAttributes = new HashMap(); hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); hostAttributes.put(VirtualHost.NAME, getGroupName()); - hostAttributes.put(VirtualHost.TYPE, "BDB_HA"); + hostAttributes.put(VirtualHost.TYPE, "BDB_HA_REPLICA"); createChild(VirtualHost.class, hostAttributes); } catch (Exception e) { - LOGGER.error("Failed to create a replica host", e); + LOGGER.error("Failed to create a replica virtualhost", e); } } @@ -513,7 +494,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode attributes, VirtualHostNode virtualHostNode) - { - super(attributes, virtualHostNode); - } - - @Override - protected void onCreate() - { - // Do not persist replica virtualhost - } - - @Override - protected C addChild(final Class childClass, - final Map attributes, - final ConfiguredObject... otherParents) - { - throwUnsupportedForReplica(); - return null; - } - - @Override - public ExchangeImpl createExchange(final Map attributes) - { - throwUnsupportedForReplica(); - return null; - } - - @Override - public AMQQueue createQueue(final Map attributes) - { - throwUnsupportedForReplica(); - return null; - } - - @Override - public State getState() - { - return State.UNAVAILABLE; - } - - private void throwUnsupportedForReplica() - { - throw new IllegalStateException("The virtual host state of " + getState() - + " does not permit this operation."); - } - - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 209e2739f4..02e09977ed 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -27,7 +27,6 @@ import java.util.List; public interface IConnectionRegistry { public static final String BROKER_SHUTDOWN_REPLY_TEXT = "Broker is shutting down"; - public static final String VHOST_PASSIVATE_REPLY_TEXT = "Virtual host is being passivated"; public void initialise(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 56700b5ebd..6dd4124258 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -211,8 +211,14 @@ public class ServerConnectionDelegate extends ServerDelegate if(vhost != null) { - sconn.setVirtualHost(vhost); + if (vhost.getState() != State.ACTIVE) + { + sconn.setState(Connection.State.CLOSING); + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); + return; + } + sconn.setVirtualHost(vhost); try { vhost.getSecurityManager().authoriseCreateConnection(sconn); @@ -224,16 +230,8 @@ public class ServerConnectionDelegate extends ServerDelegate return; } - if (vhost.getState() != State.ACTIVE) - { - sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); - } - else - { - sconn.setState(Connection.State.OPEN); - sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); - } + sconn.setState(Connection.State.OPEN); + sconn.invoke(new ConnectionOpenOk(Collections.emptyList())); } else { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java index 80a66292bf..632f751756 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java @@ -81,9 +81,13 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener