diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-07-19 11:23:49 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-07-19 11:23:49 +0000 |
| commit | 2797a71b663841fe1de6e8925e2168b1c59cbb8f (patch) | |
| tree | 7522e1bb76f527d024a22c98affe74d3129562f1 /qpid/java/broker/src | |
| parent | 5a94fabec3fd53e7046628baf2f0c7b3da81fe96 (diff) | |
| download | qpid-python-2797a71b663841fe1de6e8925e2168b1c59cbb8f.tar.gz | |
QPID-4149: Add REST functionality to delete exchanges, queues and bindings, add queue/exchange/binding delete operations into web UI
Applied patch from Oleksandr Rudyy <orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1363298 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
7 files changed, 130 insertions, 39 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index baf9cc3d09..07813b073b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -225,4 +225,21 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } } + public boolean isReservedExchangeName(String name) + { + if (name == null || "".equals(name) || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(name) + || name.startsWith("amq.") || name.startsWith("qpid.")) + { + return true; + } + Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeFactory().getRegisteredTypes(); + for (ExchangeType<? extends Exchange> type : registeredTypes) + { + if (type.getDefaultExchangeName().toString().equals(name)) + { + return true; + } + } + return false; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 692a2b2b0d..4dcedb4797 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -62,6 +62,13 @@ public interface ExchangeRegistry void addRegistryChangeListener(RegistryChangeListener listener); + /** + * Validates the name of user custom exchange. + * <p> + * Return true if the exchange name is reserved and false otherwise. + */ + boolean isReservedExchangeName(String name); + interface RegistryChangeListener { void exchangeRegistered(Exchange exchange); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 53faefc954..24a3d43386 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -131,8 +131,6 @@ public interface VirtualHost extends ConfiguredObject boolean exclusive, LifetimePolicy lifetime, long ttl, Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException; - void deleteQueue(Queue queue) throws AccessControlException, IllegalStateException; - Collection<String> getExchangeTypes(); public static interface Transaction diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java index f041494781..abd3160686 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java @@ -31,6 +31,7 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; @@ -216,4 +217,16 @@ final class BindingAdapter extends AbstractAdapter implements Binding { return Binding.AVAILABLE_ATTRIBUTES; } + + @Override + public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + AccessControlException + { + if (desiredState == State.DELETED) + { + delete(); + return State.DELETED; + } + return super.setDesiredState(currentState, desiredState); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index c0d85845d6..df0f29fbc3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -30,9 +30,13 @@ import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Publisher; import org.apache.qpid.server.model.Queue; @@ -50,7 +54,6 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa private VirtualHostAdapter _vhost; private final ExchangeStatistics _statistics; - public ExchangeAdapter(final VirtualHostAdapter virtualHostAdapter, final org.apache.qpid.server.exchange.Exchange exchange) { @@ -164,7 +167,21 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa { try { - _vhost.getVirtualHost().getExchangeRegistry().unregisterExchange(getName(), false); + ExchangeRegistry exchangeRegistry = _vhost.getVirtualHost().getExchangeRegistry(); + if (exchangeRegistry.isReservedExchangeName(getName())) + { + throw new UnsupportedOperationException("'" + getName() + "' is a reserved exchange and can't be deleted"); + } + + if(_exchange.hasReferrers()) + { + throw new AMQException( AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", null); + } + + synchronized(exchangeRegistry) + { + exchangeRegistry.unregisterExchange(getName(), false); + } } catch(AMQException e) { @@ -364,6 +381,18 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa return AVAILABLE_ATTRIBUTES; } + @Override + public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + AccessControlException + { + if (desiredState == State.DELETED) + { + delete(); + return State.DELETED; + } + return super.setDesiredState(currentState, desiredState); + } + private class ExchangeStatistics implements Statistics { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 21c4aef323..5a5f7435a2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.binding.Binding; @@ -34,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFinder; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; @@ -130,11 +132,14 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { try { - _queue.delete(); - if (_queue.isDurable()) + QueueRegistry queueRegistry = _queue.getVirtualHost().getQueueRegistry(); + synchronized(queueRegistry) { - - _queue.getVirtualHost().getMessageStore().removeQueue(_queue); + _queue.delete(); + if (_queue.isDurable()) + { + _queue.getVirtualHost().getMessageStore().removeQueue(_queue); + } } } catch(AMQException e) @@ -705,4 +710,17 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs listener.notifyClients(notification, this, notificationMsg); } } + + @Override + public State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + AccessControlException + { + if (desiredState == State.DELETED) + { + delete(); + return State.DELETED; + } + return super.setDesiredState(currentState, desiredState); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index bcfdb22fa9..35838e51d2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -228,18 +228,29 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E try { - org.apache.qpid.server.exchange.Exchange exchange = - _virtualHost.getExchangeFactory().createExchange(name, type, durable, - lifetime == LifetimePolicy.AUTO_DELETE); - _virtualHost.getExchangeRegistry().registerExchange(exchange); - if(durable) + ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry(); + if (exchangeRegistry.isReservedExchangeName(name)) { - _virtualHost.getMessageStore().createExchange(exchange); + throw new UnsupportedOperationException("'" + name + "' is a reserved exchange name"); } - - synchronized (_exchangeAdapters) + synchronized(exchangeRegistry) { - return _exchangeAdapters.get(exchange); + org.apache.qpid.server.exchange.Exchange exchange = exchangeRegistry.getExchange(name); + if (exchange != null) + { + throw new IllegalArgumentException("Exchange with name '" + name + "' already exists"); + } + exchange = _virtualHost.getExchangeFactory().createExchange(name, type, durable, + lifetime == LifetimePolicy.AUTO_DELETE); + _virtualHost.getExchangeRegistry().registerExchange(exchange); + if(durable) + { + _virtualHost.getMessageStore().createExchange(exchange); + } + synchronized (_exchangeAdapters) + { + return _exchangeAdapters.get(exchange); + } } } catch(AMQException e) @@ -326,23 +337,27 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E } try { - if(_virtualHost.getQueueRegistry().getQueue(name)!=null) + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + synchronized (queueRegistry) { - throw new IllegalArgumentException("Queue with name "+name+" already exists"); - } - AMQQueue queue = - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, - durable, owner, lifetime == LifetimePolicy.AUTO_DELETE, - exclusive, _virtualHost, attributes); - _virtualHost.getBindingFactory().addBinding(name, queue, _virtualHost.getExchangeRegistry().getDefaultExchange(), null); + if(_virtualHost.getQueueRegistry().getQueue(name)!=null) + { + throw new IllegalArgumentException("Queue with name "+name+" already exists"); + } + AMQQueue queue = + AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, + durable, owner, lifetime == LifetimePolicy.AUTO_DELETE, + exclusive, _virtualHost, attributes); + _virtualHost.getBindingFactory().addBinding(name, queue, _virtualHost.getExchangeRegistry().getDefaultExchange(), null); - if(durable) - { - _virtualHost.getMessageStore().createQueue(queue, FieldTable.convertToFieldTable(attributes)); - } - synchronized (_queueAdapters) - { - return _queueAdapters.get(queue); + if(durable) + { + _virtualHost.getMessageStore().createQueue(queue, FieldTable.convertToFieldTable(attributes)); + } + synchronized (_queueAdapters) + { + return _queueAdapters.get(queue); + } } } @@ -569,13 +584,6 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E } } - public void deleteQueue(Queue queue) - throws AccessControlException, IllegalStateException - { - // TODO - throw new UnsupportedOperationException("Not Yet Implemented"); - } - public Collection<String> getExchangeTypes() { Collection<ExchangeType<? extends org.apache.qpid.server.exchange.Exchange>> types = @@ -880,4 +888,5 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E } } } + } |
