From f56230eaa511dbfa02759b1b1e4e85769cd80aae Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 1 Mar 2015 21:33:36 +0000 Subject: QPID-6424 : Implement Connection.Redirect in 0-8/9/9-1 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1663170 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/filter/ArrivalTimeFilterFactory.java | 15 +- .../server/filter/JMSSelectorFilterFactory.java | 6 +- .../qpid/server/model/AttributeValueConverter.java | 91 +++- .../java/org/apache/qpid/server/model/Queue.java | 2 +- .../org/apache/qpid/server/model/VirtualHost.java | 3 + .../qpid/server/plugin/MessageFilterFactory.java | 2 +- .../apache/qpid/server/queue/AbstractQueue.java | 10 +- .../server/virtualhost/AbstractVirtualHost.java | 7 + .../virtualhostnode/RedirectingVirtualHost.java | 32 ++ .../RedirectingVirtualHostImpl.java | 492 +++++++++++++++++++++ .../RedirectingVirtualHostNode.java | 36 ++ .../RedirectingVirtualHostNodeImpl.java | 124 ++++++ 12 files changed, 798 insertions(+), 22 deletions(-) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java (limited to 'qpid/java/broker-core') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java index 28e05eaa52..8c55c8ac76 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java @@ -31,22 +31,15 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory { @Override - public MessageFilter newInstance(final List arguments) + public MessageFilter newInstance(final List arguments) { if(arguments == null || arguments.size() != 1) { throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); } - Object arg = arguments.get(0); - long startingFrom; - if(arg instanceof Number) - { - startingFrom = ((Number)arg).longValue(); - } - else - { - startingFrom = Long.parseLong(String.valueOf(arg)); - } + String arg = arguments.get(0); + long startingFrom= Long.parseLong(arg); + return new ArrivalTimeFilter(startingFrom); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java index 683906dc88..233edc78cd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java @@ -38,16 +38,16 @@ public final class JMSSelectorFilterFactory implements MessageFilterFactory } @Override - public MessageFilter newInstance(final List arguments) + public MessageFilter newInstance(final List arguments) { if(arguments == null || arguments.size() != 1) { throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); } - Object arg = arguments.get(0); + String arg = arguments.get(0); try { - return new JMSSelectorFilter(String.valueOf(arg)); + return new JMSSelectorFilter(arg); } catch (ParseException | TokenMgrError | SelectorParsingException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java index 15e804e6f5..24e62ce7de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java @@ -50,6 +50,25 @@ abstract class AttributeValueConverter } }; + static final AttributeValueConverter OBJECT_CONVERTER = new AttributeValueConverter() + { + @Override + public Object convert(final Object value, final ConfiguredObject object) + { + if(value instanceof String) + { + return AbstractConfiguredObject.interpolate(object, (String) value); + } + else if(value == null) + { + return null; + } + else + { + return value; + } + } + }; static final AttributeValueConverter UUID_CONVERTER = new AttributeValueConverter() { @Override @@ -398,7 +417,17 @@ abstract class AttributeValueConverter } else if(Map.class.isAssignableFrom(type)) { - return (AttributeValueConverter) MAP_CONVERTER; + if(returnType instanceof ParameterizedType) + { + Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0]; + Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1]; + + return (AttributeValueConverter) new GenericMapConverter(keyType,valueType); + } + else + { + return (AttributeValueConverter) MAP_CONVERTER; + } } else if(Collection.class.isAssignableFrom(type)) { @@ -416,6 +445,10 @@ abstract class AttributeValueConverter { return (AttributeValueConverter) new ConfiguredObjectConverter(type); } + else if(Object.class == type) + { + return (AttributeValueConverter) OBJECT_CONVERTER; + } throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName()); } @@ -575,6 +608,62 @@ abstract class AttributeValueConverter } } + public static class GenericMapConverter extends AttributeValueConverter + { + + private final AttributeValueConverter _keyConverter; + private final AttributeValueConverter _valueConverter; + + + public GenericMapConverter(final Type keyType, final Type valueType) + { + _keyConverter = getConverter(getRawType(keyType), keyType); + + _valueConverter = getConverter(getRawType(valueType), valueType); + } + + + @Override + public Map convert(final Object value, final ConfiguredObject object) + { + if(value instanceof Map) + { + Map original = (Map)value; + Map converted = new LinkedHashMap(original.size()); + for(Map.Entry entry : original.entrySet()) + { + converted.put(_keyConverter.convert(entry.getKey(),object), + _valueConverter.convert(entry.getValue(), object)); + } + return Collections.unmodifiableMap(converted); + } + else if(value == null) + { + return null; + } + else + { + if(value instanceof String) + { + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + ObjectMapper objectMapper = new ObjectMapper(); + try + { + return convert(objectMapper.readValue(interpolated, Map.class), object); + } + catch (IOException e) + { + // fall through to the non-JSON single object case + } + } + + throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map"); + } + + } + } + + static final class EnumConverter> extends AttributeValueConverter { private final Class _klazz; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 9c6442e7c3..ba1f262cfc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -163,7 +163,7 @@ public interface Queue> extends ConfiguredObject long getMaximumMessageTtl(); @ManagedAttribute - Map>> getDefaultFilters(); + Map>> getDefaultFilters(); //children Collection getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 6742a5dfa5..38853e0a64 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.store.MessageStore; @ManagedObject( defaultType = "ProvidedStore") @@ -144,6 +145,8 @@ public interface VirtualHost, Q extends Queue, void delete(); + String getRedirectHost(AmqpPort port); + public static interface Transaction { void dequeue(MessageInstance entry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java index 9c76f5590e..372642310e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java @@ -26,5 +26,5 @@ import org.apache.qpid.server.filter.MessageFilter; public interface MessageFilterFactory extends Pluggable { - MessageFilter newInstance(List arguments); + MessageFilter newInstance(List arguments); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 6e9af7780c..fc7d9d0fec 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -191,7 +191,7 @@ public abstract class AbstractQueue> private MessageDurability _messageDurability; @ManagedAttributeField - private Map>> _defaultFilters; + private Map>> _defaultFilters; private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name @@ -467,17 +467,17 @@ public abstract class AbstractQueue> final Map messageFilterFactories = qpidServiceLoader.getInstancesByType(MessageFilterFactory.class); - for (Map.Entry>> entry : _defaultFilters.entrySet()) + for (Map.Entry>> entry : _defaultFilters.entrySet()) { String name = String.valueOf(entry.getKey()); - Map> filterValue = entry.getValue(); + Map> filterValue = entry.getValue(); if(filterValue.size() == 1) { String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); if(filterFactory != null) { - List filterArguments = filterValue.values().iterator().next(); + List filterArguments = filterValue.values().iterator().next(); _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); } else @@ -599,7 +599,7 @@ public abstract class AbstractQueue> } @Override - public Map>> getDefaultFilters() + public Map>> getDefaultFilters() { return _defaultFilters; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 21f0f47835..dff598790a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -63,6 +63,7 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.ConnectionAdapter; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ConnectionValidator; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; @@ -934,6 +935,12 @@ public abstract class AbstractVirtualHost> exte } } + @Override + public String getRedirectHost(final AmqpPort port) + { + return null; + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java new file mode 100644 index 0000000000..5e87b2e511 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.NonStandardVirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public interface RedirectingVirtualHost> + extends VirtualHostImpl, ExchangeImpl>, + NonStandardVirtualHost,ExchangeImpl> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java new file mode 100644 index 0000000000..89bd2fc8b9 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java @@ -0,0 +1,492 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; + +@ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false ) +class RedirectingVirtualHostImpl + extends AbstractConfiguredObject + implements RedirectingVirtualHost +{ + public static final String TYPE = "REDIRECTOR"; + private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + @ManagedAttributeField + private boolean _queue_deadLetterQueueEnabled; + + @ManagedAttributeField + private long _housekeepingCheckPeriod; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutWarn; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutWarn; + @ManagedAttributeField + private int _housekeepingThreadCount; + + @ManagedAttributeField + private List _enabledConnectionValidators; + + @ManagedAttributeField + private List _disabledConnectionValidators; + + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostImpl(final Map attributes, VirtualHostNode virtualHostNode) + { + super(parentsMap(virtualHostNode), attributes); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + setState(State.UNAVAILABLE); + } + + @Override + protected void validateChange(final ConfiguredObject proxyForValidation, final Set changedAttributes) + { + super.validateChange(proxyForValidation, changedAttributes); + + throwUnsupportedForRedirector(); + } + + @Override + public String getModelVersion() + { + return BrokerModel.MODEL_VERSION; + } + + @Override + protected C addChild(final Class childClass, + final Map attributes, + final ConfiguredObject... otherParents) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public ExchangeImpl createExchange(final Map attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void removeExchange(final ExchangeImpl exchange, final boolean force) + throws ExchangeIsAlternateException, RequiredExchangeException + { + throwUnsupportedForRedirector(); + } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + + @Override + public ExchangeImpl getExchange(final String name) + { + return null; + } + + @Override + public AMQQueue createQueue(final Map attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void executeTransaction(final TransactionalOperation op) + { + throwUnsupportedForRedirector(); + } + + @Override + public Collection getExchangeTypeNames() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public String getRedirectHost(final AmqpPort port) + { + return ((RedirectingVirtualHostNode)(getParent(VirtualHostNode.class))).getRedirects().get(port); + } + + @Override + public boolean isQueue_deadLetterQueueEnabled() + { + return false; + } + + @Override + public long getHousekeepingCheckPeriod() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutWarn() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutWarn() + { + return 0; + } + + @Override + public int getHousekeepingThreadCount() + { + return 0; + } + + @Override + public long getQueueCount() + { + return 0; + } + + @Override + public long getExchangeCount() + { + return 0; + } + + @Override + public long getConnectionCount() + { + return 0; + } + + @Override + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getMessagesIn() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public Collection getAliases() + { + return Collections.emptyList(); + } + + @Override + public Collection getConnections() + { + return Collections.emptyList(); + } + + @Override + public IConnectionRegistry getConnectionRegistry() + { + return null; + } + + @Override + public AMQQueue getQueue(final String name) + { + return null; + } + + @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + + @Override + public AMQQueue getQueue(final UUID id) + { + return null; + } + + @Override + public Collection> getQueues() + { + return Collections.emptyList(); + } + + @Override + public int removeQueue(final AMQQueue queue) + { + throwUnsupportedForRedirector(); + return 0; + } + + @Override + public Collection> getExchanges() + { + return Collections.emptyList(); + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + + @Override + public ExchangeImpl getExchange(final UUID id) + { + return null; + } + + @Override + public MessageDestination getDefaultDestination() + { + return null; + } + + @Override + public MessageStore getMessageStore() + { + return null; + } + + @Override + public void setTargetSize(final long targetSize) + { + + } + + @Override + public long getTotalQueueDepthBytes() + { + return 0l; + } + + @Override + public org.apache.qpid.server.security.SecurityManager getSecurityManager() + { + return null; + } + + @Override + public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task) + { + } + + @Override + public long getHouseKeepingTaskCount() + { + return 0; + } + + @Override + public long getHouseKeepingCompletedTaskCount() + { + return 0; + } + + @Override + public int getHouseKeepingPoolSize() + { + return 0; + } + + @Override + public void setHouseKeepingPoolSize(final int newSize) + { + } + + @Override + public int getHouseKeepingActiveCount() + { + return 0; + } + + @Override + public DtxRegistry getDtxRegistry() + { + return null; + } + + @Override + public LinkRegistry getLinkRegistry(final String remoteContainerId) + { + return null; + } + + @Override + public ScheduledFuture scheduleTask(final long delay, final Runnable timeoutTask) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public boolean getDefaultDeadLetterQueueEnabled() + { + return false; + } + + @Override + public EventLogger getEventLogger() + { + return null; + } + + @Override + public void registerMessageReceived(final long messageSize, final long timestamp) + { + throwUnsupportedForRedirector(); + } + + @Override + public void registerMessageDelivered(final long messageSize) + { + throwUnsupportedForRedirector(); + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + @Override + public void resetStatistics() + { + } + + @Override + public boolean authoriseCreateConnection(final AMQConnectionModel connection) + { + return false; + } + + @Override + public List getEnabledConnectionValidators() + { + return _enabledConnectionValidators; + } + + @Override + public List getDisabledConnectionValidators() + { + return _disabledConnectionValidators; + } + + private void throwUnsupportedForRedirector() + { + throw new IllegalStateException("The virtual host state of " + getState() + + " does not permit this operation."); + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java new file mode 100644 index 0000000000..636681db72 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Map; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHostNode; + +@ManagedObject(type= RedirectingVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category=false, validChildTypes = "org.apache.qpid.server.virtualhostnode.RedirectingVirtualHostNodeImpl#getSupportedChildTypes()") +public interface RedirectingVirtualHostNode> extends VirtualHostNode +{ + + @ManagedAttribute( defaultValue = "{}") + Map, String> getRedirects(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java new file mode 100644 index 0000000000..c94d113514 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.DurableConfigurationStore; + + +public class RedirectingVirtualHostNodeImpl + extends AbstractConfiguredObject implements RedirectingVirtualHostNode +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RedirectingVirtualHostImpl.class); + public static final String VIRTUAL_HOST_NODE_TYPE = "Redirector"; + + + @ManagedAttributeField + private String _virtualHostInitialConfiguration; + + @ManagedAttributeField + private Map,String> _redirects; + + private RedirectingVirtualHostImpl _virtualHost; + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostNodeImpl(Map attributes, Broker parent) + { + super(Collections.,ConfiguredObject>singletonMap(Broker.class, parent), + attributes); + } + + @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + protected void doActivate() + { + try + { + _virtualHost = new RedirectingVirtualHostImpl(Collections.singletonMap(ConfiguredObject.NAME,getName()), this); + _virtualHost.create(); + setState(State.ACTIVE); + } + catch(RuntimeException e) + { + setState(State.ERRORED); + if (getParent(Broker.class).isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", e); + } + else + { + throw e; + } + } + } + + @Override + public String getVirtualHostInitialConfiguration() + { + return _virtualHostInitialConfiguration; + } + + @Override + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + @Override + public DurableConfigurationStore getConfigurationStore() + { + return null; + } + + @Override + public Collection getRemoteReplicationNodes() + { + return Collections.emptySet(); + } + + @Override + public Map, String> getRedirects() + { + return _redirects; + } + + public static Map> getSupportedChildTypes() + { + Collection validVhostTypes = Collections.singleton(RedirectingVirtualHostImpl.TYPE); + return Collections.singletonMap(VirtualHost.class.getSimpleName(), validVhostTypes); + } + +} -- cgit v1.2.1